feat:2.4.0

1. 支持多数据源
2.不同的dispatcher线程池隔离
This commit is contained in:
byteblogs168 2023-11-13 00:02:36 +08:00
parent 3234c69f4e
commit 75abf2cc55
29 changed files with 548 additions and 173 deletions

View File

@ -8,9 +8,9 @@ CREATE TABLE group_config
group_status SMALLINT NOT NULL DEFAULT 0,
version INT NOT NULL,
group_partition INT NOT NULL,
route_key SMALLINT NOT NULL,
id_generator_mode SMALLINT NOT NULL DEFAULT 1,
init_scene SMALLINT NOT NULL DEFAULT 0,
bucket_index INT NOT NULL DEFAULT 0,
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
@ -90,7 +90,7 @@ COMMENT ON COLUMN "retry_dead_letter_0"."ext_attrs" IS '扩展字段';
COMMENT ON COLUMN "retry_dead_letter_0"."task_type" IS '任务类型 1、重试数据 2、回调数据';
COMMENT ON COLUMN "retry_dead_letter_0"."create_dt" IS '创建时间';
COMMENT ON TABLE "retry_dead_letter_0" IS '死信队列表';
CREATE TABLE retry_task_0
(
id BIGSERIAL PRIMARY KEY,
@ -106,6 +106,7 @@ CREATE TABLE retry_task_0
retry_count INT NOT NULL DEFAULT 0,
retry_status SMALLINT NOT NULL DEFAULT 0,
task_type SMALLINT NOT NULL DEFAULT 1,
route_key SMALLINT NOT NULL,
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
@ -289,7 +290,7 @@ COMMENT ON COLUMN "system_user"."role" IS '角色1-普通用户、2-管理员
COMMENT ON COLUMN "system_user"."create_dt" IS '创建时间';
COMMENT ON COLUMN "system_user"."update_dt" IS '修改时间';
COMMENT ON TABLE "system_user" IS '系统用户表';
-- pwd: admin
INSERT INTO system_user (username, password, role)
VALUES ('admin', '465c194afb65670f38322df087f0a9bb225cc257e43eb4ac5a0c98ef5b3173ac', 2);
@ -311,7 +312,7 @@ COMMENT ON COLUMN "system_user_permission"."create_dt" IS '创建时间';
COMMENT ON COLUMN "system_user_permission"."update_dt" IS '修改时间';
COMMENT ON TABLE "system_user_permission" IS '系统用户权限表';
CREATE TABLE sequence_alloc
(
id BIGSERIAL PRIMARY KEY,
@ -327,4 +328,4 @@ COMMENT ON COLUMN "sequence_alloc"."group_name" IS '组名称';
COMMENT ON COLUMN "sequence_alloc"."max_id" IS '最大id';
COMMENT ON COLUMN "sequence_alloc"."step" IS '步长';
COMMENT ON COLUMN "sequence_alloc"."update_dt" IS '更新时间';
COMMENT ON TABLE "sequence_alloc" IS '号段模式序号ID分配表';
COMMENT ON TABLE "sequence_alloc" IS '号段模式序号ID分配表';

View File

@ -1,7 +1,10 @@
package com.aizuda.easy.retry.client.job.core.cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@ -16,14 +19,20 @@ import java.util.function.Supplier;
* @since : 2.4.0
*/
@Component
@Slf4j
public class ThreadPoolCache {
private static final ConcurrentHashMap<Long, ThreadPoolExecutor> CACHE_THREAD_POOL = new ConcurrentHashMap<>();
public static ThreadPoolExecutor createThreadPool(Long taskBatchId, int parallelNum) {
if (CACHE_THREAD_POOL.containsKey(taskBatchId)) {
return CACHE_THREAD_POOL.get(taskBatchId);
}
Supplier<ThreadPoolExecutor> supplier = () -> {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new CustomizableThreadFactory(MessageFormat.format("easy-retry-job-{0}-", taskBatchId)));
threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
};

View File

@ -13,4 +13,6 @@ public class JobArgs {
private String argsStr;
private String executorInfo;
private Long taskBatchId;
}

View File

@ -14,6 +14,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit;
* @date : 2023-09-27 09:48
* @since 2.4.0
*/
@Slf4j
public abstract class AbstractJobExecutor implements IJobExecutor {
@Override
@ -58,6 +60,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
JobArgs jobArgs = new JobArgs();
jobArgs.setArgsStr(jobContext.getArgsStr());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
return jobArgs;
}

View File

@ -39,7 +39,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
@Override
public void onSuccess(ExecuteResult result) {
// 上报执行成功
log.info("任务执行成功 [{}]", JsonUtil.toJsonString(result));
log.warn("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result));
if (Objects.isNull(result)) {
result = ExecuteResult.success();
@ -70,7 +70,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
@Override
public void onFailure(final Throwable t) {
// 上报执行失败
log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t);
log.error("任务执行失败 任务执行成功 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
try {
ExecuteResult failure = ExecuteResult.failure();

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_id" property="taskId" />
<result column="client_address" property="clientAddress"/>
<result column="task_batch_id" property="taskBatchId" />
<result column="create_dt" property="createDt" />
<result column="message" property="message" />
</resultMap>
</mapper>

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.Job">
<id column="id" property="id"/>
<result column="group_name" property="groupName"/>
<result column="job_name" property="jobName"/>
<result column="args_str" property="argsStr"/>
<result column="args_type" property="argsType"/>
<result column="ext_attrs" property="extAttrs"/>
<result column="next_trigger_at" property="nextTriggerAt"/>
<result column="job_status" property="jobStatus"/>
<result column="route_key" property="routeKey"/>
<result column="executor_type" property="executorType"/>
<result column="executor_info" property="executorInfo"/>
<result column="block_strategy" property="blockStrategy"/>
<result column="executor_timeout" property="executorTimeout"/>
<result column="max_retry_times" property="maxRetryTimes"/>
<result column="retry_interval" property="retryInterval"/>
<result column="bucket_index" property="bucketIndex"/>
<result column="description" property="description"/>
<result column="create_dt" property="createDt"/>
<result column="update_dt" property="updateDt"/>
<result column="deleted" property="deleted"/>
</resultMap>
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
update job rt,
(
<foreach collection="list" item="item" index="index" separator=" union all ">
select
#{item.nextTriggerAt} as next_trigger_at,
#{item.id} as id
</foreach>
) tt
set
rt.next_trigger_at = tt.next_trigger_at
where rt.id = tt.id
</update>
</mapper>

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_batch_status" property="taskBatchStatus" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" />
</resultMap>
<select id="selectJobBatchList"
parameterType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO">
SELECT a.*, b.job_name, b.task_type, b.block_strategy, b.trigger_type
FROM job_task_batch a join job b on a.job_id = b.id
<where>
<if test="queryDO.jobId != null">
and job_id = #{queryDO.jobId}
</if>
<if test="queryDO.groupName != null">
and a.group_name = #{queryDO.groupName}
</if>
<if test="queryDO.taskBatchStatus != null">
and task_batch_status = #{queryDO.taskBatchStatus}
</if>
<if test="queryDO.jobName != null">
and job_name like #{queryDO.jobName}
</if>
and a.deleted = 0
order by a.id desc
</where>
</select>
</mapper>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTask">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_batch_id" property="taskBatchId" />
<result column="parent_id" property="parentId" />
<result column="task_status" property="taskStatus" />
<result column="result_message" property="resultMessage" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
</resultMap>
</mapper>

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_id" property="taskId" />
<result column="client_address" property="clientAddress"/>
<result column="task_batch_id" property="taskBatchId" />
<result column="create_dt" property="createDt" />
<result column="message" property="message" />
</resultMap>
</mapper>

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.Job">
<id column="id" property="id"/>
<result column="group_name" property="groupName"/>
<result column="job_name" property="jobName"/>
<result column="args_str" property="argsStr"/>
<result column="args_type" property="argsType"/>
<result column="ext_attrs" property="extAttrs"/>
<result column="next_trigger_at" property="nextTriggerAt"/>
<result column="job_status" property="jobStatus"/>
<result column="route_key" property="routeKey"/>
<result column="executor_type" property="executorType"/>
<result column="executor_info" property="executorInfo"/>
<result column="block_strategy" property="blockStrategy"/>
<result column="executor_timeout" property="executorTimeout"/>
<result column="max_retry_times" property="maxRetryTimes"/>
<result column="retry_interval" property="retryInterval"/>
<result column="bucket_index" property="bucketIndex"/>
<result column="description" property="description"/>
<result column="create_dt" property="createDt"/>
<result column="update_dt" property="updateDt"/>
<result column="deleted" property="deleted"/>
</resultMap>
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
update job rt,
(
<foreach collection="list" item="item" index="index" separator=" union all ">
select
#{item.nextTriggerAt} as next_trigger_at,
#{item.id} as id
</foreach>
) tt
set
rt.next_trigger_at = tt.next_trigger_at
where rt.id = tt.id
</update>
</mapper>

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_batch_status" property="taskBatchStatus" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" />
</resultMap>
<select id="selectJobBatchList"
parameterType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO">
SELECT a.*, b.job_name, b.task_type, b.block_strategy, b.trigger_type
FROM job_task_batch a join job b on a.job_id = b.id
<where>
<if test="queryDO.jobId != null">
and job_id = #{queryDO.jobId}
</if>
<if test="queryDO.groupName != null">
and a.group_name = #{queryDO.groupName}
</if>
<if test="queryDO.taskBatchStatus != null">
and task_batch_status = #{queryDO.taskBatchStatus}
</if>
<if test="queryDO.jobName != null">
and job_name like #{queryDO.jobName}
</if>
and a.deleted = 0
order by a.id desc
</where>
</select>
</mapper>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTask">
<id column="id" property="id" />
<result column="group_name" property="groupName" />
<result column="job_id" property="jobId" />
<result column="task_batch_id" property="taskBatchId" />
<result column="parent_id" property="parentId" />
<result column="task_status" property="taskStatus" />
<result column="result_message" property="resultMessage" />
<result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" />
</resultMap>
</mapper>

View File

@ -13,18 +13,32 @@ import com.aizuda.easy.retry.common.core.context.SpringContext;
*/
public class ActorGenerator {
/*----------------------------------------系统通用配置 START----------------------------------------*/
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor";
private static final String COMMON_LOG_DISPATCHER = "akka.actor.common-log-dispatcher";
private static final String COMMON_SCAN_TASK_DISPATCHER = "akka.actor.common-scan-task-dispatcher";
private static final String NETTY_RECEIVE_REQUEST_DISPATCHER = "akka.actor.netty-receive-request-dispatcher";
/*----------------------------------------系统通用配置 END----------------------------------------*/
/*----------------------------------------分布式重试任务 START----------------------------------------*/
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
public static final String FINISH_ACTOR = "FinishActor";
public static final String FAILURE_ACTOR = "FailureActor";
public static final String NO_RETRY_ACTOR = "NoRetryActor";
public static final String EXEC_CALLBACK_UNIT_ACTOR = "ExecCallbackUnitActor";
public static final String EXEC_UNIT_ACTOR = "ExecUnitActor";
public static final String LOG_ACTOR = "LogActor";
public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor";
/*----------------------------------------分布式任务调度----------------------------------------*/
private static final String RETRY_TASK_EXECUTOR_DISPATCHER = "akka.actor.retry-task-executor-dispatcher";
private static final String RETRY_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.retry-task-executor-result-dispatcher";
/*----------------------------------------分布式重试任务 END----------------------------------------*/
/*----------------------------------------分布式任务调度 START----------------------------------------*/
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
@ -33,6 +47,14 @@ public class ActorGenerator {
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
public static final String REAL_STOP_TASK_INSTANCE_ACTOR = "RealStopTaskInstanceActor";
/*----------------------------------------dispatcher----------------------------------------*/
private static final String JOB_TASK_DISPATCHER = "akka.actor.job-task-prepare-dispatcher";
private static final String JOB_TASK_EXECUTOR_DISPATCHER = "akka.actor.job-task-executor-dispatcher";
private static final String JOB_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.job-task-executor-result-dispatcher";
private static final String JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER = "akka.actor.job-task-executor-call-client-dispatcher";
/*----------------------------------------分布式任务调度 END----------------------------------------*/
private ActorGenerator() {}
/**
@ -41,7 +63,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef finishActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR));
return getRetryActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
}
/**
@ -50,7 +72,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef failureActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR));
return getRetryActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
}
/**
@ -59,7 +81,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef noRetryActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NO_RETRY_ACTOR));
return getRetryActorSystem().actorOf(getSpringExtension().props(NO_RETRY_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
}
/**
@ -68,7 +90,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef execCallbackUnitActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR));
return getRetryActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
}
/**
@ -77,7 +99,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef execUnitActor() {
return getDispatchExecUnitActorSystem().actorOf(getSpringExtension().props(EXEC_UNIT_ACTOR));
return getRetryActorSystem().actorOf(getSpringExtension()
.props(EXEC_UNIT_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
}
/**
@ -86,7 +110,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef scanGroupActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_RETRY_GROUP_ACTOR));
return getCommonActorSystemSystem().actorOf(getSpringExtension()
.props(SCAN_RETRY_GROUP_ACTOR)
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
}
/**
@ -95,7 +121,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef scanCallbackGroupActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR));
return getRetryActorSystem().actorOf(getSpringExtension()
.props(SCAN_CALLBACK_GROUP_ACTOR)
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
}
/**
@ -104,7 +132,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef scanJobActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_JOB_ACTOR));
return getCommonActorSystemSystem().actorOf(getSpringExtension()
.props(SCAN_JOB_ACTOR)
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
}
/**
@ -113,7 +143,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef scanBucketActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_BUCKET_ACTOR));
return getCommonActorSystemSystem().actorOf(getSpringExtension()
.props(SCAN_BUCKET_ACTOR)
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
}
/**
@ -122,7 +154,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef logActor() {
return getLogActorSystemSystem().actorOf(getSpringExtension().props(LOG_ACTOR));
return getCommonActorSystemSystem().actorOf(getSpringExtension()
.props(LOG_ACTOR)
.withDispatcher(COMMON_LOG_DISPATCHER));
}
/**
@ -131,7 +165,8 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef requestHandlerActor() {
return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR));
return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR)
.withDispatcher(NETTY_RECEIVE_REQUEST_DISPATCHER));
}
@ -141,7 +176,8 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef jobTaskPrepareActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR));
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR)
.withDispatcher(JOB_TASK_DISPATCHER));
}
/**
@ -150,7 +186,11 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef jobTaskExecutorActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_ACTOR));
return getJobActorSystem()
.actorOf(getSpringExtension()
.props(JOB_EXECUTOR_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
);
}
/**
@ -159,7 +199,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef jobTaskExecutorResultActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_RESULT_ACTOR));
return getJobActorSystem().actorOf(getSpringExtension()
.props(JOB_EXECUTOR_RESULT_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_RESULT_DISPATCHER));
}
/**
@ -168,7 +210,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef jobRealTaskExecutorActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_JOB_EXECUTOR_ACTOR));
return getJobActorSystem().actorOf(getSpringExtension()
.props(REAL_JOB_EXECUTOR_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
}
/**
@ -177,7 +221,8 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef jobRealStopTaskInstanceActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR));
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
}
/**
@ -186,35 +231,20 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef jobLogActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR));
return getCommonActorSystemSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR)
.withDispatcher(COMMON_LOG_DISPATCHER));
}
public static SpringExtension getSpringExtension() {
return SpringContext.getBeanByType(SpringExtension.class);
}
/**
* 重试任务提取器
* @return
*/
public static ActorSystem getDispatchRetryActorSystem() {
return SpringContext.getBean("dispatchRetryActorSystem", ActorSystem.class);
}
/**
* 重试任务分发器
* @return
*/
public static ActorSystem getDispatchExecUnitActorSystem() {
return SpringContext.getBean("dispatchExecUnitActorSystem", ActorSystem.class);
}
/**
/**
* 重试任务结果分发器
* @return
*/
public static ActorSystem getDispatchResultActorSystem() {
return SpringContext.getBean("dispatchResultActorSystem", ActorSystem.class);
public static ActorSystem getRetryActorSystem() {
return SpringContext.getBean("retryActorSystem", ActorSystem.class);
}
/**
@ -222,8 +252,8 @@ public class ActorGenerator {
*
* @return
*/
public static ActorSystem getLogActorSystemSystem() {
return SpringContext.getBean("logActorSystem", ActorSystem.class);
public static ActorSystem getCommonActorSystemSystem() {
return SpringContext.getBean("commonActorSystem", ActorSystem.class);
}
@ -245,5 +275,4 @@ public class ActorGenerator {
return SpringContext.getBean("jobActorSystem", ActorSystem.class);
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.common.akka;
import akka.actor.ActorSystem;
import com.typesafe.config.ConfigFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
@ -15,63 +16,26 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class AkkaConfiguration {
private static final String DISPATCH_RETRY_ACTOR_SYSTEM = "DISPATCH_RETRY_ACTOR_SYSTEM";
private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM";
private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM";
private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM";
private static final String CONFIG_NAME = "easyretry";
private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM";
private static final String JOB_ACTOR_SYSTEM = "JOB_ACTOR_SYSTEM";
private static final String RETRY_ACTOR_SYSTEM = "RETRY_ACTOR_SYSTEM";
private static final String COMMON_ACTOR_SYSTEM = "COMMON_ACTOR_SYSTEM";
@Autowired
private ApplicationContext applicationContext;
@Autowired
private SpringExtension springExtension;
/**
* 重试分发ActorSystem
*
* @return {@link ActorSystem} 顶级actor
*/
@Bean("dispatchRetryActorSystem")
public ActorSystem createDispatchRetryActorSystem() {
ActorSystem system = ActorSystem.create(DISPATCH_RETRY_ACTOR_SYSTEM);
springExtension.initialize(applicationContext);
return system;
}
/**
* 重试分发执行ActorSystem
*
* @return {@link ActorSystem} 顶级actor
*/
@Bean("dispatchExecUnitActorSystem")
public ActorSystem createDispatchExecUnitRetryActorSystem() {
ActorSystem system = ActorSystem.create(DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM);
springExtension.initialize(applicationContext);
return system;
}
/**
* 重试分发执行结果处理ActorSystem
*
* @return {@link ActorSystem} 顶级actor
*/
@Bean("dispatchResultActorSystem")
public ActorSystem createDispatchResultRetryActorSystem() {
ActorSystem system = ActorSystem.create(DISPATCH_RESULT_ACTOR_SYSTEM);
springExtension.initialize(applicationContext);
return system;
}
/**
* 日志处理
*
* @return {@link ActorSystem} 顶级actor
*/
@Bean("logActorSystem")
@Bean("commonActorSystem")
public ActorSystem createLogActorSystem() {
ActorSystem system = ActorSystem.create(LOG_ACTOR_SYSTEM);
ActorSystem system = ActorSystem.create(COMMON_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
springExtension.initialize(applicationContext);
return system;
}
@ -83,11 +47,24 @@ public class AkkaConfiguration {
*/
@Bean("nettyActorSystem")
public ActorSystem nettyActorSystem() {
ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM);
ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
springExtension.initialize(applicationContext);
return system;
}
/**
* 处理retry调度
*
* @return {@link ActorSystem} 顶级actor
*/
@Bean("retryActorSystem")
public ActorSystem retryActorSystem() {
ActorSystem system = ActorSystem.create(RETRY_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
springExtension.initialize(applicationContext);
return system;
}
/**
* 处理job调度
*
@ -95,8 +72,9 @@ public class AkkaConfiguration {
*/
@Bean("jobActorSystem")
public ActorSystem jobActorSystem() {
ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM);
ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
springExtension.initialize(applicationContext);
return system;
}
}

View File

@ -19,11 +19,6 @@ public class JobPartitionTask extends PartitionTask {
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 下次触发时间
*/

View File

@ -19,11 +19,6 @@ public class JobTaskPrepareDTO {
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 下次触发时间
*/

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
@ -38,6 +39,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -61,6 +63,7 @@ public class JobExecutorActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
try {
log.info("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute));
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
@ -112,6 +115,7 @@ public class JobExecutorActor extends AbstractActor {
context.setJobId(job.getId());
jobExecutor.execute(context);
} finally {
log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {

View File

@ -4,6 +4,7 @@ import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -49,41 +50,46 @@ public class JobExecutorResultActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
try {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus());
if (Objects.nonNull(result.getResult())) {
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
}
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus());
if (Objects.nonNull(result.getResult())) {
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
}
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败"));
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态
boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason());
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭
if (result.getTaskType() != TaskTypeEnum.CLUSTER.getType()) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
// 更新批次上的状态
boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason());
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭
if (result.getTaskType() != TaskTypeEnum.CLUSTER.getType()) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
}
}
}
});
});
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
jobLogDTO.setMessage(result.getMessage());
jobLogDTO.setTaskId(result.getTaskId());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
jobLogDTO.setMessage(result.getMessage());
jobLogDTO.setTaskId(result.getTaskId());
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
} catch (Exception e) {
LogUtils.error(log, " job executor result exception. [{}]", result, e);
} finally {
getContext().stop(getSelf());
}
}).build();
}

View File

@ -51,7 +51,8 @@ public class JobTaskPrepareActor extends AbstractActor {
private void doPrepare(JobTaskPrepareDTO prepare) {
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper
.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getJobId, prepare.getJobId())
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE));

View File

@ -29,10 +29,8 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.time.LocalDateTime;
import java.util.*;
/**
@ -72,7 +70,7 @@ public class ScanJobTaskActor extends AbstractActor {
}
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
this::processJobPartitionTasks, 0);
this::processJobPartitionTasks, 0);
log.info("job scan end. total:[{}]", total);
}
@ -81,8 +79,9 @@ public class ScanJobTaskActor extends AbstractActor {
List<Job> waitUpdateJobs = new ArrayList<>();
List<JobTaskPrepareDTO> waitExecJobs = new ArrayList<>();
long now = DateUtils.toNowMilli();
for (PartitionTask partitionTask : partitionTasks) {
processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs);
processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs, now);
}
// 批量更新
@ -96,9 +95,8 @@ public class ScanJobTaskActor extends AbstractActor {
}
private void processJob(JobPartitionTask partitionTask, final List<Job> waitUpdateJobs,
final List<JobTaskPrepareDTO> waitExecJobs) {
final List<JobTaskPrepareDTO> waitExecJobs, long now) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
Job job = new Job();
job.setId(partitionTask.getId());
@ -107,14 +105,13 @@ public class ScanJobTaskActor extends AbstractActor {
Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
if (needCalculateNextTriggerTime(partitionTask)) {
// 更新下次触发时间
nextTriggerAt = calculateNextTriggerTime(partitionTask);
nextTriggerAt = calculateNextTriggerTime(partitionTask, now);
} else {
// 若常驻任务的缓存时间为空则触发一次任务调度说明常驻任务长时间未更新或者是系统刚刚启动
triggerTask = Objects.isNull(nextTriggerAt);
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
long now = System.currentTimeMillis();
if (Objects.isNull(nextTriggerAt)
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
nextTriggerAt = now;
}
}
@ -124,7 +121,7 @@ public class ScanJobTaskActor extends AbstractActor {
waitUpdateJobs.add(job);
if (triggerTask) {
waitExecJobs.add(jobTaskPrepare);
waitExecJobs.add(JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask));
}
}
@ -136,12 +133,12 @@ public class ScanJobTaskActor extends AbstractActor {
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
}
private Long calculateNextTriggerTime(JobPartitionTask partitionTask) {
private Long calculateNextTriggerTime(JobPartitionTask partitionTask, long now) {
long now = System.currentTimeMillis();
long nextTriggerAt = partitionTask.getNextTriggerAt();
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
nextTriggerAt = now;
partitionTask.setNextTriggerAt(nextTriggerAt);
}
// 更新下次触发时间
@ -158,13 +155,17 @@ public class ScanJobTaskActor extends AbstractActor {
return Collections.emptyList();
}
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, systemProperties.getJobPullPageSize()),
new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
.ge(Job::getId, startId)
List<Job> jobs = jobMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
new LambdaQueryWrapper<Job>()
.select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType,
Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident,
Job::getId)
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
.ge(Job::getId, startId)
.orderByAsc(Job::getId)
).getRecords();
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);

View File

@ -41,7 +41,7 @@ import java.util.Objects;
@Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class RealJobExecutorActor extends AbstractActor {
public class RequestClientActor extends AbstractActor {
@Autowired
private JobTaskMapper jobTaskMapper;

View File

@ -36,8 +36,6 @@ import java.time.LocalDateTime;
@Slf4j
public class FinishActor extends AbstractActor {
public static final String BEAN_NAME = "FinishActor";
@Autowired
private AccessTemplate accessTemplate;
@Autowired
@ -57,7 +55,6 @@ public class FinishActor extends AbstractActor {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
retryTask.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.updateById(retryTask.getGroupName(), retryTask),

View File

@ -118,7 +118,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
}, lastId);
log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]",
groupName, lastId, total, preCostTime().get(), count);
groupName, lastId, preCostTime().get(), total, count.get());
}

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.retry.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
@ -17,9 +18,11 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -36,6 +39,7 @@ import java.util.concurrent.TimeUnit;
* @since 1.5.0
*/
@Component
@Slf4j
public class CallbackRetryTaskHandler {
private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}";
@ -75,9 +79,15 @@ public class CallbackRetryTaskHandler {
callbackRetryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
() -> new EasyRetryServerException("failed to report data"));
try {
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
() -> new EasyRetryServerException("failed to report data"));
} catch (DuplicateKeyException e) {
log.warn("回调数据重复新增. [{}]", JsonUtil.toJsonString(retryTask));
return;
}
// 初始化回调日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask);

View File

@ -13,6 +13,7 @@ import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author: www.byteblogs.com
@ -38,6 +39,9 @@ public class CallbackTimerTask extends AbstractTimerTask {
.eq(RetryTask::getGroupName, context.getGroupName())
.eq(RetryTask::getUniqueId, context.getUniqueId())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
if (Objects.isNull(retryTask)) {
return;
}
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
taskExecutor.actuator(retryTask);
}

View File

@ -13,6 +13,8 @@ import io.netty.util.Timeout;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:09
@ -37,6 +39,9 @@ public class RetryTimerTask extends AbstractTimerTask {
.eq(RetryTask::getGroupName, context.getGroupName())
.eq(RetryTask::getUniqueId, context.getUniqueId())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
if (Objects.isNull(retryTask)) {
return;
}
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
taskExecutor.actuator(retryTask);
}

View File

@ -67,6 +67,14 @@ public class ConsumerBucketActor extends AbstractActor {
return;
}
if (SystemModeEnum.isJob(systemProperties.getMode())) {
// 扫描回调数据
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
}
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
List<GroupConfig> groupConfigs = null;
try {
@ -93,14 +101,6 @@ public class ConsumerBucketActor extends AbstractActor {
}
}
if (SystemModeEnum.isJob(systemProperties.getMode())) {
// 扫描回调数据
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
}
}
/**

View File

@ -0,0 +1,103 @@
akka {
actor {
common-log-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
common-scan-task-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
netty-receive-request-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 16
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
retry-task-executor-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 32
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
retry-task-executor-result-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
job-task-prepare-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 32
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
job-task-executor-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 32
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
job-task-executor-call-client-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 32
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
job-task-executor-result-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-factor = 2.0
core-pool-size-max = 64
}
throughput = 10
}
}
}