From 75abf2cc550b023c3a84345eb1ebd0bf6e6cdcc5 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Mon, 13 Nov 2023 00:02:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=BA=90=202.=E4=B8=8D=E5=90=8C=E7=9A=84disp?= =?UTF-8?q?atcher=E7=BA=BF=E7=A8=8B=E6=B1=A0=E9=9A=94=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_postgre.sql | 11 +- .../job/core/cache/ThreadPoolCache.java | 11 +- .../retry/client/job/core/dto/JobArgs.java | 2 + .../core/executor/AbstractJobExecutor.java | 3 + .../executor/JobExecutorFutureCallback.java | 4 +- .../mariadb/mapper/JobLogMessageMapper.xml | 17 +++ .../resources/mariadb/mapper/JobMapper.xml | 43 +++++++ .../mariadb/mapper/JobTaskBatchMapper.xml | 38 ++++++ .../mariadb/mapper/JobTaskMapper.xml | 18 +++ .../postgres/mapper/JobLogMessageMapper.xml | 17 +++ .../resources/postgres/mapper/JobMapper.xml | 43 +++++++ .../postgres/mapper/JobTaskBatchMapper.xml | 38 ++++++ .../postgres/mapper/JobTaskMapper.xml | 18 +++ .../server/common/akka/ActorGenerator.java | 113 +++++++++++------- .../server/common/akka/AkkaConfiguration.java | 68 ++++------- .../server/job/task/dto/JobPartitionTask.java | 5 - .../job/task/dto/JobTaskPrepareDTO.java | 5 - .../support/dispatch/JobExecutorActor.java | 4 + .../dispatch/JobExecutorResultActor.java | 66 +++++----- .../support/dispatch/JobTaskPrepareActor.java | 3 +- .../support/dispatch/ScanJobTaskActor.java | 43 +++---- ...utorActor.java => RequestClientActor.java} | 2 +- .../dispatch/actor/result/FinishActor.java | 3 - .../actor/scan/AbstractScanGroup.java | 2 +- .../handler/CallbackRetryTaskHandler.java | 16 ++- .../task/support/timer/CallbackTimerTask.java | 4 + .../task/support/timer/RetryTimerTask.java | 5 + .../starter/dispatch/ConsumerBucketActor.java | 16 +-- .../src/main/resources/easyretry.conf | 103 ++++++++++++++++ 29 files changed, 548 insertions(+), 173 deletions(-) create mode 100644 easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobLogMessageMapper.xml create mode 100644 easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobMapper.xml create mode 100644 easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskBatchMapper.xml create mode 100644 easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskMapper.xml create mode 100644 easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobLogMessageMapper.xml create mode 100644 easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobMapper.xml create mode 100644 easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskBatchMapper.xml create mode 100644 easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskMapper.xml rename easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/{RealJobExecutorActor.java => RequestClientActor.java} (99%) create mode 100644 easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf diff --git a/doc/sql/easy_retry_postgre.sql b/doc/sql/easy_retry_postgre.sql index 33868c7f..e17ed9d0 100644 --- a/doc/sql/easy_retry_postgre.sql +++ b/doc/sql/easy_retry_postgre.sql @@ -8,9 +8,9 @@ CREATE TABLE group_config group_status SMALLINT NOT NULL DEFAULT 0, version INT NOT NULL, group_partition INT NOT NULL, - route_key SMALLINT NOT NULL, id_generator_mode SMALLINT NOT NULL DEFAULT 1, init_scene SMALLINT NOT NULL DEFAULT 0, + bucket_index INT NOT NULL DEFAULT 0, create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); @@ -90,7 +90,7 @@ COMMENT ON COLUMN "retry_dead_letter_0"."ext_attrs" IS '扩展字段'; COMMENT ON COLUMN "retry_dead_letter_0"."task_type" IS '任务类型 1、重试数据 2、回调数据'; COMMENT ON COLUMN "retry_dead_letter_0"."create_dt" IS '创建时间'; COMMENT ON TABLE "retry_dead_letter_0" IS '死信队列表'; - + CREATE TABLE retry_task_0 ( id BIGSERIAL PRIMARY KEY, @@ -106,6 +106,7 @@ CREATE TABLE retry_task_0 retry_count INT NOT NULL DEFAULT 0, retry_status SMALLINT NOT NULL DEFAULT 0, task_type SMALLINT NOT NULL DEFAULT 1, + route_key SMALLINT NOT NULL, create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); @@ -289,7 +290,7 @@ COMMENT ON COLUMN "system_user"."role" IS '角色:1-普通用户、2-管理员 COMMENT ON COLUMN "system_user"."create_dt" IS '创建时间'; COMMENT ON COLUMN "system_user"."update_dt" IS '修改时间'; COMMENT ON TABLE "system_user" IS '系统用户表'; - + -- pwd: admin INSERT INTO system_user (username, password, role) VALUES ('admin', '465c194afb65670f38322df087f0a9bb225cc257e43eb4ac5a0c98ef5b3173ac', 2); @@ -311,7 +312,7 @@ COMMENT ON COLUMN "system_user_permission"."create_dt" IS '创建时间'; COMMENT ON COLUMN "system_user_permission"."update_dt" IS '修改时间'; COMMENT ON TABLE "system_user_permission" IS '系统用户权限表'; - + CREATE TABLE sequence_alloc ( id BIGSERIAL PRIMARY KEY, @@ -327,4 +328,4 @@ COMMENT ON COLUMN "sequence_alloc"."group_name" IS '组名称'; COMMENT ON COLUMN "sequence_alloc"."max_id" IS '最大id'; COMMENT ON COLUMN "sequence_alloc"."step" IS '步长'; COMMENT ON COLUMN "sequence_alloc"."update_dt" IS '更新时间'; -COMMENT ON TABLE "sequence_alloc" IS '号段模式序号ID分配表'; \ No newline at end of file +COMMENT ON TABLE "sequence_alloc" IS '号段模式序号ID分配表'; diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java index 75be2f90..46376005 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/cache/ThreadPoolCache.java @@ -1,7 +1,10 @@ package com.aizuda.easy.retry.client.job.core.cache; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; +import java.text.MessageFormat; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -16,14 +19,20 @@ import java.util.function.Supplier; * @since : 2.4.0 */ @Component +@Slf4j public class ThreadPoolCache { private static final ConcurrentHashMap CACHE_THREAD_POOL = new ConcurrentHashMap<>(); public static ThreadPoolExecutor createThreadPool(Long taskBatchId, int parallelNum) { + if (CACHE_THREAD_POOL.containsKey(taskBatchId)) { + return CACHE_THREAD_POOL.get(taskBatchId); + } + Supplier supplier = () -> { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( - parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + parallelNum, parallelNum, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + new CustomizableThreadFactory(MessageFormat.format("easy-retry-job-{0}-", taskBatchId))); threadPoolExecutor.allowCoreThreadTimeOut(true); return threadPoolExecutor; }; diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobArgs.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobArgs.java index 3648b671..410d0ad0 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobArgs.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobArgs.java @@ -13,4 +13,6 @@ public class JobArgs { private String argsStr; private String executorInfo; + + private Long taskBatchId; } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java index f666afc3..8c76a335 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/AbstractJobExecutor.java @@ -14,6 +14,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.ThreadPoolExecutor; @@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit; * @date : 2023-09-27 09:48 * @since 2.4.0 */ +@Slf4j public abstract class AbstractJobExecutor implements IJobExecutor { @Override @@ -58,6 +60,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { JobArgs jobArgs = new JobArgs(); jobArgs.setArgsStr(jobContext.getArgsStr()); jobArgs.setExecutorInfo(jobContext.getExecutorInfo()); + jobArgs.setTaskBatchId(jobContext.getTaskBatchId()); return jobArgs; } diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java index 5f905386..1e6cb862 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java @@ -39,7 +39,7 @@ public class JobExecutorFutureCallback implements FutureCallback @Override public void onSuccess(ExecuteResult result) { // 上报执行成功 - log.info("任务执行成功 [{}]", JsonUtil.toJsonString(result)); + log.warn("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result)); if (Objects.isNull(result)) { result = ExecuteResult.success(); @@ -70,7 +70,7 @@ public class JobExecutorFutureCallback implements FutureCallback @Override public void onFailure(final Throwable t) { // 上报执行失败 - log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t); + log.error("任务执行失败 任务执行成功 taskBatchId:[{}]", jobContext.getTaskBatchId(), t); try { ExecuteResult failure = ExecuteResult.failure(); diff --git a/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobLogMessageMapper.xml b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobLogMessageMapper.xml new file mode 100644 index 00000000..6c87b753 --- /dev/null +++ b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobLogMessageMapper.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobMapper.xml b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobMapper.xml new file mode 100644 index 00000000..de263ea0 --- /dev/null +++ b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobMapper.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + update job rt, + ( + + select + #{item.nextTriggerAt} as next_trigger_at, + #{item.id} as id + + ) tt + set + rt.next_trigger_at = tt.next_trigger_at + where rt.id = tt.id + + + diff --git a/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskBatchMapper.xml b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskBatchMapper.xml new file mode 100644 index 00000000..b506e7a2 --- /dev/null +++ b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskBatchMapper.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + diff --git a/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskMapper.xml b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskMapper.xml new file mode 100644 index 00000000..0cbf78f3 --- /dev/null +++ b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/JobTaskMapper.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + diff --git a/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobLogMessageMapper.xml b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobLogMessageMapper.xml new file mode 100644 index 00000000..6c87b753 --- /dev/null +++ b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobLogMessageMapper.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobMapper.xml b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobMapper.xml new file mode 100644 index 00000000..de263ea0 --- /dev/null +++ b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobMapper.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + update job rt, + ( + + select + #{item.nextTriggerAt} as next_trigger_at, + #{item.id} as id + + ) tt + set + rt.next_trigger_at = tt.next_trigger_at + where rt.id = tt.id + + + diff --git a/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskBatchMapper.xml b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskBatchMapper.xml new file mode 100644 index 00000000..b506e7a2 --- /dev/null +++ b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskBatchMapper.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + + + + diff --git a/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskMapper.xml b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskMapper.xml new file mode 100644 index 00000000..0cbf78f3 --- /dev/null +++ b/easy-retry-datasource/easy-retry-postgres-datasource/src/main/resources/postgres/mapper/JobTaskMapper.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java index 48dc4f99..2914ac2e 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java @@ -13,18 +13,32 @@ import com.aizuda.easy.retry.common.core.context.SpringContext; */ public class ActorGenerator { + /*----------------------------------------系统通用配置 START----------------------------------------*/ + + public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor"; + public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor"; + private static final String COMMON_LOG_DISPATCHER = "akka.actor.common-log-dispatcher"; + private static final String COMMON_SCAN_TASK_DISPATCHER = "akka.actor.common-scan-task-dispatcher"; + private static final String NETTY_RECEIVE_REQUEST_DISPATCHER = "akka.actor.netty-receive-request-dispatcher"; + + /*----------------------------------------系统通用配置 END----------------------------------------*/ + + /*----------------------------------------分布式重试任务 START----------------------------------------*/ public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor"; public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor"; - public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor"; public static final String FINISH_ACTOR = "FinishActor"; public static final String FAILURE_ACTOR = "FailureActor"; public static final String NO_RETRY_ACTOR = "NoRetryActor"; public static final String EXEC_CALLBACK_UNIT_ACTOR = "ExecCallbackUnitActor"; public static final String EXEC_UNIT_ACTOR = "ExecUnitActor"; public static final String LOG_ACTOR = "LogActor"; - public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor"; - /*----------------------------------------分布式任务调度----------------------------------------*/ + private static final String RETRY_TASK_EXECUTOR_DISPATCHER = "akka.actor.retry-task-executor-dispatcher"; + private static final String RETRY_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.retry-task-executor-result-dispatcher"; + + /*----------------------------------------分布式重试任务 END----------------------------------------*/ + + /*----------------------------------------分布式任务调度 START----------------------------------------*/ public static final String SCAN_JOB_ACTOR = "ScanJobActor"; public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor"; public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor"; @@ -33,6 +47,14 @@ public class ActorGenerator { public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor"; public static final String REAL_STOP_TASK_INSTANCE_ACTOR = "RealStopTaskInstanceActor"; + /*----------------------------------------dispatcher----------------------------------------*/ + private static final String JOB_TASK_DISPATCHER = "akka.actor.job-task-prepare-dispatcher"; + private static final String JOB_TASK_EXECUTOR_DISPATCHER = "akka.actor.job-task-executor-dispatcher"; + private static final String JOB_TASK_EXECUTOR_RESULT_DISPATCHER = "akka.actor.job-task-executor-result-dispatcher"; + private static final String JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER = "akka.actor.job-task-executor-call-client-dispatcher"; + + /*----------------------------------------分布式任务调度 END----------------------------------------*/ + private ActorGenerator() {} /** @@ -41,7 +63,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef finishActor() { - return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR)); + return getRetryActorSystem().actorOf(getSpringExtension().props(FINISH_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER)); } /** @@ -50,7 +72,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef failureActor() { - return getDispatchResultActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR)); + return getRetryActorSystem().actorOf(getSpringExtension().props(FAILURE_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER)); } /** @@ -59,7 +81,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef noRetryActor() { - return getDispatchResultActorSystem().actorOf(getSpringExtension().props(NO_RETRY_ACTOR)); + return getRetryActorSystem().actorOf(getSpringExtension().props(NO_RETRY_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_RESULT_DISPATCHER)); } /** @@ -68,7 +90,7 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef execCallbackUnitActor() { - return getDispatchResultActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR)); + return getRetryActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER)); } /** @@ -77,7 +99,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef execUnitActor() { - return getDispatchExecUnitActorSystem().actorOf(getSpringExtension().props(EXEC_UNIT_ACTOR)); + return getRetryActorSystem().actorOf(getSpringExtension() + .props(EXEC_UNIT_ACTOR) + .withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER)); } /** @@ -86,7 +110,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef scanGroupActor() { - return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_RETRY_GROUP_ACTOR)); + return getCommonActorSystemSystem().actorOf(getSpringExtension() + .props(SCAN_RETRY_GROUP_ACTOR) + .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); } /** @@ -95,7 +121,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef scanCallbackGroupActor() { - return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR)); + return getRetryActorSystem().actorOf(getSpringExtension() + .props(SCAN_CALLBACK_GROUP_ACTOR) + .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); } /** @@ -104,7 +132,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef scanJobActor() { - return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_JOB_ACTOR)); + return getCommonActorSystemSystem().actorOf(getSpringExtension() + .props(SCAN_JOB_ACTOR) + .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); } /** @@ -113,7 +143,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef scanBucketActor() { - return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_BUCKET_ACTOR)); + return getCommonActorSystemSystem().actorOf(getSpringExtension() + .props(SCAN_BUCKET_ACTOR) + .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); } /** @@ -122,7 +154,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef logActor() { - return getLogActorSystemSystem().actorOf(getSpringExtension().props(LOG_ACTOR)); + return getCommonActorSystemSystem().actorOf(getSpringExtension() + .props(LOG_ACTOR) + .withDispatcher(COMMON_LOG_DISPATCHER)); } /** @@ -131,7 +165,8 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef requestHandlerActor() { - return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR)); + return getNettyActorSystem().actorOf(getSpringExtension().props(REQUEST_HANDLER_ACTOR) + .withDispatcher(NETTY_RECEIVE_REQUEST_DISPATCHER)); } @@ -141,7 +176,8 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef jobTaskPrepareActor() { - return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR)); + return getJobActorSystem().actorOf(getSpringExtension().props(JOB_TASK_PREPARE_ACTOR) + .withDispatcher(JOB_TASK_DISPATCHER)); } /** @@ -150,7 +186,11 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef jobTaskExecutorActor() { - return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_ACTOR)); + return getJobActorSystem() + .actorOf(getSpringExtension() + .props(JOB_EXECUTOR_ACTOR) + .withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER) + ); } /** @@ -159,7 +199,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef jobTaskExecutorResultActor() { - return getJobActorSystem().actorOf(getSpringExtension().props(JOB_EXECUTOR_RESULT_ACTOR)); + return getJobActorSystem().actorOf(getSpringExtension() + .props(JOB_EXECUTOR_RESULT_ACTOR) + .withDispatcher(JOB_TASK_EXECUTOR_RESULT_DISPATCHER)); } /** @@ -168,7 +210,9 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef jobRealTaskExecutorActor() { - return getJobActorSystem().actorOf(getSpringExtension().props(REAL_JOB_EXECUTOR_ACTOR)); + return getJobActorSystem().actorOf(getSpringExtension() + .props(REAL_JOB_EXECUTOR_ACTOR) + .withDispatcher(JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER)); } /** @@ -177,7 +221,8 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef jobRealStopTaskInstanceActor() { - return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR)); + return getJobActorSystem().actorOf(getSpringExtension().props(REAL_STOP_TASK_INSTANCE_ACTOR) + .withDispatcher(JOB_TASK_EXECUTOR_CALL_CLIENT_DISPATCHER)); } /** @@ -186,35 +231,20 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef jobLogActor() { - return getJobActorSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR)); + return getCommonActorSystemSystem().actorOf(getSpringExtension().props(JOB_LOG_ACTOR) + .withDispatcher(COMMON_LOG_DISPATCHER)); } public static SpringExtension getSpringExtension() { return SpringContext.getBeanByType(SpringExtension.class); } - /** - * 重试任务提取器 - * @return - */ - public static ActorSystem getDispatchRetryActorSystem() { - return SpringContext.getBean("dispatchRetryActorSystem", ActorSystem.class); - } - - /** - * 重试任务分发器 - * @return - */ - public static ActorSystem getDispatchExecUnitActorSystem() { - return SpringContext.getBean("dispatchExecUnitActorSystem", ActorSystem.class); - } - - /** + /** * 重试任务结果分发器 * @return */ - public static ActorSystem getDispatchResultActorSystem() { - return SpringContext.getBean("dispatchResultActorSystem", ActorSystem.class); + public static ActorSystem getRetryActorSystem() { + return SpringContext.getBean("retryActorSystem", ActorSystem.class); } /** @@ -222,8 +252,8 @@ public class ActorGenerator { * * @return */ - public static ActorSystem getLogActorSystemSystem() { - return SpringContext.getBean("logActorSystem", ActorSystem.class); + public static ActorSystem getCommonActorSystemSystem() { + return SpringContext.getBean("commonActorSystem", ActorSystem.class); } @@ -245,5 +275,4 @@ public class ActorGenerator { return SpringContext.getBean("jobActorSystem", ActorSystem.class); } - } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java index 5fdde59b..3b629963 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/AkkaConfiguration.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.common.akka; import akka.actor.ActorSystem; +import com.typesafe.config.ConfigFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; @@ -15,63 +16,26 @@ import org.springframework.context.annotation.Configuration; @Configuration public class AkkaConfiguration { - private static final String DISPATCH_RETRY_ACTOR_SYSTEM = "DISPATCH_RETRY_ACTOR_SYSTEM"; - private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM"; - private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM"; - private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM"; + private static final String CONFIG_NAME = "easyretry"; private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM"; private static final String JOB_ACTOR_SYSTEM = "JOB_ACTOR_SYSTEM"; + private static final String RETRY_ACTOR_SYSTEM = "RETRY_ACTOR_SYSTEM"; + private static final String COMMON_ACTOR_SYSTEM = "COMMON_ACTOR_SYSTEM"; + @Autowired private ApplicationContext applicationContext; @Autowired private SpringExtension springExtension; - /** - * 重试分发ActorSystem - * - * @return {@link ActorSystem} 顶级actor - */ - @Bean("dispatchRetryActorSystem") - public ActorSystem createDispatchRetryActorSystem() { - ActorSystem system = ActorSystem.create(DISPATCH_RETRY_ACTOR_SYSTEM); - springExtension.initialize(applicationContext); - return system; - } - - /** - * 重试分发执行ActorSystem - * - * @return {@link ActorSystem} 顶级actor - */ - @Bean("dispatchExecUnitActorSystem") - public ActorSystem createDispatchExecUnitRetryActorSystem() { - ActorSystem system = ActorSystem.create(DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM); - springExtension.initialize(applicationContext); - return system; - } - - - /** - * 重试分发执行结果处理ActorSystem - * - * @return {@link ActorSystem} 顶级actor - */ - @Bean("dispatchResultActorSystem") - public ActorSystem createDispatchResultRetryActorSystem() { - ActorSystem system = ActorSystem.create(DISPATCH_RESULT_ACTOR_SYSTEM); - springExtension.initialize(applicationContext); - return system; - } - /** * 日志处理 * * @return {@link ActorSystem} 顶级actor */ - @Bean("logActorSystem") + @Bean("commonActorSystem") public ActorSystem createLogActorSystem() { - ActorSystem system = ActorSystem.create(LOG_ACTOR_SYSTEM); + ActorSystem system = ActorSystem.create(COMMON_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME)); springExtension.initialize(applicationContext); return system; } @@ -83,11 +47,24 @@ public class AkkaConfiguration { */ @Bean("nettyActorSystem") public ActorSystem nettyActorSystem() { - ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM); + ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME)); springExtension.initialize(applicationContext); return system; } + /** + * 处理retry调度 + * + * @return {@link ActorSystem} 顶级actor + */ + @Bean("retryActorSystem") + public ActorSystem retryActorSystem() { + ActorSystem system = ActorSystem.create(RETRY_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME)); + springExtension.initialize(applicationContext); + return system; + } + + /** * 处理job调度 * @@ -95,8 +72,9 @@ public class AkkaConfiguration { */ @Bean("jobActorSystem") public ActorSystem jobActorSystem() { - ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM); + ActorSystem system = ActorSystem.create(JOB_ACTOR_SYSTEM, ConfigFactory.load(CONFIG_NAME)); springExtension.initialize(applicationContext); return system; } + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java index 4e173ee5..0537c9ca 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java @@ -19,11 +19,6 @@ public class JobPartitionTask extends PartitionTask { */ private String groupName; - /** - * 名称 - */ - private String jobName; - /** * 下次触发时间 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java index 92fd455f..7d25d4de 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java @@ -19,11 +19,6 @@ public class JobTaskPrepareDTO { */ private String groupName; - /** - * 名称 - */ - private String jobName; - /** * 下次触发时间 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 85eb02bc..ab372854 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -6,6 +6,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; @@ -38,6 +39,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -61,6 +63,7 @@ public class JobExecutorActor extends AbstractActor { public Receive createReceive() { return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> { try { + log.info("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute)); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(final TransactionStatus status) { @@ -112,6 +115,7 @@ public class JobExecutorActor extends AbstractActor { context.setJobId(job.getId()); jobExecutor.execute(context); } finally { + log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCompletion(int status) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java index b9b455c0..1aa83dbe 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -4,6 +4,7 @@ import akka.actor.AbstractActor; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; @@ -49,41 +50,46 @@ public class JobExecutorResultActor extends AbstractActor { public Receive createReceive() { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); + try { + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + JobTask jobTask = new JobTask(); + jobTask.setTaskStatus(result.getTaskStatus()); + if (Objects.nonNull(result.getResult())) { + jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); + } - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(final TransactionStatus status) { - JobTask jobTask = new JobTask(); - jobTask.setTaskStatus(result.getTaskStatus()); - if (Objects.nonNull(result.getResult())) { - jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); - } + Assert.isTrue(1 == jobTaskMapper.update(jobTask, + new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), + ()-> new EasyRetryServerException("更新任务实例失败")); - Assert.isTrue(1 == jobTaskMapper.update(jobTask, - new LambdaUpdateWrapper().eq(JobTask::getId, result.getTaskId())), - ()-> new EasyRetryServerException("更新任务实例失败")); - - // 更新批次上的状态 - boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason()); - if (complete) { - // 尝试停止任务 - // 若是集群任务则客户端会主动关闭 - if (result.getTaskType() != TaskTypeEnum.CLUSTER.getType()) { - JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType()); - TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result); - stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); - stopJobContext.setForceStop(Boolean.TRUE); - instanceInterrupt.stop(stopJobContext); + // 更新批次上的状态 + boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason()); + if (complete) { + // 尝试停止任务 + // 若是集群任务则客户端会主动关闭 + if (result.getTaskType() != TaskTypeEnum.CLUSTER.getType()) { + JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType()); + TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result); + stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE); + stopJobContext.setForceStop(Boolean.TRUE); + instanceInterrupt.stop(stopJobContext); + } } } - } - }); + }); - JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); - jobLogDTO.setMessage(result.getMessage()); - jobLogDTO.setTaskId(result.getTaskId()); - ActorRef actorRef = ActorGenerator.jobLogActor(); - actorRef.tell(jobLogDTO, actorRef); + JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); + jobLogDTO.setMessage(result.getMessage()); + jobLogDTO.setTaskId(result.getTaskId()); + ActorRef actorRef = ActorGenerator.jobLogActor(); + actorRef.tell(jobLogDTO, actorRef); + } catch (Exception e) { + LogUtils.error(log, " job executor result exception. [{}]", result, e); + } finally { + getContext().stop(getSelf()); + } }).build(); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java index 03878f45..508b9ca3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java @@ -51,7 +51,8 @@ public class JobTaskPrepareActor extends AbstractActor { private void doPrepare(JobTaskPrepareDTO prepare) { - List notCompleteJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() + List notCompleteJobTaskBatchList = jobTaskBatchMapper + .selectList(new LambdaQueryWrapper() .eq(JobTaskBatch::getJobId, prepare.getJobId()) .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index 8bb4cd2e..e189f797 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -29,10 +29,8 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; +import java.time.LocalDateTime; +import java.util.*; /** @@ -72,7 +70,7 @@ public class ScanJobTaskActor extends AbstractActor { } long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), - this::processJobPartitionTasks, 0); + this::processJobPartitionTasks, 0); log.info("job scan end. total:[{}]", total); } @@ -81,8 +79,9 @@ public class ScanJobTaskActor extends AbstractActor { List waitUpdateJobs = new ArrayList<>(); List waitExecJobs = new ArrayList<>(); + long now = DateUtils.toNowMilli(); for (PartitionTask partitionTask : partitionTasks) { - processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs); + processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs, now); } // 批量更新 @@ -96,9 +95,8 @@ public class ScanJobTaskActor extends AbstractActor { } private void processJob(JobPartitionTask partitionTask, final List waitUpdateJobs, - final List waitExecJobs) { + final List waitExecJobs, long now) { CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName()); - JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask); Job job = new Job(); job.setId(partitionTask.getId()); @@ -107,14 +105,13 @@ public class ScanJobTaskActor extends AbstractActor { Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId()); if (needCalculateNextTriggerTime(partitionTask)) { // 更新下次触发时间 - nextTriggerAt = calculateNextTriggerTime(partitionTask); + nextTriggerAt = calculateNextTriggerTime(partitionTask, now); } else { // 若常驻任务的缓存时间为空则触发一次任务调度,说明常驻任务长时间未更新或者是系统刚刚启动 triggerTask = Objects.isNull(nextTriggerAt); // 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now - long now = System.currentTimeMillis(); if (Objects.isNull(nextTriggerAt) - || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { + || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { nextTriggerAt = now; } } @@ -124,7 +121,7 @@ public class ScanJobTaskActor extends AbstractActor { waitUpdateJobs.add(job); if (triggerTask) { - waitExecJobs.add(jobTaskPrepare); + waitExecJobs.add(JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask)); } } @@ -136,12 +133,12 @@ public class ScanJobTaskActor extends AbstractActor { return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()); } - private Long calculateNextTriggerTime(JobPartitionTask partitionTask) { + private Long calculateNextTriggerTime(JobPartitionTask partitionTask, long now) { - long now = System.currentTimeMillis(); long nextTriggerAt = partitionTask.getNextTriggerAt(); if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { nextTriggerAt = now; + partitionTask.setNextTriggerAt(nextTriggerAt); } // 更新下次触发时间 @@ -158,13 +155,17 @@ public class ScanJobTaskActor extends AbstractActor { return Collections.emptyList(); } - List jobs = jobMapper.selectPage(new PageDTO(0, systemProperties.getJobPullPageSize()), - new LambdaQueryWrapper() - .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) - .eq(Job::getDeleted, StatusEnum.NO.getStatus()) - .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000) - .ge(Job::getId, startId) + List jobs = jobMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()), + new LambdaQueryWrapper() + .select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType, + Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident, + Job::getId) + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .eq(Job::getDeleted, StatusEnum.NO.getStatus()) + .in(Job::getBucketIndex, scanTask.getBuckets()) + .le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) + .ge(Job::getId, startId) + .orderByAsc(Job::getId) ).getRecords(); return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java similarity index 99% rename from easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java rename to easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java index eed60e8b..b927364d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java @@ -41,7 +41,7 @@ import java.util.Objects; @Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j -public class RealJobExecutorActor extends AbstractActor { +public class RequestClientActor extends AbstractActor { @Autowired private JobTaskMapper jobTaskMapper; diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FinishActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FinishActor.java index b4ffe947..5dc787e6 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FinishActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FinishActor.java @@ -36,8 +36,6 @@ import java.time.LocalDateTime; @Slf4j public class FinishActor extends AbstractActor { - public static final String BEAN_NAME = "FinishActor"; - @Autowired private AccessTemplate accessTemplate; @Autowired @@ -57,7 +55,6 @@ public class FinishActor extends AbstractActor { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - retryTask.setUpdateDt(LocalDateTime.now()); Assert.isTrue(1 == accessTemplate.getRetryTaskAccess() .updateById(retryTask.getGroupName(), retryTask), diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index e85d23fc..b02335a7 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -118,7 +118,7 @@ public abstract class AbstractScanGroup extends AbstractActor { }, lastId); log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]", - groupName, lastId, total, preCostTime().get(), count); + groupName, lastId, preCostTime().get(), total, count.get()); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java index 50b233ab..f061f143 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/CallbackRetryTaskHandler.java @@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.retry.task.support.handler; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; @@ -17,9 +18,11 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; +import lombok.extern.slf4j.Slf4j; import org.slf4j.helpers.FormattingTuple; import org.slf4j.helpers.MessageFormatter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -36,6 +39,7 @@ import java.util.concurrent.TimeUnit; * @since 1.5.0 */ @Component +@Slf4j public class CallbackRetryTaskHandler { private static final String CALLBACK_UNIQUE_ID_RULE = "{}_{}"; @@ -75,9 +79,15 @@ public class CallbackRetryTaskHandler { callbackRetryTask.setNextTriggerAt(DateUtils.toLocalDateTime(waitStrategy.computeTriggerTime(waitStrategyContext))); - Assert.isTrue(1 == accessTemplate.getRetryTaskAccess() - .insert(callbackRetryTask.getGroupName(), callbackRetryTask), - () -> new EasyRetryServerException("failed to report data")); + try { + Assert.isTrue(1 == accessTemplate.getRetryTaskAccess() + .insert(callbackRetryTask.getGroupName(), callbackRetryTask), + () -> new EasyRetryServerException("failed to report data")); + } catch (DuplicateKeyException e) { + log.warn("回调数据重复新增. [{}]", JsonUtil.toJsonString(retryTask)); + return; + } + // 初始化回调日志 RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java index 2ded2b5a..d7408cea 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java @@ -13,6 +13,7 @@ import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; +import java.util.Objects; /** * @author: www.byteblogs.com @@ -38,6 +39,9 @@ public class CallbackTimerTask extends AbstractTimerTask { .eq(RetryTask::getGroupName, context.getGroupName()) .eq(RetryTask::getUniqueId, context.getUniqueId()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); + if (Objects.isNull(retryTask)) { + return; + } TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene()); taskExecutor.actuator(retryTask); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java index 630ef2d0..56e240e5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java @@ -13,6 +13,8 @@ import io.netty.util.Timeout; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import java.util.Objects; + /** * @author: www.byteblogs.com * @date : 2023-09-22 17:09 @@ -37,6 +39,9 @@ public class RetryTimerTask extends AbstractTimerTask { .eq(RetryTask::getGroupName, context.getGroupName()) .eq(RetryTask::getUniqueId, context.getUniqueId()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); + if (Objects.isNull(retryTask)) { + return; + } TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene()); taskExecutor.actuator(retryTask); } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java index 3969125d..619a2e26 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java @@ -67,6 +67,14 @@ public class ConsumerBucketActor extends AbstractActor { return; } + if (SystemModeEnum.isJob(systemProperties.getMode())) { + // 扫描回调数据 + ScanTask scanTask = new ScanTask(); + scanTask.setBuckets(consumerBucket.getBuckets()); + ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); + scanJobActorRef.tell(scanTask, scanJobActorRef); + } + if (SystemModeEnum.isRetry(systemProperties.getMode())) { List groupConfigs = null; try { @@ -93,14 +101,6 @@ public class ConsumerBucketActor extends AbstractActor { } } - if (SystemModeEnum.isJob(systemProperties.getMode())) { - // 扫描回调数据 - ScanTask scanTask = new ScanTask(); - scanTask.setBuckets(consumerBucket.getBuckets()); - ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); - scanJobActorRef.tell(scanTask, scanJobActorRef); - } - } /** diff --git a/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf b/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf new file mode 100644 index 00000000..b8cefac7 --- /dev/null +++ b/easy-retry-server/easy-retry-server-starter/src/main/resources/easyretry.conf @@ -0,0 +1,103 @@ +akka { + actor { + common-log-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 8 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + common-scan-task-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 8 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + netty-receive-request-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 16 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + retry-task-executor-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 32 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + retry-task-executor-result-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 8 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + job-task-prepare-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 32 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + job-task-executor-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 32 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + job-task-executor-call-client-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 32 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + job-task-executor-result-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 8 + core-pool-size-factor = 2.0 + core-pool-size-max = 64 + } + throughput = 10 + } + + } +}