From 012bfd063448dcd1bf30a97c13761e4723a08da1 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 23 Dec 2023 11:17:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 127 +++++++++--------- .../retry/client/job/core/dto/JobContext.java | 4 + .../executor/JobExecutorFutureCallback.java | 5 +- .../model/request/DispatchJobRequest.java | 9 +- .../request/DispatchJobResultRequest.java | 4 + .../common/core/constant/SystemConstants.java | 5 + .../persistence/po/JobTaskBatch.java | 10 ++ .../persistence/po/WorkflowTaskBatch.java | 11 +- .../server/common/akka/ActorGenerator.java | 24 ++++ .../common/enums/JobTriggerTypeEnum.java | 4 +- .../retry/server/common/util/GraphUtils.java | 41 ++++++ .../job/task/dto/JobExecutorResultDTO.java | 8 +- .../job/task/dto/JobTaskPrepareDTO.java | 11 ++ .../task/dto/WorkflowNodeTaskExecuteDTO.java | 29 ++++ .../job/task/dto/WorkflowTaskPrepareDTO.java | 35 +++++ .../job/task/dto/WorkflowTimerTaskDTO.java | 21 +++ .../task/support/WorkflowPrePareHandler.java | 15 +++ .../task/support/WorkflowTaskConverter.java | 12 +- .../callback/ClientCallbackContext.java | 2 + .../support/dispatch/JobExecutorActor.java | 59 ++++---- .../dispatch/JobExecutorResultActor.java | 2 +- .../support/dispatch/JobTaskPrepareActor.java | 2 + .../dispatch/ScanWorkflowTaskActor.java | 2 +- .../dispatch/WorkflowExecutorActor.java | 110 +++++++++++++++ .../support/event/JobTaskFailAlarmEvent.java | 11 +- .../batch/JobTaskBatchGenerator.java | 1 + .../batch/JobTaskBatchGeneratorContext.java | 5 + .../batch/WorkflowBatchGenerator.java | 48 +++++++ .../WorkflowTaskBatchGeneratorContext.java | 51 +++++++ .../support/handler/JobTaskBatchHandler.java | 32 ++++- .../prepare/RunningJobPrepareHandler.java | 2 +- .../AbstractWorkflowPrePareHandler.java | 21 +++ .../TerminalWorkflowPrepareHandler.java | 32 +++++ .../task/support/timer/WorkflowTimerTask.java | 72 ++++++++++ .../starter/dispatch/ConsumerBucketActor.java | 2 +- .../web/service/impl/WorkflowServiceImpl.java | 35 +++-- 36 files changed, 745 insertions(+), 119 deletions(-) create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 940ef7d85..6e96748a2 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -356,22 +356,24 @@ CREATE TABLE `job_task` CREATE TABLE `job_task_batch` ( - `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `job_id` bigint(20) NOT NULL COMMENT '任务id', - `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功', - `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因', - `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', - `parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', - `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', - `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', + `group_name` varchar(64) NOT NULL COMMENT '组名称', + `job_id` bigint(20) NOT NULL COMMENT '任务id', + `workflow_node_id` bigint(20) NOT NULL COMMENT '工作流节点id', + `workflow_task_batch_id` bigint(20) NOT NULL COMMENT '工作流任务批次id', + `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功', + `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因', + `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', + `parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', + `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', PRIMARY KEY (`id`), - KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`), - KEY `idx_create_dt` (`create_dt`), - KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) + KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`), + KEY `idx_create_dt` (`create_dt`), + KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次'; @@ -442,25 +444,25 @@ CREATE TABLE `retry_summary` CREATE TABLE `workflow` ( - `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `workflow_name` varchar(64) NOT NULL COMMENT '工作流名称', - `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `workflow_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态 0、关闭、1、开启', - `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间', - `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长', - `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', - `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', - `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', - `flow_info` text DEFAULT NULL COMMENT '流程信息', - `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', - `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', - `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', + `workflow_name` varchar(64) NOT NULL COMMENT '工作流名称', + `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', + `group_name` varchar(64) NOT NULL COMMENT '组名称', + `workflow_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态 0、关闭、1、开启', + `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间', + `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长', + `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间', + `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', + `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', + `flow_info` text DEFAULT NULL COMMENT '流程信息', + `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', + `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', PRIMARY KEY (`id`), - KEY `idx_create_dt` (`create_dt`), - KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) + KEY `idx_create_dt` (`create_dt`), + KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='工作流'; @@ -468,23 +470,23 @@ CREATE TABLE `workflow` CREATE TABLE `workflow_node` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `node_name` varchar(64) NOT NULL COMMENT '节点名称', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id', - `workflow_id` bigint(20) NOT NULL COMMENT '工作流ID', - `node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点', - `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL', - `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞', - `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启', - `node_expression` text DEFAULT NULL COMMENT '节点表达式', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', - `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', - `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', + `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', + `node_name` varchar(64) NOT NULL COMMENT '节点名称', + `group_name` varchar(64) NOT NULL COMMENT '组名称', + `job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id', + `workflow_id` bigint(20) NOT NULL COMMENT '工作流ID', + `node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点', + `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL', + `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞', + `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启', + `node_expression` text DEFAULT NULL COMMENT '节点表达式', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', + `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', PRIMARY KEY (`id`), - KEY `idx_create_dt` (`create_dt`), - KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) + KEY `idx_create_dt` (`create_dt`), + KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='工作流节点'; @@ -492,20 +494,21 @@ CREATE TABLE `workflow_node` CREATE TABLE `workflow_task_batch` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', - `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', - `group_name` varchar(64) NOT NULL COMMENT '组名称', - `workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id', - `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功', - `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因', - `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', - `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', - `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', - `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', + `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', + `group_name` varchar(64) NOT NULL COMMENT '组名称', + `workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id', + `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功', + `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因', + `flow_info` text DEFAULT NULL COMMENT '流程信息', + `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', + `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', + `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', + `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段', PRIMARY KEY (`id`), - KEY `idx_job_id_task_batch_status` (`workflow_id`, `task_batch_status`), - KEY `idx_create_dt` (`create_dt`), - KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) + KEY `idx_job_id_task_batch_status` (`workflow_id`, `task_batch_status`), + KEY `idx_create_dt` (`create_dt`), + KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='工作流批次'; diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java index 93e27c47b..0d3071b42 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java @@ -13,6 +13,10 @@ public class JobContext { private Long taskBatchId; + private Long workflowBatchId; + + private Long workflowNodeId; + private Long taskId; private String groupName; diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java index 1e6cb862b..bf19b6156 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java @@ -30,7 +30,7 @@ public class JobExecutorFutureCallback implements FutureCallback .client(JobNettyClient.class) .callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build(); - private JobContext jobContext; + private final JobContext jobContext; public JobExecutorFutureCallback(final JobContext jobContext) { this.jobContext = jobContext; @@ -97,6 +97,9 @@ public class JobExecutorFutureCallback implements FutureCallback dispatchJobRequest.setGroupName(jobContext.getGroupName()); dispatchJobRequest.setJobId(jobContext.getJobId()); dispatchJobRequest.setTaskId(jobContext.getTaskId()); + dispatchJobRequest.setWorkflowBatchId(jobContext.getWorkflowBatchId()); + dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId()); + dispatchJobRequest.setTaskId(jobContext.getTaskId()); dispatchJobRequest.setTaskType(jobContext.getTaskType()); dispatchJobRequest.setExecuteResult(executeResult); dispatchJobRequest.setTaskStatus(status); diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java index 12d433583..3763ffb72 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java @@ -36,13 +36,18 @@ public class DispatchJobRequest { @NotBlank(message = "executorInfo 不能为空") private String executorInfo; + @NotBlank(message = "executorTimeout 不能为空") + private Integer executorTimeout; + private String argsStr; private Integer shardingTotal; private Integer shardingIndex; - @NotBlank(message = "executorTimeout 不能为空") - private Integer executorTimeout; + private Long workflowBatchId; + + private Long workflowNodeId; + } diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java index 5c16383b0..e895fd0d2 100644 --- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java +++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java @@ -14,6 +14,10 @@ public class DispatchJobResultRequest { private Long taskBatchId; + private Long workflowBatchId; + + private Long workflowNodeId; + private Long taskId; /** diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index 26a7467f1..89bfdd41d 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -96,4 +96,9 @@ public interface SystemConstants { * AT 所有人 */ String AT_ALL = "all"; + + /** + * 根节点 + */ + Long ROOT = -1L; } diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java index e5f0892e7..8d157ca7d 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java @@ -48,6 +48,16 @@ public class JobTaskBatch implements Serializable { */ private Long jobId; + /** + * 工作流批次id + */ + private Long workflowTaskBatchId; + + /** + * 工作流节点id + */ + private Long workflowNodeId; + /** * 任务批次状态 */ diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java index 3c673f530..012c1d94c 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java @@ -47,18 +47,23 @@ public class WorkflowTaskBatch implements Serializable { /** * 任务批次状态 0、失败 1、成功 */ - private Byte taskBatchStatus; + private Integer taskBatchStatus; /** * 操作原因 */ - private Byte operationReason; + private Integer operationReason; /** * 任务执行时间 */ private Long executionAt; + /** + * 流程信息 + */ + private String flowInfo; + /** * 创建时间 */ @@ -72,7 +77,7 @@ public class WorkflowTaskBatch implements Serializable { /** * 逻辑删除 1、删除 */ - private Byte deleted; + private Integer deleted; /** * 扩展字段 diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java index a8269e320..4c74fa7aa 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java @@ -44,6 +44,7 @@ public class ActorGenerator { public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor"; public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor"; public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor"; + public static final String WORKFLOW_EXECUTOR_ACTOR = "WorkflowExecutorActor"; public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor"; public static final String JOB_LOG_ACTOR = "JobLogActor"; public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor"; @@ -195,6 +196,16 @@ public class ActorGenerator { .withDispatcher(JOB_TASK_DISPATCHER)); } + /** + * Job调度准备阶段actor + * + * @return actor 引用 + */ + public static ActorRef workflowTaskPrepareActor() { + return getJobActorSystem().actorOf(getSpringExtension().props(WORKFLOW_TASK_PREPARE_ACTOR) + .withDispatcher(JOB_TASK_DISPATCHER)); + } + /** * Job任务执行阶段actor * @@ -208,6 +219,19 @@ public class ActorGenerator { ); } + /** + * Job任务执行阶段actor + * + * @return actor 引用 + */ + public static ActorRef workflowTaskExecutorActor() { + return getJobActorSystem() + .actorOf(getSpringExtension() + .props(WORKFLOW_EXECUTOR_ACTOR) + .withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER) + ); + } + /** * Job任务执行结果actor * diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java index 536a5f566..c7f86a99a 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java @@ -15,7 +15,9 @@ import lombok.Getter; @AllArgsConstructor public enum JobTriggerTypeEnum { AUTO(1, "自动触发"), - MANUAL(2, "手动触发"); + MANUAL(2, "手动触发"), + WORKFLOW(2, "DAG触发"), + ; private final Integer type; private final String desc; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java new file mode 100644 index 000000000..0b1535c57 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java @@ -0,0 +1,41 @@ +package com.aizuda.easy.retry.server.common.util; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.graph.GraphBuilder; +import com.google.common.graph.MutableGraph; + +import java.io.IOException; +import java.util.Map; + +/** + * @author: xiaowoniu + * @date : 2023-12-22 + * @since : 2.6.0 + */ +public class GraphUtils { + + + // 从JSON反序列化为Guava图 + public static MutableGraph deserializeJsonToGraph(String jsonGraph) throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + + // 将JSON字符串转换为Map> + Map> adjacencyList = objectMapper.readValue( + jsonGraph, new TypeReference>>() {}); + + // 创建Guava图并添加节点和边 + MutableGraph graph = GraphBuilder.directed().build(); + for (Map.Entry> entry : adjacencyList.entrySet()) { + T node = entry.getKey(); + Iterable successors = entry.getValue(); + + graph.addNode(node); + for (T successor : successors) { + graph.putEdge(node, successor); + } + } + + return graph; + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java index f85da0122..6c7f42632 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java @@ -1,6 +1,5 @@ package com.aizuda.easy.retry.server.job.task.dto; -import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import lombok.Data; /** @@ -15,6 +14,13 @@ public class JobExecutorResultDTO { private Long taskBatchId; + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + + private Long workflowNodeId; + private Long taskId; /** diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java index 66531523c..c45f981e5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java @@ -57,4 +57,15 @@ public class JobTaskPrepareDTO { */ private Integer triggerType; + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + + /** + * 工作流节点id + */ + private Long workflowNodeId; + + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java new file mode 100644 index 000000000..bde18b22a --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java @@ -0,0 +1,29 @@ +package com.aizuda.easy.retry.server.job.task.dto; + +import lombok.Data; + +/** + * @author: xiaowoniu + * @date : 2023-12-22 + * @since : 2.6.0 + */ +@Data +public class WorkflowNodeTaskExecuteDTO { + + /** + * 工作流id + */ + private Long workflowId; + + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + /** + * 触发类似 1、auto 2、manual + */ + private Integer triggerType; + + private Long parentId; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java index 50652c974..313b058e1 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java @@ -17,4 +17,39 @@ public class WorkflowTaskPrepareDTO { */ private Integer triggerType; + /** + * 工作流名称 + */ + private String workflowName; + + /** + * 命名空间id + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 触发间隔 + */ + private String triggerInterval; + + /** + * 执行超时时间 + */ + private Integer executorTimeout; + + /** + * 工作流状态 0、关闭、1、开启 + */ + private Integer workflowStatus; + + /** + * 流程信息 + */ + private String flowInfo; + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java new file mode 100644 index 000000000..0be435a94 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java @@ -0,0 +1,21 @@ +package com.aizuda.easy.retry.server.job.task.dto; + +import lombok.Data; + +/** + * @author www.byteblogs.com + * @date 2023-12-22 + * @since 2.6.0 + */ +@Data +public class WorkflowTimerTaskDTO { + + private Long workflowTaskBatchId; + + private Long workflowId; + + /** + * 触发类似 1、auto 2、manual + */ + private Integer triggerType; +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java new file mode 100644 index 000000000..41fc68b2d --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java @@ -0,0 +1,15 @@ +package com.aizuda.easy.retry.server.job.task.support; + +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; + +/** + * @author www.byteblogs.com + * @date 2023-10-22 09:34:00 + * @since 2.6.0 + */ +public interface WorkflowPrePareHandler { + + boolean matches(Integer status); + + void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO); +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java index 1ee0836fc..665e46073 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java @@ -1,10 +1,13 @@ package com.aizuda.easy.retry.server.job.task.support; -import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Mappings; import org.mapstruct.factory.Mappers; import java.util.List; @@ -20,5 +23,12 @@ public interface WorkflowTaskConverter { List toWorkflowPartitionTaskList(List workflowList); + @Mappings( + @Mapping(source = "id", target = "workflowId") + ) WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO); + + WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO); + + WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java index 42ced064f..9393b1a88 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java @@ -20,6 +20,8 @@ public class ClientCallbackContext { private Long taskBatchId; + private Long workflowBatchId; + private Long taskId; private String groupName; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 8c288b00e..7586ab7c7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -87,7 +87,7 @@ public class JobExecutorActor extends AbstractActor { private void doExecute(final TaskExecuteDTO taskExecute) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper(); + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); // 自动的校验任务必须是开启状态,手动触发无需校验 if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) { queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus()); @@ -150,38 +150,39 @@ public class JobExecutorActor extends AbstractActor { } private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { - if (Objects.isNull(job) || JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())) { + if (Objects.isNull(job) + || JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType()) + || JobTriggerTypeEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType()) + // 是否是常驻任务 + || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) + ) { return; } - // 是否是常驻任务 - if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) { + JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); + jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); + jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); + jobTimerTaskDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); + ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); - JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); - jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); - jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); - jobTimerTaskDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); - ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); - WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); - - Long preTriggerAt = ResidentTaskCache.get(job.getId()); - if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) { - preTriggerAt = job.getNextTriggerAt(); - } - - WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); - waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(preTriggerAt); - Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext); - - // 获取时间差的毫秒数 - long milliseconds = nextTriggerAt - preTriggerAt; - - log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); - job.setNextTriggerAt(nextTriggerAt); - - JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); - ResidentTaskCache.refresh(job.getId(), nextTriggerAt); + Long preTriggerAt = ResidentTaskCache.get(job.getId()); + if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) { + preTriggerAt = job.getNextTriggerAt(); } + + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(preTriggerAt); + Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext); + + // 获取时间差的毫秒数 + long milliseconds = nextTriggerAt - preTriggerAt; + + log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); + job.setNextTriggerAt(nextTriggerAt); + + JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); + ResidentTaskCache.refresh(job.getId(), nextTriggerAt); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java index 1aa83dbe5..ba0dfb2de 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -65,7 +65,7 @@ public class JobExecutorResultActor extends AbstractActor { ()-> new EasyRetryServerException("更新任务实例失败")); // 更新批次上的状态 - boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason()); + boolean complete = jobTaskBatchHandler.complete(result.getWorkflowNodeId(), result.getWorkflowTaskBatchId(), result.getTaskBatchId(), result.getJobOperationReason()); if (complete) { // 尝试停止任务 // 若是集群任务则客户端会主动关闭 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java index 508b9ca38..bd5fa7d38 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java @@ -66,6 +66,8 @@ public class JobTaskPrepareActor extends AbstractActor { for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) { prepare.setExecutionAt(jobTaskBatch.getExecutionAt()); prepare.setTaskBatchId(jobTaskBatch.getId()); + prepare.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId()); + prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId()); prepare.setOnlyTimeoutCheck(onlyTimeoutCheck); for (JobPrePareHandler prePareHandler : prePareHandlers) { if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java index 9ac2c0407..dd6ee2da0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java @@ -84,7 +84,7 @@ public class ScanWorkflowTaskActor extends AbstractActor { for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) { // 执行预处理阶段 - ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor(); waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); actorRef.tell(waitExecTask, actorRef); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java new file mode 100644 index 000000000..2a3b4cc31 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -0,0 +1,110 @@ +package com.aizuda.easy.retry.server.job.task.support.dispatch; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.common.util.GraphUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.graph.GraphBuilder; +import com.google.common.graph.MutableGraph; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * @author: xiaowoniu + * @date : 2023-12-22 10:34 + * @since : 2.6.0 + */ +@Component(ActorGenerator.WORKFLOW_EXECUTOR_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +@RequiredArgsConstructor +public class WorkflowExecutorActor extends AbstractActor { + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + private final WorkflowNodeMapper workflowNodeMapper; + private final JobMapper jobMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; + + @Override + public Receive createReceive() { + return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> { + try { + doExecutor(taskExecute); + } catch (Exception e) { + LogUtils.error(log, "workflow executor exception. [{}]", taskExecute, e); + handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason()); + // TODO 发送通知 + } finally { + getContext().stop(getSelf()); + } + }).build(); + } + + private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) throws IOException { + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId()); + Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在")); + + // 获取DAG图 + String flowInfo = workflowTaskBatch.getFlowInfo(); + MutableGraph graph = GraphUtils.deserializeJsonToGraph(flowInfo); + + Set predecessors = graph.predecessors(taskExecute.getParentId()); + List workflowNodes = workflowNodeMapper.selectBatchIds(predecessors); + Set jobIdSet = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()); + List jobs = jobMapper.selectBatchIds(jobIdSet); + for (Job job : jobs) { + // 生成任务批次 + JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); + jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); + jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + actorRef.tell(jobTaskPrepare, actorRef); + } + + } + + private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) { + + WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch(); + jobTaskBatch.setId(taskExecute.getWorkflowTaskBatchId()); + jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); + jobTaskBatch.setTaskBatchStatus(taskStatus); + jobTaskBatch.setOperationReason(operationReason); + Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch), + () -> new EasyRetryServerException("更新任务失败")); + + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java index f4f62cfee..df2de9e83 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java @@ -1,22 +1,23 @@ package com.aizuda.easy.retry.server.job.task.support.event; +import lombok.Getter; import org.springframework.context.ApplicationEvent; /** * job任务失败事件 + * * @author: zuoJunLin * @date : 2023-12-02 21:40 * @since 2.5.0 */ +@Getter public class JobTaskFailAlarmEvent extends ApplicationEvent { - private Long jobTaskBatchId; + + private final Long jobTaskBatchId; public JobTaskFailAlarmEvent(Long jobTaskBatchId) { super(jobTaskBatchId); - this.jobTaskBatchId=jobTaskBatchId; + this.jobTaskBatchId = jobTaskBatchId; } - public Long getJobTaskBatchId() { - return jobTaskBatchId; - } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 98eb62d59..c23a1ac42 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -42,6 +42,7 @@ public class JobTaskBatchGenerator { jobTaskBatch.setGroupName(context.getGroupName()); jobTaskBatch.setCreateDt(LocalDateTime.now()); jobTaskBatch.setNamespaceId(context.getNamespaceId()); + jobTaskBatch.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); // 无执行的节点 if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) { jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java index 1ff1d1989..bdbd0a9d1 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java @@ -45,5 +45,10 @@ public class JobTaskBatchGeneratorContext { */ private Integer triggerType; + /** + * 工作流任务批次id + */ + private Long workflowTaskBatchId; + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java new file mode 100644 index 000000000..79b2104ba --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java @@ -0,0 +1,48 @@ +package com.aizuda.easy.retry.server.job.task.support.generator.batch; + +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.concurrent.TimeUnit; + +/** + * @author: xiaowoniu + * @date : 2023-12-22 09:04 + * @since : 2.6.0 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class WorkflowBatchGenerator { + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + @Transactional + public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) { + + // 生成任务批次 + WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context); + workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus()); + workflowTaskBatchMapper.insert(workflowTaskBatch); + + // 开始执行工作流 + // 进入时间轮 + long delay = context.getNextTriggerAt() - DateUtils.toNowMilli(); + WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO(); + workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId()); + workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId()); + workflowTimerTaskDTO.setTriggerType(context.getTriggerType()); + JobTimerWheel.register(workflowTaskBatch.getId(), + new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java new file mode 100644 index 000000000..1dbf4c996 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java @@ -0,0 +1,51 @@ +package com.aizuda.easy.retry.server.job.task.support.generator.batch; + +import lombok.Data; + +/** + * @author www.byteblogs.com + * @date 2023-10-02 13:12:48 + * @since 2.4.0 + */ +@Data +public class WorkflowTaskBatchGeneratorContext { + + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 工作流id + */ + private Long workflowId; + + /** + * 下次触发时间 + */ + private Long nextTriggerAt; + + /** + * 操作原因 + */ + private Integer operationReason; + + /** + * 任务批次状态 + */ + private Integer taskBatchStatus; + + /** + * 触发类似 1、auto 2、manual + */ + private Integer triggerType; + + /** + * 流程信息 + */ + private String flowInfo; + + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index 36297b77b..6b985885b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -1,9 +1,14 @@ package com.aizuda.easy.retry.server.job.task.support.handler; +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; @@ -11,6 +16,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -25,6 +31,7 @@ import java.util.stream.Collectors; * @date : 2023-10-10 16:50 */ @Component +@Slf4j public class JobTaskBatchHandler { @Autowired @@ -32,7 +39,16 @@ public class JobTaskBatchHandler { @Autowired private JobTaskBatchMapper jobTaskBatchMapper; - public boolean complete(Long taskBatchId, Integer jobOperationReason) { + /** + * TODO 参数待优化 + * + * @param workflowNodeId + * @param workflowTaskBatchId + * @param taskBatchId + * @param jobOperationReason + * @return + */ + public boolean complete(Long workflowNodeId, Long workflowTaskBatchId, Long taskBatchId, Integer jobOperationReason) { List jobTasks = jobTaskMapper.selectList( new LambdaQueryWrapper().select(JobTask::getTaskStatus) @@ -76,6 +92,20 @@ public class JobTaskBatchHandler { jobTaskBatch.setOperationReason(jobOperationReason); } + if (Objects.nonNull(workflowNodeId) && Objects.nonNull(workflowTaskBatchId)) { + // 若是工作流则开启下一个任务 + try { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId); + taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); + taskExecuteDTO.setParentId(workflowNodeId); + ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } + return 1 == jobTaskBatchMapper.update(jobTaskBatch, new LambdaUpdateWrapper() .eq(JobTaskBatch::getId, taskBatchId) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java index 2c9f6f671..aec884fe2 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java @@ -43,7 +43,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler { // 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理 int blockStrategy = prepare.getBlockStrategy(); JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE; - if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) { + if (jobTaskBatchHandler.complete(prepare.getWorkflowNodeId(), prepare.getWorkflowTaskBatchId(), prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) { blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); } else { // 计算超时时间 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java new file mode 100644 index 000000000..5226c3c19 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java @@ -0,0 +1,21 @@ +package com.aizuda.easy.retry.server.job.task.support.prepare.workflow; + +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler; + +/** + * @author: xiaowoniu + * @date : 2023-12-22 08:57 + * @since : 2.6.0 + */ +public abstract class AbstractWorkflowPrePareHandler implements WorkflowPrePareHandler { + + @Override + public void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) { + + doHandler(workflowTaskPrepareDTO); + } + + protected abstract void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO); + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java new file mode 100644 index 000000000..fb93a6b10 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java @@ -0,0 +1,32 @@ +package com.aizuda.easy.retry.server.job.task.support.prepare.workflow; + +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +/** + * @author: xiaowoniu + * @date : 2023-12-22 08:59 + * @since : 2.6.0 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class TerminalWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { + private final WorkflowBatchGenerator workflowBatchGenerator; + @Override + public boolean matches(final Integer status) { + return Objects.isNull(status); + } + + @Override + protected void doHandler(final WorkflowTaskPrepareDTO jobPrepareDTO) { + log.info("无处理中的数据. workflowId:[{}]", jobPrepareDTO.getWorkflowId()); + workflowBatchGenerator.generateJobTaskBatch(WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(jobPrepareDTO)); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java new file mode 100644 index 000000000..01544d5c4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java @@ -0,0 +1,72 @@ +package com.aizuda.easy.retry.server.job.task.support.timer; + +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; + +/** + * @author: xiaowoniu + * @date : 2023-09-25 + * @since 2.6.0 + */ +@AllArgsConstructor +@Slf4j +public class WorkflowTimerTask implements TimerTask { + + private WorkflowTimerTaskDTO workflowTimerTaskDTO; + + @Override + public void run(final Timeout timeout) throws Exception { + // 执行任务调度 + log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId()); + + try { + + int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); + int operationReason = JobOperationReasonEnum.NONE.getReason(); + handlerTaskBatch(workflowTimerTaskDTO.getWorkflowTaskBatchId(), taskStatus, operationReason); + + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId()); + taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId()); + taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType()); + taskExecuteDTO.setParentId(SystemConstants.ROOT); + ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } + + private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) { + + WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch(); + jobTaskBatch.setId(workflowTaskBatchId); + jobTaskBatch.setExecutionAt(DateUtils.toNowMilli()); + jobTaskBatch.setTaskBatchStatus(taskStatus); + jobTaskBatch.setOperationReason(operationReason); + Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch), + () -> new EasyRetryServerException("更新任务失败")); + + } + +} diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java index 2af47c610..40c348a44 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java @@ -78,7 +78,7 @@ public class ConsumerBucketActor extends AbstractActor { // 扫描DAG工作流任务数据 ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW); - scanJobActorRef.tell(scanTask, scanWorkflowActorRef); + scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef); } if (SystemModeEnum.isRetry(systemProperties.getMode())) { diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java index 1496ad6f2..20dca5171 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java @@ -1,11 +1,18 @@ package com.aizuda.easy.retry.server.web.service.impl; import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.HashUtil; import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.common.WaitStrategy; +import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.web.model.base.PageResult; +import com.aizuda.easy.retry.server.web.model.request.JobRequestVO; import com.aizuda.easy.retry.server.web.model.request.UserSessionVO; import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO; import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO; @@ -30,6 +37,7 @@ import com.google.common.graph.GraphBuilder; import com.google.common.graph.MutableGraph; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; @@ -49,7 +57,7 @@ import java.util.stream.Collectors; public class WorkflowServiceImpl implements WorkflowService { private final WorkflowMapper workflowMapper; private final WorkflowNodeMapper workflowNodeMapper; - private final static long root = -1; + private final SystemProperties systemProperties; @Override @Transactional @@ -57,20 +65,22 @@ public class WorkflowServiceImpl implements WorkflowService { MutableGraph graph = GraphBuilder.directed().allowsSelfLoops(false).build(); // 添加虚拟头节点 - graph.addNode(root); + graph.addNode(SystemConstants.ROOT); // 组装工作流信息 Workflow workflow = WorkflowConverter.INSTANCE.toWorkflow(workflowRequestVO); - // TODO 临时设置值 - workflow.setNextTriggerAt(1L); + workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli())); workflow.setFlowInfo(StrUtil.EMPTY); + workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName()) + % systemProperties.getBucketTotal()); + workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId()); Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败")); // 获取DAG节点配置 NodeConfig nodeConfig = workflowRequestVO.getNodeConfig(); // 递归构建图 - buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph); + buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph); log.info("图构建完成. graph:[{}]", graph); // 保存图信息 @@ -80,6 +90,13 @@ public class WorkflowServiceImpl implements WorkflowService { return true; } + private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) { + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(workflowRequestVO.getTriggerType()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(workflowRequestVO.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(time); + return waitStrategy.computeTriggerTime(waitStrategyContext); + } @Override public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException { @@ -101,7 +118,7 @@ public class WorkflowServiceImpl implements WorkflowService { try { MutableGraph graph = deserializeJsonToGraph(flowInfo); // 反序列化构建图 - WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, root, new HashMap<>(), workflowNodeMap); + WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap); responseVO.setNodeConfig(config); } catch (Exception e) { log.error("反序列化失败. json:[{}]", flowInfo, e); @@ -138,13 +155,13 @@ public class WorkflowServiceImpl implements WorkflowService { MutableGraph graph = GraphBuilder.directed().allowsSelfLoops(false).build(); // 添加虚拟头节点 - graph.addNode(root); + graph.addNode(SystemConstants.ROOT); // 获取DAG节点配置 NodeConfig nodeConfig = workflowRequestVO.getNodeConfig(); // 递归构建图 - buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph); + buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph); log.info("图构建完成. graph:[{}]", graph); @@ -215,7 +232,7 @@ public class WorkflowServiceImpl implements WorkflowService { buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap); } - if (parentId != root && mount) { + if (parentId != SystemConstants.ROOT && mount) { previousNodeInfo.setChildNode(currentConfig); }