diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql
index 940ef7d8..6e96748a 100644
--- a/doc/sql/easy_retry_mysql.sql
+++ b/doc/sql/easy_retry_mysql.sql
@@ -356,22 +356,24 @@ CREATE TABLE `job_task`
CREATE TABLE `job_task_batch`
(
- `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
- `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
- `group_name` varchar(64) NOT NULL COMMENT '组名称',
- `job_id` bigint(20) NOT NULL COMMENT '任务id',
- `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
- `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
- `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
- `parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点',
- `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
- `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
- `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
+ `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+ `group_name` varchar(64) NOT NULL COMMENT '组名称',
+ `job_id` bigint(20) NOT NULL COMMENT '任务id',
+ `workflow_node_id` bigint(20) NOT NULL COMMENT '工作流节点id',
+ `workflow_task_batch_id` bigint(20) NOT NULL COMMENT '工作流任务批次id',
+ `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
+ `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
+ `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
+ `parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点',
+ `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
+ `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
- KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`),
- KEY `idx_create_dt` (`create_dt`),
- KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
+ KEY `idx_job_id_task_batch_status` (`job_id`, `task_batch_status`),
+ KEY `idx_create_dt` (`create_dt`),
+ KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';
@@ -442,25 +444,25 @@ CREATE TABLE `retry_summary`
CREATE TABLE `workflow`
(
- `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
- `workflow_name` varchar(64) NOT NULL COMMENT '工作流名称',
- `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
- `group_name` varchar(64) NOT NULL COMMENT '组名称',
- `workflow_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态 0、关闭、1、开启',
- `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
- `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
- `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
- `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
- `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
- `flow_info` text DEFAULT NULL COMMENT '流程信息',
- `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
- `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
- `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
- `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
+ `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
+ `workflow_name` varchar(64) NOT NULL COMMENT '工作流名称',
+ `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+ `group_name` varchar(64) NOT NULL COMMENT '组名称',
+ `workflow_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态 0、关闭、1、开启',
+ `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',
+ `trigger_interval` varchar(255) NOT NULL COMMENT '间隔时长',
+ `next_trigger_at` bigint(13) NOT NULL COMMENT '下次触发时间',
+ `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
+ `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
+ `flow_info` text DEFAULT NULL COMMENT '流程信息',
+ `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
+ `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
+ `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
- KEY `idx_create_dt` (`create_dt`),
- KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
+ KEY `idx_create_dt` (`create_dt`),
+ KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流';
@@ -468,23 +470,23 @@ CREATE TABLE `workflow`
CREATE TABLE `workflow_node`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
- `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
- `node_name` varchar(64) NOT NULL COMMENT '节点名称',
- `group_name` varchar(64) NOT NULL COMMENT '组名称',
- `job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id',
- `workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
- `node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
- `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
- `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
- `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
- `node_expression` text DEFAULT NULL COMMENT '节点表达式',
- `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
- `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
- `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
+ `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+ `node_name` varchar(64) NOT NULL COMMENT '节点名称',
+ `group_name` varchar(64) NOT NULL COMMENT '组名称',
+ `job_id` bigint(20) NOT NULL DEFAULT -1 COMMENT '任务信息id',
+ `workflow_id` bigint(20) NOT NULL COMMENT '工作流ID',
+ `node_type` tinyint(4) NOT NULL DEFAULT 1 COMMENT '1、任务节点 2、条件节点',
+ `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
+ `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
+ `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
+ `node_expression` text DEFAULT NULL COMMENT '节点表达式',
+ `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
+ `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
- KEY `idx_create_dt` (`create_dt`),
- KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
+ KEY `idx_create_dt` (`create_dt`),
+ KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流节点';
@@ -492,20 +494,21 @@ CREATE TABLE `workflow_node`
CREATE TABLE `workflow_task_batch`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
- `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
- `group_name` varchar(64) NOT NULL COMMENT '组名称',
- `workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
- `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
- `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
- `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
- `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
- `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
- `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
+ `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+ `group_name` varchar(64) NOT NULL COMMENT '组名称',
+ `workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
+ `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
+ `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
+ `flow_info` text DEFAULT NULL COMMENT '流程信息',
+ `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
+ `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+ `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
+ `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
PRIMARY KEY (`id`),
- KEY `idx_job_id_task_batch_status` (`workflow_id`, `task_batch_status`),
- KEY `idx_create_dt` (`create_dt`),
- KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
+ KEY `idx_job_id_task_batch_status` (`workflow_id`, `task_batch_status`),
+ KEY `idx_create_dt` (`create_dt`),
+ KEY `idx_namespace_id_group_name` (`namespace_id`, `group_name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='工作流批次';
diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java
index 93e27c47..0d3071b4 100644
--- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java
+++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/dto/JobContext.java
@@ -13,6 +13,10 @@ public class JobContext {
private Long taskBatchId;
+ private Long workflowBatchId;
+
+ private Long workflowNodeId;
+
private Long taskId;
private String groupName;
diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java
index 1e6cb862..bf19b615 100644
--- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java
+++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/executor/JobExecutorFutureCallback.java
@@ -30,7 +30,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
.client(JobNettyClient.class)
.callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build();
- private JobContext jobContext;
+ private final JobContext jobContext;
public JobExecutorFutureCallback(final JobContext jobContext) {
this.jobContext = jobContext;
@@ -97,6 +97,9 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
dispatchJobRequest.setGroupName(jobContext.getGroupName());
dispatchJobRequest.setJobId(jobContext.getJobId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
+ dispatchJobRequest.setWorkflowBatchId(jobContext.getWorkflowBatchId());
+ dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId());
+ dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(executeResult);
dispatchJobRequest.setTaskStatus(status);
diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java
index 12d43358..3763ffb7 100644
--- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java
+++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobRequest.java
@@ -36,13 +36,18 @@ public class DispatchJobRequest {
@NotBlank(message = "executorInfo 不能为空")
private String executorInfo;
+ @NotBlank(message = "executorTimeout 不能为空")
+ private Integer executorTimeout;
+
private String argsStr;
private Integer shardingTotal;
private Integer shardingIndex;
- @NotBlank(message = "executorTimeout 不能为空")
- private Integer executorTimeout;
+ private Long workflowBatchId;
+
+ private Long workflowNodeId;
+
}
diff --git a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java
index 5c16383b..e895fd0d 100644
--- a/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java
+++ b/easy-retry-common/easy-retry-common-client-api/src/main/java/com/aizuda/easy/retry/client/model/request/DispatchJobResultRequest.java
@@ -14,6 +14,10 @@ public class DispatchJobResultRequest {
private Long taskBatchId;
+ private Long workflowBatchId;
+
+ private Long workflowNodeId;
+
private Long taskId;
/**
diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java
index 26a7467f..89bfdd41 100644
--- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java
+++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java
@@ -96,4 +96,9 @@ public interface SystemConstants {
* AT 所有人
*/
String AT_ALL = "all";
+
+ /**
+ * 根节点
+ */
+ Long ROOT = -1L;
}
diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java
index e5f0892e..8d157ca7 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTaskBatch.java
@@ -48,6 +48,16 @@ public class JobTaskBatch implements Serializable {
*/
private Long jobId;
+ /**
+ * 工作流批次id
+ */
+ private Long workflowTaskBatchId;
+
+ /**
+ * 工作流节点id
+ */
+ private Long workflowNodeId;
+
/**
* 任务批次状态
*/
diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java
index 3c673f53..012c1d94 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowTaskBatch.java
@@ -47,18 +47,23 @@ public class WorkflowTaskBatch implements Serializable {
/**
* 任务批次状态 0、失败 1、成功
*/
- private Byte taskBatchStatus;
+ private Integer taskBatchStatus;
/**
* 操作原因
*/
- private Byte operationReason;
+ private Integer operationReason;
/**
* 任务执行时间
*/
private Long executionAt;
+ /**
+ * 流程信息
+ */
+ private String flowInfo;
+
/**
* 创建时间
*/
@@ -72,7 +77,7 @@ public class WorkflowTaskBatch implements Serializable {
/**
* 逻辑删除 1、删除
*/
- private Byte deleted;
+ private Integer deleted;
/**
* 扩展字段
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java
index a8269e32..4c74fa7a 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java
@@ -44,6 +44,7 @@ public class ActorGenerator {
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
+ public static final String WORKFLOW_EXECUTOR_ACTOR = "WorkflowExecutorActor";
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
public static final String JOB_LOG_ACTOR = "JobLogActor";
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
@@ -195,6 +196,16 @@ public class ActorGenerator {
.withDispatcher(JOB_TASK_DISPATCHER));
}
+ /**
+ * Job调度准备阶段actor
+ *
+ * @return actor 引用
+ */
+ public static ActorRef workflowTaskPrepareActor() {
+ return getJobActorSystem().actorOf(getSpringExtension().props(WORKFLOW_TASK_PREPARE_ACTOR)
+ .withDispatcher(JOB_TASK_DISPATCHER));
+ }
+
/**
* Job任务执行阶段actor
*
@@ -208,6 +219,19 @@ public class ActorGenerator {
);
}
+ /**
+ * Job任务执行阶段actor
+ *
+ * @return actor 引用
+ */
+ public static ActorRef workflowTaskExecutorActor() {
+ return getJobActorSystem()
+ .actorOf(getSpringExtension()
+ .props(WORKFLOW_EXECUTOR_ACTOR)
+ .withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
+ );
+ }
+
/**
* Job任务执行结果actor
*
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java
index 536a5f56..c7f86a99 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/JobTriggerTypeEnum.java
@@ -15,7 +15,9 @@ import lombok.Getter;
@AllArgsConstructor
public enum JobTriggerTypeEnum {
AUTO(1, "自动触发"),
- MANUAL(2, "手动触发");
+ MANUAL(2, "手动触发"),
+ WORKFLOW(2, "DAG触发"),
+ ;
private final Integer type;
private final String desc;
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java
new file mode 100644
index 00000000..0b1535c5
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/GraphUtils.java
@@ -0,0 +1,41 @@
+package com.aizuda.easy.retry.server.common.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-12-22
+ * @since : 2.6.0
+ */
+public class GraphUtils {
+
+
+ // 从JSON反序列化为Guava图
+ public static <T> MutableGraph<T> deserializeJsonToGraph(String jsonGraph) throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ // 将JSON字符串转换为Map<String, Iterable<String>>
+ Map<T, Iterable<T>> adjacencyList = objectMapper.readValue(
+ jsonGraph, new TypeReference<Map<T, Iterable<T>>>() {});
+
+ // 创建Guava图并添加节点和边
+ MutableGraph<T> graph = GraphBuilder.directed().build();
+ for (Map.Entry<T, Iterable<T>> entry : adjacencyList.entrySet()) {
+ T node = entry.getKey();
+ Iterable<T> successors = entry.getValue();
+
+ graph.addNode(node);
+ for (T successor : successors) {
+ graph.putEdge(node, successor);
+ }
+ }
+
+ return graph;
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java
index f85da012..6c7f4263 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobExecutorResultDTO.java
@@ -1,6 +1,5 @@
package com.aizuda.easy.retry.server.job.task.dto;
-import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import lombok.Data;
/**
@@ -15,6 +14,13 @@ public class JobExecutorResultDTO {
private Long taskBatchId;
+ /**
+ * 工作流任务批次id
+ */
+ private Long workflowTaskBatchId;
+
+ private Long workflowNodeId;
+
private Long taskId;
/**
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java
index 66531523..c45f981e 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java
@@ -57,4 +57,15 @@ public class JobTaskPrepareDTO {
*/
private Integer triggerType;
+ /**
+ * 工作流任务批次id
+ */
+ private Long workflowTaskBatchId;
+
+ /**
+ * 工作流节点id
+ */
+ private Long workflowNodeId;
+
+
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java
new file mode 100644
index 00000000..bde18b22
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java
@@ -0,0 +1,29 @@
+package com.aizuda.easy.retry.server.job.task.dto;
+
+import lombok.Data;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-12-22
+ * @since : 2.6.0
+ */
+@Data
+public class WorkflowNodeTaskExecuteDTO {
+
+ /**
+ * 工作流id
+ */
+ private Long workflowId;
+
+ /**
+ * 工作流任务批次id
+ */
+ private Long workflowTaskBatchId;
+ /**
+ * 触发类似 1、auto 2、manual
+ */
+ private Integer triggerType;
+
+ private Long parentId;
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
index 50652c97..313b058e 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
@@ -17,4 +17,39 @@ public class WorkflowTaskPrepareDTO {
*/
private Integer triggerType;
+ /**
+ * 工作流名称
+ */
+ private String workflowName;
+
+ /**
+ * 命名空间id
+ */
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 触发间隔
+ */
+ private String triggerInterval;
+
+ /**
+ * 执行超时时间
+ */
+ private Integer executorTimeout;
+
+ /**
+ * 工作流状态 0、关闭、1、开启
+ */
+ private Integer workflowStatus;
+
+ /**
+ * 流程信息
+ */
+ private String flowInfo;
+
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java
new file mode 100644
index 00000000..0be435a9
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java
@@ -0,0 +1,21 @@
+package com.aizuda.easy.retry.server.job.task.dto;
+
+import lombok.Data;
+
+/**
+ * @author www.byteblogs.com
+ * @date 2023-12-22
+ * @since 2.6.0
+ */
+@Data
+public class WorkflowTimerTaskDTO {
+
+ private Long workflowTaskBatchId;
+
+ private Long workflowId;
+
+ /**
+ * 触发类似 1、auto 2、manual
+ */
+ private Integer triggerType;
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java
new file mode 100644
index 00000000..41fc68b2
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowPrePareHandler.java
@@ -0,0 +1,15 @@
+package com.aizuda.easy.retry.server.job.task.support;
+
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+
+/**
+ * @author www.byteblogs.com
+ * @date 2023-10-22 09:34:00
+ * @since 2.6.0
+ */
+public interface WorkflowPrePareHandler {
+
+ boolean matches(Integer status);
+
+ void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java
index 1ee0836f..665e4607 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java
@@ -1,10 +1,13 @@
package com.aizuda.easy.retry.server.job.task.support;
-import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.List;
@@ -20,5 +23,12 @@ public interface WorkflowTaskConverter {
List<WorkflowPartitionTaskDTO> toWorkflowPartitionTaskList(List<Workflow> workflowList);
+ @Mappings(
+ @Mapping(source = "id", target = "workflowId")
+ )
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
+
+ WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
+
+ WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context);
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java
index 42ced064..9393b1a8 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClientCallbackContext.java
@@ -20,6 +20,8 @@ public class ClientCallbackContext {
private Long taskBatchId;
+ private Long workflowBatchId;
+
private Long taskId;
private String groupName;
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
index 8c288b00..7586ab7c 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
@@ -87,7 +87,7 @@ public class JobExecutorActor extends AbstractActor {
private void doExecute(final TaskExecuteDTO taskExecute) {
- LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>();
+ LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
// 自动的校验任务必须是开启状态,手动触发无需校验
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
@@ -150,38 +150,39 @@ public class JobExecutorActor extends AbstractActor {
}
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
- if (Objects.isNull(job) || JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())) {
+ if (Objects.isNull(job)
+ || JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())
+ || JobTriggerTypeEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType())
+ // 是否是常驻任务
+ || Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
+ ) {
return;
}
- // 是否是常驻任务
- if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) {
+ JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
+ jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
+ jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
+ jobTimerTaskDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
+ ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
+ WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
- JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
- jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
- jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
- jobTimerTaskDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
- ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
- WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());
-
- Long preTriggerAt = ResidentTaskCache.get(job.getId());
- if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) {
- preTriggerAt = job.getNextTriggerAt();
- }
-
- WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
- waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
- waitStrategyContext.setNextTriggerAt(preTriggerAt);
- Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext);
-
- // 获取时间差的毫秒数
- long milliseconds = nextTriggerAt - preTriggerAt;
-
- log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
- job.setNextTriggerAt(nextTriggerAt);
-
- JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
- ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
+ Long preTriggerAt = ResidentTaskCache.get(job.getId());
+ if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) {
+ preTriggerAt = job.getNextTriggerAt();
}
+
+ WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
+ waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
+ waitStrategyContext.setNextTriggerAt(preTriggerAt);
+ Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext);
+
+ // 获取时间差的毫秒数
+ long milliseconds = nextTriggerAt - preTriggerAt;
+
+ log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
+ job.setNextTriggerAt(nextTriggerAt);
+
+ JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
+ ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java
index 1aa83dbe..ba0dfb2d 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorResultActor.java
@@ -65,7 +65,7 @@ public class JobExecutorResultActor extends AbstractActor {
()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态
- boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason());
+ boolean complete = jobTaskBatchHandler.complete(result.getWorkflowNodeId(), result.getWorkflowTaskBatchId(), result.getTaskBatchId(), result.getJobOperationReason());
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java
index 508b9ca3..bd5fa7d3 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobTaskPrepareActor.java
@@ -66,6 +66,8 @@ public class JobTaskPrepareActor extends AbstractActor {
for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) {
prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
prepare.setTaskBatchId(jobTaskBatch.getId());
+ prepare.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
+ prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
prepare.setOnlyTimeoutCheck(onlyTimeoutCheck);
for (JobPrePareHandler prePareHandler : prePareHandlers) {
if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) {
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java
index 9ac2c040..dd6ee2da 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java
@@ -84,7 +84,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
// 执行预处理阶段
- ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+ ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
actorRef.tell(waitExecTask, actorRef);
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
new file mode 100644
index 00000000..2a3b4cc3
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java
@@ -0,0 +1,110 @@
+package com.aizuda.easy.retry.server.job.task.support.dispatch;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import cn.hutool.core.lang.Assert;
+import com.aizuda.easy.retry.common.core.constant.SystemConstants;
+import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.common.core.log.LogUtils;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
+import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.common.util.GraphUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-12-22 10:34
+ * @since : 2.6.0
+ */
+@Component(ActorGenerator.WORKFLOW_EXECUTOR_ACTOR)
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Slf4j
+@RequiredArgsConstructor
+public class WorkflowExecutorActor extends AbstractActor {
+ private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
+ private final WorkflowNodeMapper workflowNodeMapper;
+ private final JobMapper jobMapper;
+ private final JobTaskBatchMapper jobTaskBatchMapper;
+
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
+ try {
+ doExecutor(taskExecute);
+ } catch (Exception e) {
+ LogUtils.error(log, "workflow executor exception. [{}]", taskExecute, e);
+ handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason());
+ // TODO 发送通知
+ } finally {
+ getContext().stop(getSelf());
+ }
+ }).build();
+ }
+
+ private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) throws IOException {
+ WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
+ Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
+
+ // 获取DAG图
+ String flowInfo = workflowTaskBatch.getFlowInfo();
+ MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
+
+ Set<Long> predecessors = graph.predecessors(taskExecute.getParentId());
+ List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(predecessors);
+ Set<Long> jobIdSet = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet());
+ List<Job> jobs = jobMapper.selectBatchIds(jobIdSet);
+ for (Job job : jobs) {
+ // 生成任务批次
+ JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
+ jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
+ jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
+ // 执行预处理阶段
+ ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+ actorRef.tell(jobTaskPrepare, actorRef);
+ }
+
+ }
+
+ private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
+
+ WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
+ jobTaskBatch.setId(taskExecute.getWorkflowTaskBatchId());
+ jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
+ jobTaskBatch.setTaskBatchStatus(taskStatus);
+ jobTaskBatch.setOperationReason(operationReason);
+ Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
+ () -> new EasyRetryServerException("更新任务失败"));
+
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java
index f4f62cfe..df2de9e8 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/event/JobTaskFailAlarmEvent.java
@@ -1,22 +1,23 @@
package com.aizuda.easy.retry.server.job.task.support.event;
+import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* job任务失败事件
+ *
* @author: zuoJunLin
* @date : 2023-12-02 21:40
* @since 2.5.0
*/
+@Getter
public class JobTaskFailAlarmEvent extends ApplicationEvent {
- private Long jobTaskBatchId;
+
+ private final Long jobTaskBatchId;
public JobTaskFailAlarmEvent(Long jobTaskBatchId) {
super(jobTaskBatchId);
- this.jobTaskBatchId=jobTaskBatchId;
+ this.jobTaskBatchId = jobTaskBatchId;
}
- public Long getJobTaskBatchId() {
- return jobTaskBatchId;
- }
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java
index 98eb62d5..c23a1ac4 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java
@@ -42,6 +42,7 @@ public class JobTaskBatchGenerator {
jobTaskBatch.setGroupName(context.getGroupName());
jobTaskBatch.setCreateDt(LocalDateTime.now());
jobTaskBatch.setNamespaceId(context.getNamespaceId());
+ jobTaskBatch.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
// 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java
index 1ff1d198..bdbd0a9d 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java
@@ -45,5 +45,10 @@ public class JobTaskBatchGeneratorContext {
*/
private Integer triggerType;
+ /**
+ * 工作流任务批次id
+ */
+ private Long workflowTaskBatchId;
+
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java
new file mode 100644
index 00000000..79b2104b
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java
@@ -0,0 +1,48 @@
+package com.aizuda.easy.retry.server.job.task.support.generator.batch;
+
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
+import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
+import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
+import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-12-22 09:04
+ * @since : 2.6.0
+ */
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class WorkflowBatchGenerator {
+ private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
+ @Transactional
+ public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) {
+
+ // 生成任务批次
+ WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context);
+ workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
+ workflowTaskBatchMapper.insert(workflowTaskBatch);
+
+ // 开始执行工作流
+ // 进入时间轮
+ long delay = context.getNextTriggerAt() - DateUtils.toNowMilli();
+ WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
+ workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
+ workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
+ workflowTimerTaskDTO.setTriggerType(context.getTriggerType());
+ JobTimerWheel.register(workflowTaskBatch.getId(),
+ new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java
new file mode 100644
index 00000000..1dbf4c99
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java
@@ -0,0 +1,51 @@
+package com.aizuda.easy.retry.server.job.task.support.generator.batch;
+
+import lombok.Data;
+
+/**
+ * @author www.byteblogs.com
+ * @date 2023-10-02 13:12:48
+ * @since 2.4.0
+ */
+@Data
+public class WorkflowTaskBatchGeneratorContext {
+
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 工作流id
+ */
+ private Long workflowId;
+
+ /**
+ * 下次触发时间
+ */
+ private Long nextTriggerAt;
+
+ /**
+ * 操作原因
+ */
+ private Integer operationReason;
+
+ /**
+ * 任务批次状态
+ */
+ private Integer taskBatchStatus;
+
+ /**
+ * 触发类似 1、auto 2、manual
+ */
+ private Integer triggerType;
+
+ /**
+ * 流程信息
+ */
+ private String flowInfo;
+
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java
index 36297b77..6b985885 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java
@@ -1,9 +1,14 @@
package com.aizuda.easy.retry.server.job.task.support.handler;
+import akka.actor.ActorRef;
+import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
@@ -11,6 +16,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -25,6 +31,7 @@ import java.util.stream.Collectors;
* @date : 2023-10-10 16:50
*/
@Component
+@Slf4j
public class JobTaskBatchHandler {
@Autowired
@@ -32,7 +39,16 @@ public class JobTaskBatchHandler {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
- public boolean complete(Long taskBatchId, Integer jobOperationReason) {
+ /**
+ * TODO 参数待优化
+ *
+ * @param workflowNodeId
+ * @param workflowTaskBatchId
+ * @param taskBatchId
+ * @param jobOperationReason
+ * @return
+ */
+ public boolean complete(Long workflowNodeId, Long workflowTaskBatchId, Long taskBatchId, Integer jobOperationReason) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
@@ -76,6 +92,20 @@ public class JobTaskBatchHandler {
jobTaskBatch.setOperationReason(jobOperationReason);
}
+ if (Objects.nonNull(workflowNodeId) && Objects.nonNull(workflowTaskBatchId)) {
+ // 若是工作流则开启下一个任务
+ try {
+ WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
+ taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
+ taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
+ taskExecuteDTO.setParentId(workflowNodeId);
+ ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
+ actorRef.tell(taskExecuteDTO, actorRef);
+ } catch (Exception e) {
+ log.error("任务调度执行失败", e);
+ }
+ }
+
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId)
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java
index 2c9f6f67..aec884fe 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/RunningJobPrepareHandler.java
@@ -43,7 +43,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理
int blockStrategy = prepare.getBlockStrategy();
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
- if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
+ if (jobTaskBatchHandler.complete(prepare.getWorkflowNodeId(), prepare.getWorkflowTaskBatchId(), prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java
new file mode 100644
index 00000000..5226c3c1
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/AbstractWorkflowPrePareHandler.java
@@ -0,0 +1,21 @@
+package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
+
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-12-22 08:57
+ * @since : 2.6.0
+ */
+public abstract class AbstractWorkflowPrePareHandler implements WorkflowPrePareHandler {
+
+ @Override
+ public void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
+
+ doHandler(workflowTaskPrepareDTO);
+ }
+
+ protected abstract void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO);
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java
new file mode 100644
index 00000000..fb93a6b1
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/TerminalWorkflowPrepareHandler.java
@@ -0,0 +1,32 @@
+package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
+
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-12-22 08:59
+ * @since : 2.6.0
+ */
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class TerminalWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
+ private final WorkflowBatchGenerator workflowBatchGenerator;
+ @Override
+ public boolean matches(final Integer status) {
+ return Objects.isNull(status);
+ }
+
+ @Override
+ protected void doHandler(final WorkflowTaskPrepareDTO jobPrepareDTO) {
+ log.info("无处理中的数据. workflowId:[{}]", jobPrepareDTO.getWorkflowId());
+ workflowBatchGenerator.generateJobTaskBatch(WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(jobPrepareDTO));
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java
new file mode 100644
index 00000000..01544d5c
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java
@@ -0,0 +1,72 @@
+package com.aizuda.easy.retry.server.job.task.support.timer;
+
+import akka.actor.ActorRef;
+import cn.hutool.core.lang.Assert;
+import com.aizuda.easy.retry.common.core.constant.SystemConstants;
+import com.aizuda.easy.retry.common.core.context.SpringContext;
+import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
+import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2023-09-25
+ * @since 2.6.0
+ */
+@AllArgsConstructor
+@Slf4j
+public class WorkflowTimerTask implements TimerTask {
+
+ private WorkflowTimerTaskDTO workflowTimerTaskDTO;
+
+ @Override
+ public void run(final Timeout timeout) throws Exception {
+ // 执行任务调度
+ log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId());
+
+ try {
+
+ int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
+ int operationReason = JobOperationReasonEnum.NONE.getReason();
+ handlerTaskBatch(workflowTimerTaskDTO.getWorkflowTaskBatchId(), taskStatus, operationReason);
+
+ WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
+ taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
+ taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
+ taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType());
+ taskExecuteDTO.setParentId(SystemConstants.ROOT);
+ ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
+ actorRef.tell(taskExecuteDTO, actorRef);
+
+ } catch (Exception e) {
+ log.error("任务调度执行失败", e);
+ }
+ }
+
+ private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
+
+ WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
+ jobTaskBatch.setId(workflowTaskBatchId);
+ jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
+ jobTaskBatch.setTaskBatchStatus(taskStatus);
+ jobTaskBatch.setOperationReason(operationReason);
+ Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch),
+ () -> new EasyRetryServerException("更新任务失败"));
+
+ }
+
+}
diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
index 2af47c61..40c348a4 100644
--- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
+++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
@@ -78,7 +78,7 @@ public class ConsumerBucketActor extends AbstractActor {
// 扫描DAG工作流任务数据
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW);
- scanJobActorRef.tell(scanTask, scanWorkflowActorRef);
+ scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef);
}
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java
index 1496ad6f..20dca517 100644
--- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java
+++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java
@@ -1,11 +1,18 @@
package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert;
+import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
+import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
+import com.aizuda.easy.retry.server.common.WaitStrategy;
+import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
+import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
+import com.aizuda.easy.retry.server.web.model.request.JobRequestVO;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
@@ -30,6 +37,7 @@ import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@@ -49,7 +57,7 @@ import java.util.stream.Collectors;
public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowMapper workflowMapper;
private final WorkflowNodeMapper workflowNodeMapper;
- private final static long root = -1;
+ private final SystemProperties systemProperties;
@Override
@Transactional
@@ -57,20 +65,22 @@ public class WorkflowServiceImpl implements WorkflowService {
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
- graph.addNode(root);
+ graph.addNode(SystemConstants.ROOT);
// 组装工作流信息
Workflow workflow = WorkflowConverter.INSTANCE.toWorkflow(workflowRequestVO);
- // TODO 临时设置值
- workflow.setNextTriggerAt(1L);
+ workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(StrUtil.EMPTY);
+ workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
+ % systemProperties.getBucketTotal());
+ workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
// 获取DAG节点配置
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
- buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
+ buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
@@ -80,6 +90,13 @@ public class WorkflowServiceImpl implements WorkflowService {
return true;
}
+ private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
+ WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(workflowRequestVO.getTriggerType());
+ WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
+ waitStrategyContext.setTriggerInterval(workflowRequestVO.getTriggerInterval());
+ waitStrategyContext.setNextTriggerAt(time);
+ return waitStrategy.computeTriggerTime(waitStrategyContext);
+ }
@Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException {
@@ -101,7 +118,7 @@ public class WorkflowServiceImpl implements WorkflowService {
try {
MutableGraph<Long> graph = deserializeJsonToGraph(flowInfo);
// 反序列化构建图
- WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, root, new HashMap<>(), workflowNodeMap);
+ WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
@@ -138,13 +155,13 @@ public class WorkflowServiceImpl implements WorkflowService {
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
- graph.addNode(root);
+ graph.addNode(SystemConstants.ROOT);
// 获取DAG节点配置
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
- buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
+ buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
@@ -215,7 +232,7 @@ public class WorkflowServiceImpl implements WorkflowService {
buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
}
- if (parentId != root && mount) {
+ if (parentId != SystemConstants.ROOT && mount) {
previousNodeInfo.setChildNode(currentConfig);
}