feat:2.4.0
1. 支持多数据源 2.不同的dispatcher线程池隔离
This commit is contained in:
parent
ba014bbacb
commit
28cba34986
@ -8,9 +8,9 @@ CREATE TABLE group_config
|
|||||||
group_status SMALLINT NOT NULL DEFAULT 0,
|
group_status SMALLINT NOT NULL DEFAULT 0,
|
||||||
version INT NOT NULL,
|
version INT NOT NULL,
|
||||||
group_partition INT NOT NULL,
|
group_partition INT NOT NULL,
|
||||||
route_key SMALLINT NOT NULL,
|
|
||||||
id_generator_mode SMALLINT NOT NULL DEFAULT 1,
|
id_generator_mode SMALLINT NOT NULL DEFAULT 1,
|
||||||
init_scene SMALLINT NOT NULL DEFAULT 0,
|
init_scene SMALLINT NOT NULL DEFAULT 0,
|
||||||
|
bucket_index INT NOT NULL DEFAULT 0,
|
||||||
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||||
);
|
);
|
||||||
@ -106,6 +106,7 @@ CREATE TABLE retry_task_0
|
|||||||
retry_count INT NOT NULL DEFAULT 0,
|
retry_count INT NOT NULL DEFAULT 0,
|
||||||
retry_status SMALLINT NOT NULL DEFAULT 0,
|
retry_status SMALLINT NOT NULL DEFAULT 0,
|
||||||
task_type SMALLINT NOT NULL DEFAULT 1,
|
task_type SMALLINT NOT NULL DEFAULT 1,
|
||||||
|
route_key SMALLINT NOT NULL,
|
||||||
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||||
);
|
);
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
package com.aizuda.easy.retry.client.job.core.cache;
|
package com.aizuda.easy.retry.client.job.core.cache;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.text.MessageFormat;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
@ -16,14 +19,20 @@ import java.util.function.Supplier;
|
|||||||
* @since : 2.4.0
|
* @since : 2.4.0
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
|
@Slf4j
|
||||||
public class ThreadPoolCache {
|
public class ThreadPoolCache {
|
||||||
private static final ConcurrentHashMap<Long, ThreadPoolExecutor> CACHE_THREAD_POOL = new ConcurrentHashMap<>();
|
private static final ConcurrentHashMap<Long, ThreadPoolExecutor> CACHE_THREAD_POOL = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public static ThreadPoolExecutor createThreadPool(Long taskBatchId, int parallelNum) {
|
public static ThreadPoolExecutor createThreadPool(Long taskBatchId, int parallelNum) {
|
||||||
|
if (CACHE_THREAD_POOL.containsKey(taskBatchId)) {
|
||||||
|
return CACHE_THREAD_POOL.get(taskBatchId);
|
||||||
|
}
|
||||||
|
|
||||||
Supplier<ThreadPoolExecutor> supplier = () -> {
|
Supplier<ThreadPoolExecutor> supplier = () -> {
|
||||||
|
|
||||||
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
|
||||||
parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
|
||||||
|
new CustomizableThreadFactory(MessageFormat.format("easy-retry-job-{0}-", taskBatchId)));
|
||||||
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||||
return threadPoolExecutor;
|
return threadPoolExecutor;
|
||||||
};
|
};
|
||||||
|
@ -13,4 +13,6 @@ public class JobArgs {
|
|||||||
private String argsStr;
|
private String argsStr;
|
||||||
|
|
||||||
private String executorInfo;
|
private String executorInfo;
|
||||||
|
|
||||||
|
private Long taskBatchId;
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @date : 2023-09-27 09:48
|
* @date : 2023-09-27 09:48
|
||||||
* @since 2.4.0
|
* @since 2.4.0
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public abstract class AbstractJobExecutor implements IJobExecutor {
|
public abstract class AbstractJobExecutor implements IJobExecutor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -58,6 +60,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
|
|||||||
JobArgs jobArgs = new JobArgs();
|
JobArgs jobArgs = new JobArgs();
|
||||||
jobArgs.setArgsStr(jobContext.getArgsStr());
|
jobArgs.setArgsStr(jobContext.getArgsStr());
|
||||||
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
|
||||||
|
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
|
||||||
return jobArgs;
|
return jobArgs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
|||||||
@Override
|
@Override
|
||||||
public void onSuccess(ExecuteResult result) {
|
public void onSuccess(ExecuteResult result) {
|
||||||
// 上报执行成功
|
// 上报执行成功
|
||||||
log.info("任务执行成功 [{}]", JsonUtil.toJsonString(result));
|
log.warn("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result));
|
||||||
|
|
||||||
if (Objects.isNull(result)) {
|
if (Objects.isNull(result)) {
|
||||||
result = ExecuteResult.success();
|
result = ExecuteResult.success();
|
||||||
@ -70,7 +70,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
|||||||
@Override
|
@Override
|
||||||
public void onFailure(final Throwable t) {
|
public void onFailure(final Throwable t) {
|
||||||
// 上报执行失败
|
// 上报执行失败
|
||||||
log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t);
|
log.error("任务执行失败 任务执行成功 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
|
||||||
try {
|
try {
|
||||||
|
|
||||||
ExecuteResult failure = ExecuteResult.failure();
|
ExecuteResult failure = ExecuteResult.failure();
|
||||||
|
@ -0,0 +1,17 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage">
|
||||||
|
<id column="id" property="id" />
|
||||||
|
<result column="group_name" property="groupName" />
|
||||||
|
<result column="job_id" property="jobId" />
|
||||||
|
<result column="task_id" property="taskId" />
|
||||||
|
<result column="client_address" property="clientAddress"/>
|
||||||
|
<result column="task_batch_id" property="taskBatchId" />
|
||||||
|
<result column="create_dt" property="createDt" />
|
||||||
|
<result column="message" property="message" />
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
</mapper>
|
@ -0,0 +1,43 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.Job">
|
||||||
|
<id column="id" property="id"/>
|
||||||
|
<result column="group_name" property="groupName"/>
|
||||||
|
<result column="job_name" property="jobName"/>
|
||||||
|
<result column="args_str" property="argsStr"/>
|
||||||
|
<result column="args_type" property="argsType"/>
|
||||||
|
<result column="ext_attrs" property="extAttrs"/>
|
||||||
|
<result column="next_trigger_at" property="nextTriggerAt"/>
|
||||||
|
<result column="job_status" property="jobStatus"/>
|
||||||
|
<result column="route_key" property="routeKey"/>
|
||||||
|
<result column="executor_type" property="executorType"/>
|
||||||
|
<result column="executor_info" property="executorInfo"/>
|
||||||
|
<result column="block_strategy" property="blockStrategy"/>
|
||||||
|
<result column="executor_timeout" property="executorTimeout"/>
|
||||||
|
<result column="max_retry_times" property="maxRetryTimes"/>
|
||||||
|
<result column="retry_interval" property="retryInterval"/>
|
||||||
|
<result column="bucket_index" property="bucketIndex"/>
|
||||||
|
<result column="description" property="description"/>
|
||||||
|
<result column="create_dt" property="createDt"/>
|
||||||
|
<result column="update_dt" property="updateDt"/>
|
||||||
|
<result column="deleted" property="deleted"/>
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
|
||||||
|
update job rt,
|
||||||
|
(
|
||||||
|
<foreach collection="list" item="item" index="index" separator=" union all ">
|
||||||
|
select
|
||||||
|
#{item.nextTriggerAt} as next_trigger_at,
|
||||||
|
#{item.id} as id
|
||||||
|
</foreach>
|
||||||
|
) tt
|
||||||
|
set
|
||||||
|
rt.next_trigger_at = tt.next_trigger_at
|
||||||
|
where rt.id = tt.id
|
||||||
|
</update>
|
||||||
|
|
||||||
|
</mapper>
|
@ -0,0 +1,38 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch">
|
||||||
|
<id column="id" property="id" />
|
||||||
|
<result column="group_name" property="groupName" />
|
||||||
|
<result column="job_id" property="jobId" />
|
||||||
|
<result column="task_batch_status" property="taskBatchStatus" />
|
||||||
|
<result column="create_dt" property="createDt" />
|
||||||
|
<result column="update_dt" property="updateDt" />
|
||||||
|
<result column="deleted" property="deleted" />
|
||||||
|
</resultMap>
|
||||||
|
<select id="selectJobBatchList"
|
||||||
|
parameterType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO"
|
||||||
|
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO">
|
||||||
|
SELECT a.*, b.job_name, b.task_type, b.block_strategy, b.trigger_type
|
||||||
|
FROM job_task_batch a join job b on a.job_id = b.id
|
||||||
|
<where>
|
||||||
|
<if test="queryDO.jobId != null">
|
||||||
|
and job_id = #{queryDO.jobId}
|
||||||
|
</if>
|
||||||
|
<if test="queryDO.groupName != null">
|
||||||
|
and a.group_name = #{queryDO.groupName}
|
||||||
|
</if>
|
||||||
|
<if test="queryDO.taskBatchStatus != null">
|
||||||
|
and task_batch_status = #{queryDO.taskBatchStatus}
|
||||||
|
</if>
|
||||||
|
<if test="queryDO.jobName != null">
|
||||||
|
and job_name like #{queryDO.jobName}
|
||||||
|
</if>
|
||||||
|
and a.deleted = 0
|
||||||
|
order by a.id desc
|
||||||
|
</where>
|
||||||
|
|
||||||
|
</select>
|
||||||
|
</mapper>
|
@ -0,0 +1,18 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTask">
|
||||||
|
<id column="id" property="id" />
|
||||||
|
<result column="group_name" property="groupName" />
|
||||||
|
<result column="job_id" property="jobId" />
|
||||||
|
<result column="task_batch_id" property="taskBatchId" />
|
||||||
|
<result column="parent_id" property="parentId" />
|
||||||
|
<result column="task_status" property="taskStatus" />
|
||||||
|
<result column="result_message" property="resultMessage" />
|
||||||
|
<result column="create_dt" property="createDt" />
|
||||||
|
<result column="update_dt" property="updateDt" />
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
</mapper>
|
@ -0,0 +1,17 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage">
|
||||||
|
<id column="id" property="id" />
|
||||||
|
<result column="group_name" property="groupName" />
|
||||||
|
<result column="job_id" property="jobId" />
|
||||||
|
<result column="task_id" property="taskId" />
|
||||||
|
<result column="client_address" property="clientAddress"/>
|
||||||
|
<result column="task_batch_id" property="taskBatchId" />
|
||||||
|
<result column="create_dt" property="createDt" />
|
||||||
|
<result column="message" property="message" />
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
</mapper>
|
@ -0,0 +1,43 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.Job">
|
||||||
|
<id column="id" property="id"/>
|
||||||
|
<result column="group_name" property="groupName"/>
|
||||||
|
<result column="job_name" property="jobName"/>
|
||||||
|
<result column="args_str" property="argsStr"/>
|
||||||
|
<result column="args_type" property="argsType"/>
|
||||||
|
<result column="ext_attrs" property="extAttrs"/>
|
||||||
|
<result column="next_trigger_at" property="nextTriggerAt"/>
|
||||||
|
<result column="job_status" property="jobStatus"/>
|
||||||
|
<result column="route_key" property="routeKey"/>
|
||||||
|
<result column="executor_type" property="executorType"/>
|
||||||
|
<result column="executor_info" property="executorInfo"/>
|
||||||
|
<result column="block_strategy" property="blockStrategy"/>
|
||||||
|
<result column="executor_timeout" property="executorTimeout"/>
|
||||||
|
<result column="max_retry_times" property="maxRetryTimes"/>
|
||||||
|
<result column="retry_interval" property="retryInterval"/>
|
||||||
|
<result column="bucket_index" property="bucketIndex"/>
|
||||||
|
<result column="description" property="description"/>
|
||||||
|
<result column="create_dt" property="createDt"/>
|
||||||
|
<result column="update_dt" property="updateDt"/>
|
||||||
|
<result column="deleted" property="deleted"/>
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
|
||||||
|
update job rt,
|
||||||
|
(
|
||||||
|
<foreach collection="list" item="item" index="index" separator=" union all ">
|
||||||
|
select
|
||||||
|
#{item.nextTriggerAt} as next_trigger_at,
|
||||||
|
#{item.id} as id
|
||||||
|
</foreach>
|
||||||
|
) tt
|
||||||
|
set
|
||||||
|
rt.next_trigger_at = tt.next_trigger_at
|
||||||
|
where rt.id = tt.id
|
||||||
|
</update>
|
||||||
|
|
||||||
|
</mapper>
|
@ -0,0 +1,38 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch">
|
||||||
|
<id column="id" property="id" />
|
||||||
|
<result column="group_name" property="groupName" />
|
||||||
|
<result column="job_id" property="jobId" />
|
||||||
|
<result column="task_batch_status" property="taskBatchStatus" />
|
||||||
|
<result column="create_dt" property="createDt" />
|
||||||
|
<result column="update_dt" property="updateDt" />
|
||||||
|
<result column="deleted" property="deleted" />
|
||||||
|
</resultMap>
|
||||||
|
<select id="selectJobBatchList"
|
||||||
|
parameterType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO"
|
||||||
|
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO">
|
||||||
|
SELECT a.*, b.job_name, b.task_type, b.block_strategy, b.trigger_type
|
||||||
|
FROM job_task_batch a join job b on a.job_id = b.id
|
||||||
|
<where>
|
||||||
|
<if test="queryDO.jobId != null">
|
||||||
|
and job_id = #{queryDO.jobId}
|
||||||
|
</if>
|
||||||
|
<if test="queryDO.groupName != null">
|
||||||
|
and a.group_name = #{queryDO.groupName}
|
||||||
|
</if>
|
||||||
|
<if test="queryDO.taskBatchStatus != null">
|
||||||
|
and task_batch_status = #{queryDO.taskBatchStatus}
|
||||||
|
</if>
|
||||||
|
<if test="queryDO.jobName != null">
|
||||||
|
and job_name like #{queryDO.jobName}
|
||||||
|
</if>
|
||||||
|
and a.deleted = 0
|
||||||
|
order by a.id desc
|
||||||
|
</where>
|
||||||
|
|
||||||
|
</select>
|
||||||
|
</mapper>
|
@ -0,0 +1,18 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper">
|
||||||
|
|
||||||
|
<!-- 通用查询映射结果 -->
|
||||||
|
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.JobTask">
|
||||||
|
<id column="id" property="id" />
|
||||||
|
<result column="group_name" property="groupName" />
|
||||||
|
<result column="job_id" property="jobId" />
|
||||||
|
<result column="task_batch_id" property="taskBatchId" />
|
||||||
|
<result column="parent_id" property="parentId" />
|
||||||
|
<result column="task_status" property="taskStatus" />
|
||||||
|
<result column="result_message" property="resultMessage" />
|
||||||
|
<result column="create_dt" property="createDt" />
|
||||||
|
<result column="update_dt" property="updateDt" />
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
</mapper>
|
@ -13,18 +13,32 @@ import com.aizuda.easy.retry.common.core.context.SpringContext;
|
|||||||
*/
|
*/
|
||||||
public class ActorGenerator {
|
public class ActorGenerator {
|
||||||
|
|
||||||
|
/*----------------------------------------系统通用配置 START----------------------------------------*/
|
||||||
|
|
||||||
|
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
|
||||||
|
public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor";
|
||||||
|
private static final String COMMON_LOG_DISPATCHER = "akka.actor.common-log-dispatcher";
|
||||||
|
private static final String COMMON_SCAN_TASK_DISPATCHER = "akka.actor.common-scan-task-dispatcher";
|
||||||
|
private static final String NETTY_RECEIVE_REQUEST_DISPATCHER = "akka.actor.netty-receive-request-dispatcher";
|
||||||
|
|
||||||
|
/*----------------------------------------系统通用配置 END----------------------------------------*/
|
||||||
|
|
||||||
|
/*----------------------------------------分布式重试任务 START----------------------------------------*/
|
||||||
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
|
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
|
||||||
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
|
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
|
||||||
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
|
|
||||||
public static final String FINISH_ACTOR = "FinishActor";
|
public static final String FINISH_ACTOR = "FinishActor";
|
||||||
public static final String FAILURE_ACTOR = "FailureActor";
|
public static final String FAILURE_ACTOR = "FailureActor";
|
||||||
public static final String NO_RETRY_ACTOR = "NoRetryActor";
|
public static final String NO_RETRY_ACTOR = "NoRetryActor";
|
||||||
public static final String EXEC_CALLBACK_UNIT_ACTOR = "ExecCallbackUnitActor";
|
public static final String EXEC_CALLBACK_UNIT_ACTOR = "ExecCallbackUnitActor";
|
||||||
public static final String EXEC_UNIT_ACTOR = "ExecUnitActor";
|
public static final String EXEC_UNIT_ACTOR = "ExecUnitActor";
|
||||||
public static final String LOG_ACTOR = "LogActor";
|
public static final String LOG_ACTOR = "LogActor";
|
||||||
public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor";
|
|
||||||
|
|
||||||
/*----------------------------------------分布式任务调度----------------------------------------*/
|
private static final String RETRY_TASK_EXECUTOR_DISPATCHER = "akka.actor.retry-task-executor-dispatcher";
|
||||||
|
private static final String RETRY_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.retry-task-executor-result-dispatcher";
|
||||||
|
|
||||||
|
/*----------------------------------------分布式重试任务 END----------------------------------------*/
|
||||||
|
|
||||||
|
/*----------------------------------------分布式任务调度 START----------------------------------------*/
|
||||||
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
|
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
|
||||||
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
|
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
|
||||||
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
|
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
|
||||||
@ -33,6 +47,14 @@ public class ActorGenerator {
|
|||||||
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
|
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
|
||||||
public static final String REAL_STOP_TASK_INSTANCE_ACTOR = "RealStopTaskInstanceActor";
|
public static final String REAL_STOP_TASK_INSTANCE_ACTOR = "RealStopTaskInstanceActor";
|
||||||
|
|
||||||
|
/*----------------------------------------dispatcher----------------------------------------*/
|
||||||
|
private static final String JOB_TASK_DISPATCHER = "akka.actor.job-task-prepare-dispatcher";
|
||||||
|
private static final String JOB_TASK_EXECUTOR_DISPATCHER = "akka.actor.job-task-executor-dispatcher";
|
||||||
|
private static final String JOB_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.job-task-executor-result-dispatcher";
|
||||||
|
private static final String JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER = "akka.actor.job-task-executor-call-client-dispatcher";
|
||||||
|
|
||||||
|
/*----------------------------------------分布式任务调度 END----------------------------------------*/
|
||||||
|
|
||||||
private ActorGenerator() {}
|
private ActorGenerator() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -41,7 +63,7 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef finishActor() {
|
public static ActorRef finishActor() {
|
||||||
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR));
|
return getRetryActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -50,7 +72,7 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef failureActor() {
|
public static ActorRef failureActor() {
|
||||||
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR));
|
return getRetryActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -59,7 +81,7 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef noRetryActor() {
|
public static ActorRef noRetryActor() {
|
||||||
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NO_RETRY_ACTOR));
|
return getRetryActorSystem().actorOf(getSpringExtension().props(NO_RETRY_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -68,7 +90,7 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef execCallbackUnitActor() {
|
public static ActorRef execCallbackUnitActor() {
|
||||||
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR));
|
return getRetryActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -77,7 +99,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef execUnitActor() {
|
public static ActorRef execUnitActor() {
|
||||||
return getDispatchExecUnitActorSystem().actorOf(getSpringExtension().props(EXEC_UNIT_ACTOR));
|
return getRetryActorSystem().actorOf(getSpringExtension()
|
||||||
|
.props(EXEC_UNIT_ACTOR)
|
||||||
|
.withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -86,7 +110,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef scanGroupActor() {
|
public static ActorRef scanGroupActor() {
|
||||||
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_RETRY_GROUP_ACTOR));
|
return getCommonActorSystemSystem().actorOf(getSpringExtension()
|
||||||
|
.props(SCAN_RETRY_GROUP_ACTOR)
|
||||||
|
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -95,7 +121,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef scanCallbackGroupActor() {
|
public static ActorRef scanCallbackGroupActor() {
|
||||||
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR));
|
return getRetryActorSystem().actorOf(getSpringExtension()
|
||||||
|
.props(SCAN_CALLBACK_GROUP_ACTOR)
|
||||||
|
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -104,7 +132,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef scanJobActor() {
|
public static ActorRef scanJobActor() {
|
||||||
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_JOB_ACTOR));
|
return getCommonActorSystemSystem().actorOf(getSpringExtension()
|
||||||
|
.props(SCAN_JOB_ACTOR)
|
||||||
|
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -113,7 +143,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef scanBucketActor() {
|
public static ActorRef scanBucketActor() {
|
||||||
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_BUCKET_ACTOR));
|
return getCommonActorSystemSystem().actorOf(getSpringExtension()
|
||||||
|
.props(SCAN_BUCKET_ACTOR)
|
||||||
|
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -122,7 +154,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef logActor() {
|
public static ActorRef logActor() {
|
||||||
return getLogActorSystemSystem().actorOf(getSpringExtension().props(LOG_ACTOR));
|
return getCommonActorSystemSystem().actorOf(getSpringExtension()
|
||||||
|
.props(LOG_ACTOR)
|
||||||
|
.withDispatcher(COMMON_LOG_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -131,7 +165,8 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef requestHandlerActor() {
|
public static ActorRef requestHandlerActor() {
|
||||||
return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR));
|
return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR)
|
||||||
|
.withDispatcher(NETTY_RECEIVE_REQUEST_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -141,7 +176,8 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef jobTaskPrepareActor() {
|
public static ActorRef jobTaskPrepareActor() {
|
||||||
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR));
|
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -150,7 +186,11 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef jobTaskExecutorActor() {
|
public static ActorRef jobTaskExecutorActor() {
|
||||||
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_ACTOR));
|
return getJobActorSystem()
|
||||||
|
.actorOf(getSpringExtension()
|
||||||
|
.props(JOB_EXECUTOR_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -159,7 +199,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef jobTaskExecutorResultActor() {
|
public static ActorRef jobTaskExecutorResultActor() {
|
||||||
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_RESULT_ACTOR));
|
return getJobActorSystem().actorOf(getSpringExtension()
|
||||||
|
.props(JOB_EXECUTOR_RESULT_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_EXECUTOR_RESULT_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -168,7 +210,9 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef jobRealTaskExecutorActor() {
|
public static ActorRef jobRealTaskExecutorActor() {
|
||||||
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_JOB_EXECUTOR_ACTOR));
|
return getJobActorSystem().actorOf(getSpringExtension()
|
||||||
|
.props(REAL_JOB_EXECUTOR_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -177,7 +221,8 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef jobRealStopTaskInstanceActor() {
|
public static ActorRef jobRealStopTaskInstanceActor() {
|
||||||
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR));
|
return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -186,35 +231,20 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef jobLogActor() {
|
public static ActorRef jobLogActor() {
|
||||||
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR));
|
return getCommonActorSystemSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR)
|
||||||
|
.withDispatcher(COMMON_LOG_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SpringExtension getSpringExtension() {
|
public static SpringExtension getSpringExtension() {
|
||||||
return SpringContext.getBeanByType(SpringExtension.class);
|
return SpringContext.getBeanByType(SpringExtension.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 重试任务提取器
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public static ActorSystem getDispatchRetryActorSystem() {
|
|
||||||
return SpringContext.getBean("dispatchRetryActorSystem", ActorSystem.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 重试任务分发器
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public static ActorSystem getDispatchExecUnitActorSystem() {
|
|
||||||
return SpringContext.getBean("dispatchExecUnitActorSystem", ActorSystem.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 重试任务结果分发器
|
* 重试任务结果分发器
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static ActorSystem getDispatchResultActorSystem() {
|
public static ActorSystem getRetryActorSystem() {
|
||||||
return SpringContext.getBean("dispatchResultActorSystem", ActorSystem.class);
|
return SpringContext.getBean("retryActorSystem", ActorSystem.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -222,8 +252,8 @@ public class ActorGenerator {
|
|||||||
*
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static ActorSystem getLogActorSystemSystem() {
|
public static ActorSystem getCommonActorSystemSystem() {
|
||||||
return SpringContext.getBean("logActorSystem", ActorSystem.class);
|
return SpringContext.getBean("commonActorSystem", ActorSystem.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -245,5 +275,4 @@ public class ActorGenerator {
|
|||||||
return SpringContext.getBean("jobActorSystem", ActorSystem.class);
|
return SpringContext.getBean("jobActorSystem", ActorSystem.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package com.aizuda.easy.retry.server.common.akka;
|
package com.aizuda.easy.retry.server.common.akka;
|
||||||
|
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
@ -15,63 +16,26 @@ import org.springframework.context.annotation.Configuration;
|
|||||||
@Configuration
|
@Configuration
|
||||||
public class AkkaConfiguration {
|
public class AkkaConfiguration {
|
||||||
|
|
||||||
private static final String DISPATCH_RETRY_ACTOR_SYSTEM = "DISPATCH_RETRY_ACTOR_SYSTEM";
|
private static final String CONFIG_NAME = "easyretry";
|
||||||
private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM";
|
|
||||||
private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM";
|
|
||||||
private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM";
|
|
||||||
private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM";
|
private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM";
|
||||||
private static final String JOB_ACTOR_SYSTEM = "JOB_ACTOR_SYSTEM";
|
private static final String JOB_ACTOR_SYSTEM = "JOB_ACTOR_SYSTEM";
|
||||||
|
private static final String RETRY_ACTOR_SYSTEM = "RETRY_ACTOR_SYSTEM";
|
||||||
|
private static final String COMMON_ACTOR_SYSTEM = "COMMON_ACTOR_SYSTEM";
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
@Autowired
|
@Autowired
|
||||||
private SpringExtension springExtension;
|
private SpringExtension springExtension;
|
||||||
|
|
||||||
/**
|
|
||||||
* 重试分发ActorSystem
|
|
||||||
*
|
|
||||||
* @return {@link ActorSystem} 顶级actor
|
|
||||||
*/
|
|
||||||
@Bean("dispatchRetryActorSystem")
|
|
||||||
public ActorSystem createDispatchRetryActorSystem() {
|
|
||||||
ActorSystem system = ActorSystem.create(DISPATCH_RETRY_ACTOR_SYSTEM);
|
|
||||||
springExtension.initialize(applicationContext);
|
|
||||||
return system;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 重试分发执行ActorSystem
|
|
||||||
*
|
|
||||||
* @return {@link ActorSystem} 顶级actor
|
|
||||||
*/
|
|
||||||
@Bean("dispatchExecUnitActorSystem")
|
|
||||||
public ActorSystem createDispatchExecUnitRetryActorSystem() {
|
|
||||||
ActorSystem system = ActorSystem.create(DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM);
|
|
||||||
springExtension.initialize(applicationContext);
|
|
||||||
return system;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 重试分发执行结果处理ActorSystem
|
|
||||||
*
|
|
||||||
* @return {@link ActorSystem} 顶级actor
|
|
||||||
*/
|
|
||||||
@Bean("dispatchResultActorSystem")
|
|
||||||
public ActorSystem createDispatchResultRetryActorSystem() {
|
|
||||||
ActorSystem system = ActorSystem.create(DISPATCH_RESULT_ACTOR_SYSTEM);
|
|
||||||
springExtension.initialize(applicationContext);
|
|
||||||
return system;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 日志处理
|
* 日志处理
|
||||||
*
|
*
|
||||||
* @return {@link ActorSystem} 顶级actor
|
* @return {@link ActorSystem} 顶级actor
|
||||||
*/
|
*/
|
||||||
@Bean("logActorSystem")
|
@Bean("commonActorSystem")
|
||||||
public ActorSystem createLogActorSystem() {
|
public ActorSystem createLogActorSystem() {
|
||||||
ActorSystem system = ActorSystem.create(LOG_ACTOR_SYSTEM);
|
ActorSystem system = ActorSystem.create(COMMON_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
|
||||||
springExtension.initialize(applicationContext);
|
springExtension.initialize(applicationContext);
|
||||||
return system;
|
return system;
|
||||||
}
|
}
|
||||||
@ -83,11 +47,24 @@ public class AkkaConfiguration {
|
|||||||
*/
|
*/
|
||||||
@Bean("nettyActorSystem")
|
@Bean("nettyActorSystem")
|
||||||
public ActorSystem nettyActorSystem() {
|
public ActorSystem nettyActorSystem() {
|
||||||
ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM);
|
ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
|
||||||
springExtension.initialize(applicationContext);
|
springExtension.initialize(applicationContext);
|
||||||
return system;
|
return system;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理retry调度
|
||||||
|
*
|
||||||
|
* @return {@link ActorSystem} 顶级actor
|
||||||
|
*/
|
||||||
|
@Bean("retryActorSystem")
|
||||||
|
public ActorSystem retryActorSystem() {
|
||||||
|
ActorSystem system = ActorSystem.create(RETRY_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
|
||||||
|
springExtension.initialize(applicationContext);
|
||||||
|
return system;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理job调度
|
* 处理job调度
|
||||||
*
|
*
|
||||||
@ -95,8 +72,9 @@ public class AkkaConfiguration {
|
|||||||
*/
|
*/
|
||||||
@Bean("jobActorSystem")
|
@Bean("jobActorSystem")
|
||||||
public ActorSystem jobActorSystem() {
|
public ActorSystem jobActorSystem() {
|
||||||
ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM);
|
ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME));
|
||||||
springExtension.initialize(applicationContext);
|
springExtension.initialize(applicationContext);
|
||||||
return system;
|
return system;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -19,11 +19,6 @@ public class JobPartitionTask extends PartitionTask {
|
|||||||
*/
|
*/
|
||||||
private String groupName;
|
private String groupName;
|
||||||
|
|
||||||
/**
|
|
||||||
* 名称
|
|
||||||
*/
|
|
||||||
private String jobName;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 下次触发时间
|
* 下次触发时间
|
||||||
*/
|
*/
|
||||||
|
@ -19,11 +19,6 @@ public class JobTaskPrepareDTO {
|
|||||||
*/
|
*/
|
||||||
private String groupName;
|
private String groupName;
|
||||||
|
|
||||||
/**
|
|
||||||
* 名称
|
|
||||||
*/
|
|
||||||
private String jobName;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 下次触发时间
|
* 下次触发时间
|
||||||
*/
|
*/
|
||||||
|
@ -6,6 +6,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
|||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
|
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
|
||||||
@ -38,6 +39,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
|
|||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -61,6 +63,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
|
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
|
||||||
try {
|
try {
|
||||||
|
log.info("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute));
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||||
@ -112,6 +115,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
context.setJobId(job.getId());
|
context.setJobId(job.getId());
|
||||||
jobExecutor.execute(context);
|
jobExecutor.execute(context);
|
||||||
} finally {
|
} finally {
|
||||||
|
log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
|
||||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||||
@Override
|
@Override
|
||||||
public void afterCompletion(int status) {
|
public void afterCompletion(int status) {
|
||||||
|
@ -4,6 +4,7 @@ import akka.actor.AbstractActor;
|
|||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
|
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
@ -49,7 +50,7 @@ public class JobExecutorResultActor extends AbstractActor {
|
|||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
|
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
|
||||||
log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
|
log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
|
||||||
|
try {
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||||
@ -84,6 +85,11 @@ public class JobExecutorResultActor extends AbstractActor {
|
|||||||
jobLogDTO.setTaskId(result.getTaskId());
|
jobLogDTO.setTaskId(result.getTaskId());
|
||||||
ActorRef actorRef = ActorGenerator.jobLogActor();
|
ActorRef actorRef = ActorGenerator.jobLogActor();
|
||||||
actorRef.tell(jobLogDTO, actorRef);
|
actorRef.tell(jobLogDTO, actorRef);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, " job executor result exception. [{}]", result, e);
|
||||||
|
} finally {
|
||||||
|
getContext().stop(getSelf());
|
||||||
|
}
|
||||||
|
|
||||||
}).build();
|
}).build();
|
||||||
}
|
}
|
||||||
|
@ -51,7 +51,8 @@ public class JobTaskPrepareActor extends AbstractActor {
|
|||||||
|
|
||||||
private void doPrepare(JobTaskPrepareDTO prepare) {
|
private void doPrepare(JobTaskPrepareDTO prepare) {
|
||||||
|
|
||||||
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper
|
||||||
|
.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||||
.eq(JobTaskBatch::getJobId, prepare.getJobId())
|
.eq(JobTaskBatch::getJobId, prepare.getJobId())
|
||||||
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE));
|
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE));
|
||||||
|
|
||||||
|
@ -29,10 +29,8 @@ import org.springframework.context.annotation.Scope;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.time.LocalDateTime;
|
||||||
import java.util.Collections;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -81,8 +79,9 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
|
|
||||||
List<Job> waitUpdateJobs = new ArrayList<>();
|
List<Job> waitUpdateJobs = new ArrayList<>();
|
||||||
List<JobTaskPrepareDTO> waitExecJobs = new ArrayList<>();
|
List<JobTaskPrepareDTO> waitExecJobs = new ArrayList<>();
|
||||||
|
long now = DateUtils.toNowMilli();
|
||||||
for (PartitionTask partitionTask : partitionTasks) {
|
for (PartitionTask partitionTask : partitionTasks) {
|
||||||
processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs);
|
processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 批量更新
|
// 批量更新
|
||||||
@ -96,9 +95,8 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void processJob(JobPartitionTask partitionTask, final List<Job> waitUpdateJobs,
|
private void processJob(JobPartitionTask partitionTask, final List<Job> waitUpdateJobs,
|
||||||
final List<JobTaskPrepareDTO> waitExecJobs) {
|
final List<JobTaskPrepareDTO> waitExecJobs, long now) {
|
||||||
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
|
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
|
||||||
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
|
|
||||||
|
|
||||||
Job job = new Job();
|
Job job = new Job();
|
||||||
job.setId(partitionTask.getId());
|
job.setId(partitionTask.getId());
|
||||||
@ -107,12 +105,11 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
|
Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
|
||||||
if (needCalculateNextTriggerTime(partitionTask)) {
|
if (needCalculateNextTriggerTime(partitionTask)) {
|
||||||
// 更新下次触发时间
|
// 更新下次触发时间
|
||||||
nextTriggerAt = calculateNextTriggerTime(partitionTask);
|
nextTriggerAt = calculateNextTriggerTime(partitionTask, now);
|
||||||
} else {
|
} else {
|
||||||
// 若常驻任务的缓存时间为空则触发一次任务调度,说明常驻任务长时间未更新或者是系统刚刚启动
|
// 若常驻任务的缓存时间为空则触发一次任务调度,说明常驻任务长时间未更新或者是系统刚刚启动
|
||||||
triggerTask = Objects.isNull(nextTriggerAt);
|
triggerTask = Objects.isNull(nextTriggerAt);
|
||||||
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
|
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
if (Objects.isNull(nextTriggerAt)
|
if (Objects.isNull(nextTriggerAt)
|
||||||
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||||
nextTriggerAt = now;
|
nextTriggerAt = now;
|
||||||
@ -124,7 +121,7 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
waitUpdateJobs.add(job);
|
waitUpdateJobs.add(job);
|
||||||
|
|
||||||
if (triggerTask) {
|
if (triggerTask) {
|
||||||
waitExecJobs.add(jobTaskPrepare);
|
waitExecJobs.add(JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -136,12 +133,12 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
|
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Long calculateNextTriggerTime(JobPartitionTask partitionTask) {
|
private Long calculateNextTriggerTime(JobPartitionTask partitionTask, long now) {
|
||||||
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
||||||
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||||
nextTriggerAt = now;
|
nextTriggerAt = now;
|
||||||
|
partitionTask.setNextTriggerAt(nextTriggerAt);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新下次触发时间
|
// 更新下次触发时间
|
||||||
@ -158,13 +155,17 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, systemProperties.getJobPullPageSize()),
|
List<Job> jobs = jobMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
|
||||||
new LambdaQueryWrapper<Job>()
|
new LambdaQueryWrapper<Job>()
|
||||||
|
.select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType,
|
||||||
|
Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident,
|
||||||
|
Job::getId)
|
||||||
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
|
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
|
||||||
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
|
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
|
||||||
.in(Job::getBucketIndex, scanTask.getBuckets())
|
.in(Job::getBucketIndex, scanTask.getBuckets())
|
||||||
.le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
|
.le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
|
||||||
.ge(Job::getId, startId)
|
.ge(Job::getId, startId)
|
||||||
|
.orderByAsc(Job::getId)
|
||||||
).getRecords();
|
).getRecords();
|
||||||
|
|
||||||
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
|
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
|
||||||
|
@ -41,7 +41,7 @@ import java.util.Objects;
|
|||||||
@Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR)
|
@Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR)
|
||||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RealJobExecutorActor extends AbstractActor {
|
public class RequestClientActor extends AbstractActor {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private JobTaskMapper jobTaskMapper;
|
private JobTaskMapper jobTaskMapper;
|
@ -36,8 +36,6 @@ import java.time.LocalDateTime;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class FinishActor extends AbstractActor {
|
public class FinishActor extends AbstractActor {
|
||||||
|
|
||||||
public static final String BEAN_NAME = "FinishActor";
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private AccessTemplate accessTemplate;
|
private AccessTemplate accessTemplate;
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -57,7 +55,6 @@ public class FinishActor extends AbstractActor {
|
|||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
|
|
||||||
|
|
||||||
retryTask.setUpdateDt(LocalDateTime.now());
|
retryTask.setUpdateDt(LocalDateTime.now());
|
||||||
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
||||||
.updateById(retryTask.getGroupName(), retryTask),
|
.updateById(retryTask.getGroupName(), retryTask),
|
||||||
|
@ -118,7 +118,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
}, lastId);
|
}, lastId);
|
||||||
|
|
||||||
log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]",
|
log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]",
|
||||||
groupName, lastId, total, preCostTime().get(), count);
|
groupName, lastId, preCostTime().get(), total, count.get());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.retry.task.support.handler;
|
|||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
@ -17,9 +18,11 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.slf4j.helpers.FormattingTuple;
|
import org.slf4j.helpers.FormattingTuple;
|
||||||
import org.slf4j.helpers.MessageFormatter;
|
import org.slf4j.helpers.MessageFormatter;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
@ -36,6 +39,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @since 1.5.0
|
* @since 1.5.0
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
|
@Slf4j
|
||||||
public class CallbackRetryTaskHandler {
|
public class CallbackRetryTaskHandler {
|
||||||
|
|
||||||
private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}";
|
private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}";
|
||||||
@ -75,9 +79,15 @@ public class CallbackRetryTaskHandler {
|
|||||||
|
|
||||||
callbackRetryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
|
callbackRetryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext)));
|
||||||
|
|
||||||
|
try {
|
||||||
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
||||||
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
|
.insert(callbackRetryTask.getGroupName(), callbackRetryTask),
|
||||||
() -> new EasyRetryServerException("failed to report data"));
|
() -> new EasyRetryServerException("failed to report data"));
|
||||||
|
} catch (DuplicateKeyException e) {
|
||||||
|
log.warn("回调数据重复新增. [{}]", JsonUtil.toJsonString(retryTask));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// 初始化回调日志
|
// 初始化回调日志
|
||||||
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask);
|
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask);
|
||||||
|
@ -13,6 +13,7 @@ import io.netty.util.TimerTask;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
@ -38,6 +39,9 @@ public class CallbackTimerTask extends AbstractTimerTask {
|
|||||||
.eq(RetryTask::getGroupName, context.getGroupName())
|
.eq(RetryTask::getGroupName, context.getGroupName())
|
||||||
.eq(RetryTask::getUniqueId, context.getUniqueId())
|
.eq(RetryTask::getUniqueId, context.getUniqueId())
|
||||||
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
|
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
|
||||||
|
if (Objects.isNull(retryTask)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
|
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
|
||||||
taskExecutor.actuator(retryTask);
|
taskExecutor.actuator(retryTask);
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,8 @@ import io.netty.util.Timeout;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2023-09-22 17:09
|
* @date : 2023-09-22 17:09
|
||||||
@ -37,6 +39,9 @@ public class RetryTimerTask extends AbstractTimerTask {
|
|||||||
.eq(RetryTask::getGroupName, context.getGroupName())
|
.eq(RetryTask::getGroupName, context.getGroupName())
|
||||||
.eq(RetryTask::getUniqueId, context.getUniqueId())
|
.eq(RetryTask::getUniqueId, context.getUniqueId())
|
||||||
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
|
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
|
||||||
|
if (Objects.isNull(retryTask)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
|
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
|
||||||
taskExecutor.actuator(retryTask);
|
taskExecutor.actuator(retryTask);
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,14 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (SystemModeEnum.isJob(systemProperties.getMode())) {
|
||||||
|
// 扫描回调数据
|
||||||
|
ScanTask scanTask = new ScanTask();
|
||||||
|
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||||
|
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
|
||||||
|
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
||||||
|
}
|
||||||
|
|
||||||
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
||||||
List<GroupConfig> groupConfigs = null;
|
List<GroupConfig> groupConfigs = null;
|
||||||
try {
|
try {
|
||||||
@ -93,14 +101,6 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SystemModeEnum.isJob(systemProperties.getMode())) {
|
|
||||||
// 扫描回调数据
|
|
||||||
ScanTask scanTask = new ScanTask();
|
|
||||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
|
||||||
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
|
|
||||||
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,103 @@
|
|||||||
|
akka {
|
||||||
|
actor {
|
||||||
|
common-log-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 8
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
common-scan-task-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 8
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
netty-receive-request-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 16
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
retry-task-executor-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 32
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
retry-task-executor-result-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 8
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
job-task-prepare-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 32
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
job-task-executor-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 32
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
job-task-executor-call-client-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 32
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
job-task-executor-result-dispatcher {
|
||||||
|
type = "Dispatcher"
|
||||||
|
executor = "thread-pool-executor"
|
||||||
|
thread-pool-executor {
|
||||||
|
core-pool-size-min = 8
|
||||||
|
core-pool-size-factor = 2.0
|
||||||
|
core-pool-size-max = 64
|
||||||
|
}
|
||||||
|
throughput = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user