feat: 2.6.0
1. 后端流程
This commit is contained in:
parent
bd24ffae0c
commit
ace65032f5
@ -360,6 +360,8 @@ CREATE TABLE `job_task_batch`
|
|||||||
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
|
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
|
||||||
`group_name` varchar(64) NOT NULL COMMENT '组名称',
|
`group_name` varchar(64) NOT NULL COMMENT '组名称',
|
||||||
`job_id` bigint(20) NOT NULL COMMENT '任务id',
|
`job_id` bigint(20) NOT NULL COMMENT '任务id',
|
||||||
|
`workflow_node_id` bigint(20) NOT NULL COMMENT '工作流节点id',
|
||||||
|
`workflow_task_batch_id` bigint(20) NOT NULL COMMENT '工作流任务批次id',
|
||||||
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
|
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
|
||||||
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
|
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
|
||||||
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
|
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
|
||||||
@ -497,6 +499,7 @@ CREATE TABLE `workflow_task_batch`
|
|||||||
`workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
|
`workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
|
||||||
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
|
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
|
||||||
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
|
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
|
||||||
|
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
||||||
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
|
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
|
@ -13,6 +13,10 @@ public class JobContext {
|
|||||||
|
|
||||||
private Long taskBatchId;
|
private Long taskBatchId;
|
||||||
|
|
||||||
|
private Long workflowBatchId;
|
||||||
|
|
||||||
|
private Long workflowNodeId;
|
||||||
|
|
||||||
private Long taskId;
|
private Long taskId;
|
||||||
|
|
||||||
private String groupName;
|
private String groupName;
|
||||||
|
@ -30,7 +30,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
|||||||
.client(JobNettyClient.class)
|
.client(JobNettyClient.class)
|
||||||
.callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build();
|
.callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build();
|
||||||
|
|
||||||
private JobContext jobContext;
|
private final JobContext jobContext;
|
||||||
|
|
||||||
public JobExecutorFutureCallback(final JobContext jobContext) {
|
public JobExecutorFutureCallback(final JobContext jobContext) {
|
||||||
this.jobContext = jobContext;
|
this.jobContext = jobContext;
|
||||||
@ -97,6 +97,9 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
|
|||||||
dispatchJobRequest.setGroupName(jobContext.getGroupName());
|
dispatchJobRequest.setGroupName(jobContext.getGroupName());
|
||||||
dispatchJobRequest.setJobId(jobContext.getJobId());
|
dispatchJobRequest.setJobId(jobContext.getJobId());
|
||||||
dispatchJobRequest.setTaskId(jobContext.getTaskId());
|
dispatchJobRequest.setTaskId(jobContext.getTaskId());
|
||||||
|
dispatchJobRequest.setWorkflowBatchId(jobContext.getWorkflowBatchId());
|
||||||
|
dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId());
|
||||||
|
dispatchJobRequest.setTaskId(jobContext.getTaskId());
|
||||||
dispatchJobRequest.setTaskType(jobContext.getTaskType());
|
dispatchJobRequest.setTaskType(jobContext.getTaskType());
|
||||||
dispatchJobRequest.setExecuteResult(executeResult);
|
dispatchJobRequest.setExecuteResult(executeResult);
|
||||||
dispatchJobRequest.setTaskStatus(status);
|
dispatchJobRequest.setTaskStatus(status);
|
||||||
|
@ -36,13 +36,18 @@ public class DispatchJobRequest {
|
|||||||
@NotBlank(message = "executorInfo 不能为空")
|
@NotBlank(message = "executorInfo 不能为空")
|
||||||
private String executorInfo;
|
private String executorInfo;
|
||||||
|
|
||||||
|
@NotBlank(message = "executorTimeout 不能为空")
|
||||||
|
private Integer executorTimeout;
|
||||||
|
|
||||||
private String argsStr;
|
private String argsStr;
|
||||||
|
|
||||||
private Integer shardingTotal;
|
private Integer shardingTotal;
|
||||||
|
|
||||||
private Integer shardingIndex;
|
private Integer shardingIndex;
|
||||||
|
|
||||||
@NotBlank(message = "executorTimeout 不能为空")
|
private Long workflowBatchId;
|
||||||
private Integer executorTimeout;
|
|
||||||
|
private Long workflowNodeId;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,10 @@ public class DispatchJobResultRequest {
|
|||||||
|
|
||||||
private Long taskBatchId;
|
private Long taskBatchId;
|
||||||
|
|
||||||
|
private Long workflowBatchId;
|
||||||
|
|
||||||
|
private Long workflowNodeId;
|
||||||
|
|
||||||
private Long taskId;
|
private Long taskId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -96,4 +96,9 @@ public interface SystemConstants {
|
|||||||
* AT 所有人
|
* AT 所有人
|
||||||
*/
|
*/
|
||||||
String AT_ALL = "all";
|
String AT_ALL = "all";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根节点
|
||||||
|
*/
|
||||||
|
Long ROOT = -1L;
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,16 @@ public class JobTaskBatch implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private Long jobId;
|
private Long jobId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流批次id
|
||||||
|
*/
|
||||||
|
private Long workflowTaskBatchId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流节点id
|
||||||
|
*/
|
||||||
|
private Long workflowNodeId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务批次状态
|
* 任务批次状态
|
||||||
*/
|
*/
|
||||||
|
@ -47,18 +47,23 @@ public class WorkflowTaskBatch implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* 任务批次状态 0、失败 1、成功
|
* 任务批次状态 0、失败 1、成功
|
||||||
*/
|
*/
|
||||||
private Byte taskBatchStatus;
|
private Integer taskBatchStatus;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 操作原因
|
* 操作原因
|
||||||
*/
|
*/
|
||||||
private Byte operationReason;
|
private Integer operationReason;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务执行时间
|
* 任务执行时间
|
||||||
*/
|
*/
|
||||||
private Long executionAt;
|
private Long executionAt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流程信息
|
||||||
|
*/
|
||||||
|
private String flowInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建时间
|
* 创建时间
|
||||||
*/
|
*/
|
||||||
@ -72,7 +77,7 @@ public class WorkflowTaskBatch implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* 逻辑删除 1、删除
|
* 逻辑删除 1、删除
|
||||||
*/
|
*/
|
||||||
private Byte deleted;
|
private Integer deleted;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扩展字段
|
* 扩展字段
|
||||||
|
@ -44,6 +44,7 @@ public class ActorGenerator {
|
|||||||
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
|
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
|
||||||
public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
|
public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
|
||||||
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
|
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
|
||||||
|
public static final String WORKFLOW_EXECUTOR_ACTOR = "WorkflowExecutorActor";
|
||||||
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
|
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
|
||||||
public static final String JOB_LOG_ACTOR = "JobLogActor";
|
public static final String JOB_LOG_ACTOR = "JobLogActor";
|
||||||
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
|
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
|
||||||
@ -195,6 +196,16 @@ public class ActorGenerator {
|
|||||||
.withDispatcher(JOB_TASK_DISPATCHER));
|
.withDispatcher(JOB_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job调度准备阶段actor
|
||||||
|
*
|
||||||
|
* @return actor 引用
|
||||||
|
*/
|
||||||
|
public static ActorRef workflowTaskPrepareActor() {
|
||||||
|
return getJobActorSystem().actorOf(getSpringExtension().props(WORKFLOW_TASK_PREPARE_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_DISPATCHER));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Job任务执行阶段actor
|
* Job任务执行阶段actor
|
||||||
*
|
*
|
||||||
@ -208,6 +219,19 @@ public class ActorGenerator {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job任务执行阶段actor
|
||||||
|
*
|
||||||
|
* @return actor 引用
|
||||||
|
*/
|
||||||
|
public static ActorRef workflowTaskExecutorActor() {
|
||||||
|
return getJobActorSystem()
|
||||||
|
.actorOf(getSpringExtension()
|
||||||
|
.props(WORKFLOW_EXECUTOR_ACTOR)
|
||||||
|
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Job任务执行结果actor
|
* Job任务执行结果actor
|
||||||
*
|
*
|
||||||
|
@ -15,7 +15,9 @@ import lombok.Getter;
|
|||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public enum JobTriggerTypeEnum {
|
public enum JobTriggerTypeEnum {
|
||||||
AUTO(1, "自动触发"),
|
AUTO(1, "自动触发"),
|
||||||
MANUAL(2, "手动触发");
|
MANUAL(2, "手动触发"),
|
||||||
|
WORKFLOW(2, "DAG触发"),
|
||||||
|
;
|
||||||
|
|
||||||
private final Integer type;
|
private final Integer type;
|
||||||
private final String desc;
|
private final String desc;
|
||||||
|
@ -0,0 +1,41 @@
|
|||||||
|
package com.aizuda.easy.retry.server.common.util;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.graph.GraphBuilder;
|
||||||
|
import com.google.common.graph.MutableGraph;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-12-22
|
||||||
|
* @since : 2.6.0
|
||||||
|
*/
|
||||||
|
public class GraphUtils {
|
||||||
|
|
||||||
|
|
||||||
|
// 从JSON反序列化为Guava图
|
||||||
|
public static <T> MutableGraph<T> deserializeJsonToGraph(String jsonGraph) throws IOException {
|
||||||
|
ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
|
// 将JSON字符串转换为Map<String, Iterable<String>>
|
||||||
|
Map<T, Iterable<T>> adjacencyList = objectMapper.readValue(
|
||||||
|
jsonGraph, new TypeReference<Map<T, Iterable<T>>>() {});
|
||||||
|
|
||||||
|
// 创建Guava图并添加节点和边
|
||||||
|
MutableGraph<T> graph = GraphBuilder.directed().build();
|
||||||
|
for (Map.Entry<T, Iterable<T>> entry : adjacencyList.entrySet()) {
|
||||||
|
T node = entry.getKey();
|
||||||
|
Iterable<T> successors = entry.getValue();
|
||||||
|
|
||||||
|
graph.addNode(node);
|
||||||
|
for (T successor : successors) {
|
||||||
|
graph.putEdge(node, successor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return graph;
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,5 @@
|
|||||||
package com.aizuda.easy.retry.server.job.task.dto;
|
package com.aizuda.easy.retry.server.job.task.dto;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -15,6 +14,13 @@ public class JobExecutorResultDTO {
|
|||||||
|
|
||||||
private Long taskBatchId;
|
private Long taskBatchId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流任务批次id
|
||||||
|
*/
|
||||||
|
private Long workflowTaskBatchId;
|
||||||
|
|
||||||
|
private Long workflowNodeId;
|
||||||
|
|
||||||
private Long taskId;
|
private Long taskId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -57,4 +57,15 @@ public class JobTaskPrepareDTO {
|
|||||||
*/
|
*/
|
||||||
private Integer triggerType;
|
private Integer triggerType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流任务批次id
|
||||||
|
*/
|
||||||
|
private Long workflowTaskBatchId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流节点id
|
||||||
|
*/
|
||||||
|
private Long workflowNodeId;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,29 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.dto;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-12-22
|
||||||
|
* @since : 2.6.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class WorkflowNodeTaskExecuteDTO {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流id
|
||||||
|
*/
|
||||||
|
private Long workflowId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流任务批次id
|
||||||
|
*/
|
||||||
|
private Long workflowTaskBatchId;
|
||||||
|
/**
|
||||||
|
* 触发类似 1、auto 2、manual
|
||||||
|
*/
|
||||||
|
private Integer triggerType;
|
||||||
|
|
||||||
|
private Long parentId;
|
||||||
|
|
||||||
|
}
|
@ -17,4 +17,39 @@ public class WorkflowTaskPrepareDTO {
|
|||||||
*/
|
*/
|
||||||
private Integer triggerType;
|
private Integer triggerType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流名称
|
||||||
|
*/
|
||||||
|
private String workflowName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 命名空间id
|
||||||
|
*/
|
||||||
|
private String namespaceId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 组名称
|
||||||
|
*/
|
||||||
|
private String groupName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发间隔
|
||||||
|
*/
|
||||||
|
private String triggerInterval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行超时时间
|
||||||
|
*/
|
||||||
|
private Integer executorTimeout;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流状态 0、关闭、1、开启
|
||||||
|
*/
|
||||||
|
private Integer workflowStatus;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流程信息
|
||||||
|
*/
|
||||||
|
private String flowInfo;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.dto;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-12-22
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class WorkflowTimerTaskDTO {
|
||||||
|
|
||||||
|
private Long workflowTaskBatchId;
|
||||||
|
|
||||||
|
private Long workflowId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发类似 1、auto 2、manual
|
||||||
|
*/
|
||||||
|
private Integer triggerType;
|
||||||
|
}
|
@ -0,0 +1,15 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-10-22 09:34:00
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
public interface WorkflowPrePareHandler {
|
||||||
|
|
||||||
|
boolean matches(Integer status);
|
||||||
|
|
||||||
|
void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
|
||||||
|
}
|
@ -1,10 +1,13 @@
|
|||||||
package com.aizuda.easy.retry.server.job.task.support;
|
package com.aizuda.easy.retry.server.job.task.support;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
|
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||||
import org.mapstruct.Mapper;
|
import org.mapstruct.Mapper;
|
||||||
|
import org.mapstruct.Mapping;
|
||||||
|
import org.mapstruct.Mappings;
|
||||||
import org.mapstruct.factory.Mappers;
|
import org.mapstruct.factory.Mappers;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -20,5 +23,12 @@ public interface WorkflowTaskConverter {
|
|||||||
|
|
||||||
List<WorkflowPartitionTaskDTO> toWorkflowPartitionTaskList(List<Workflow> workflowList);
|
List<WorkflowPartitionTaskDTO> toWorkflowPartitionTaskList(List<Workflow> workflowList);
|
||||||
|
|
||||||
|
@Mappings(
|
||||||
|
@Mapping(source = "id", target = "workflowId")
|
||||||
|
)
|
||||||
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
|
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
|
||||||
|
|
||||||
|
WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
|
||||||
|
|
||||||
|
WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context);
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,8 @@ public class ClientCallbackContext {
|
|||||||
|
|
||||||
private Long taskBatchId;
|
private Long taskBatchId;
|
||||||
|
|
||||||
|
private Long workflowBatchId;
|
||||||
|
|
||||||
private Long taskId;
|
private Long taskId;
|
||||||
|
|
||||||
private String groupName;
|
private String groupName;
|
||||||
|
@ -87,7 +87,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
|
|
||||||
private void doExecute(final TaskExecuteDTO taskExecute) {
|
private void doExecute(final TaskExecuteDTO taskExecute) {
|
||||||
|
|
||||||
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>();
|
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
|
||||||
// 自动的校验任务必须是开启状态,手动触发无需校验
|
// 自动的校验任务必须是开启状态,手动触发无需校验
|
||||||
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
|
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
|
||||||
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
|
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
|
||||||
@ -150,13 +150,15 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
|
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
|
||||||
if (Objects.isNull(job) || JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())) {
|
if (Objects.isNull(job)
|
||||||
|
|| JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())
|
||||||
|
|| JobTriggerTypeEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType())
|
||||||
|
// 是否是常驻任务
|
||||||
|
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
|
||||||
|
) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 是否是常驻任务
|
|
||||||
if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) {
|
|
||||||
|
|
||||||
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
|
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
|
||||||
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
|
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
|
||||||
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
|
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
|
||||||
@ -184,4 +186,3 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
|
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@ -65,7 +65,7 @@ public class JobExecutorResultActor extends AbstractActor {
|
|||||||
()-> new EasyRetryServerException("更新任务实例失败"));
|
()-> new EasyRetryServerException("更新任务实例失败"));
|
||||||
|
|
||||||
// 更新批次上的状态
|
// 更新批次上的状态
|
||||||
boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason());
|
boolean complete = jobTaskBatchHandler.complete(result.getWorkflowNodeId(), result.getWorkflowTaskBatchId(), result.getTaskBatchId(), result.getJobOperationReason());
|
||||||
if (complete) {
|
if (complete) {
|
||||||
// 尝试停止任务
|
// 尝试停止任务
|
||||||
// 若是集群任务则客户端会主动关闭
|
// 若是集群任务则客户端会主动关闭
|
||||||
|
@ -66,6 +66,8 @@ public class JobTaskPrepareActor extends AbstractActor {
|
|||||||
for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) {
|
for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) {
|
||||||
prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
|
prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
|
||||||
prepare.setTaskBatchId(jobTaskBatch.getId());
|
prepare.setTaskBatchId(jobTaskBatch.getId());
|
||||||
|
prepare.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
|
||||||
|
prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
|
||||||
prepare.setOnlyTimeoutCheck(onlyTimeoutCheck);
|
prepare.setOnlyTimeoutCheck(onlyTimeoutCheck);
|
||||||
for (JobPrePareHandler prePareHandler : prePareHandlers) {
|
for (JobPrePareHandler prePareHandler : prePareHandlers) {
|
||||||
if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) {
|
if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) {
|
||||||
|
@ -84,7 +84,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
|
|||||||
|
|
||||||
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
|
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
|
||||||
// 执行预处理阶段
|
// 执行预处理阶段
|
||||||
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
|
||||||
waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
actorRef.tell(waitExecTask, actorRef);
|
actorRef.tell(waitExecTask, actorRef);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,110 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.dispatch;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.GraphUtils;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.graph.GraphBuilder;
|
||||||
|
import com.google.common.graph.MutableGraph;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-12-22 10:34
|
||||||
|
* @since : 2.6.0
|
||||||
|
*/
|
||||||
|
@Component(ActorGenerator.WORKFLOW_EXECUTOR_ACTOR)
|
||||||
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class WorkflowExecutorActor extends AbstractActor {
|
||||||
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
|
private final WorkflowNodeMapper workflowNodeMapper;
|
||||||
|
private final JobMapper jobMapper;
|
||||||
|
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
|
||||||
|
try {
|
||||||
|
doExecutor(taskExecute);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "workflow executor exception. [{}]", taskExecute, e);
|
||||||
|
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason());
|
||||||
|
// TODO 发送通知
|
||||||
|
} finally {
|
||||||
|
getContext().stop(getSelf());
|
||||||
|
}
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) throws IOException {
|
||||||
|
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
|
||||||
|
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
|
||||||
|
|
||||||
|
// 获取DAG图
|
||||||
|
String flowInfo = workflowTaskBatch.getFlowInfo();
|
||||||
|
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
|
||||||
|
|
||||||
|
Set<Long> predecessors = graph.predecessors(taskExecute.getParentId());
|
||||||
|
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(predecessors);
|
||||||
|
Set<Long> jobIdSet = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet());
|
||||||
|
List<Job> jobs = jobMapper.selectBatchIds(jobIdSet);
|
||||||
|
for (Job job : jobs) {
|
||||||
|
// 生成任务批次
|
||||||
|
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
|
||||||
|
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
|
||||||
|
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
|
||||||
|
// 执行预处理阶段
|
||||||
|
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
||||||
|
actorRef.tell(jobTaskPrepare, actorRef);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
|
||||||
|
|
||||||
|
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
|
||||||
|
jobTaskBatch.setId(taskExecute.getWorkflowTaskBatchId());
|
||||||
|
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
|
||||||
|
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||||
|
jobTaskBatch.setOperationReason(operationReason);
|
||||||
|
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
|
||||||
|
() -> new EasyRetryServerException("更新任务失败"));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -1,22 +1,23 @@
|
|||||||
package com.aizuda.easy.retry.server.job.task.support.event;
|
package com.aizuda.easy.retry.server.job.task.support.event;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
import org.springframework.context.ApplicationEvent;
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* job任务失败事件
|
* job任务失败事件
|
||||||
|
*
|
||||||
* @author: zuoJunLin
|
* @author: zuoJunLin
|
||||||
* @date : 2023-12-02 21:40
|
* @date : 2023-12-02 21:40
|
||||||
* @since 2.5.0
|
* @since 2.5.0
|
||||||
*/
|
*/
|
||||||
|
@Getter
|
||||||
public class JobTaskFailAlarmEvent extends ApplicationEvent {
|
public class JobTaskFailAlarmEvent extends ApplicationEvent {
|
||||||
private Long jobTaskBatchId;
|
|
||||||
|
private final Long jobTaskBatchId;
|
||||||
|
|
||||||
public JobTaskFailAlarmEvent(Long jobTaskBatchId) {
|
public JobTaskFailAlarmEvent(Long jobTaskBatchId) {
|
||||||
super(jobTaskBatchId);
|
super(jobTaskBatchId);
|
||||||
this.jobTaskBatchId = jobTaskBatchId;
|
this.jobTaskBatchId = jobTaskBatchId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getJobTaskBatchId() {
|
|
||||||
return jobTaskBatchId;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,7 @@ public class JobTaskBatchGenerator {
|
|||||||
jobTaskBatch.setGroupName(context.getGroupName());
|
jobTaskBatch.setGroupName(context.getGroupName());
|
||||||
jobTaskBatch.setCreateDt(LocalDateTime.now());
|
jobTaskBatch.setCreateDt(LocalDateTime.now());
|
||||||
jobTaskBatch.setNamespaceId(context.getNamespaceId());
|
jobTaskBatch.setNamespaceId(context.getNamespaceId());
|
||||||
|
jobTaskBatch.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
||||||
// 无执行的节点
|
// 无执行的节点
|
||||||
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
|
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
|
||||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
||||||
|
@ -45,5 +45,10 @@ public class JobTaskBatchGeneratorContext {
|
|||||||
*/
|
*/
|
||||||
private Integer triggerType;
|
private Integer triggerType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流任务批次id
|
||||||
|
*/
|
||||||
|
private Long workflowTaskBatchId;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,48 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.generator.batch;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-12-22 09:04
|
||||||
|
* @since : 2.6.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class WorkflowBatchGenerator {
|
||||||
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
|
@Transactional
|
||||||
|
public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) {
|
||||||
|
|
||||||
|
// 生成任务批次
|
||||||
|
WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context);
|
||||||
|
workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
|
||||||
|
workflowTaskBatchMapper.insert(workflowTaskBatch);
|
||||||
|
|
||||||
|
// 开始执行工作流
|
||||||
|
// 进入时间轮
|
||||||
|
long delay = context.getNextTriggerAt() - DateUtils.toNowMilli();
|
||||||
|
WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
|
||||||
|
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
|
||||||
|
workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
|
||||||
|
workflowTimerTaskDTO.setTriggerType(context.getTriggerType());
|
||||||
|
JobTimerWheel.register(workflowTaskBatch.getId(),
|
||||||
|
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,51 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.generator.batch;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-10-02 13:12:48
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class WorkflowTaskBatchGeneratorContext {
|
||||||
|
|
||||||
|
private String namespaceId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 组名称
|
||||||
|
*/
|
||||||
|
private String groupName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 工作流id
|
||||||
|
*/
|
||||||
|
private Long workflowId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 下次触发时间
|
||||||
|
*/
|
||||||
|
private Long nextTriggerAt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 操作原因
|
||||||
|
*/
|
||||||
|
private Integer operationReason;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 任务批次状态
|
||||||
|
*/
|
||||||
|
private Integer taskBatchStatus;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发类似 1、auto 2、manual
|
||||||
|
*/
|
||||||
|
private Integer triggerType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流程信息
|
||||||
|
*/
|
||||||
|
private String flowInfo;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -1,9 +1,14 @@
|
|||||||
package com.aizuda.easy.retry.server.job.task.support.handler;
|
package com.aizuda.easy.retry.server.job.task.support.handler;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||||
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
|
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
@ -11,6 +16,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
@ -25,6 +31,7 @@ import java.util.stream.Collectors;
|
|||||||
* @date : 2023-10-10 16:50
|
* @date : 2023-10-10 16:50
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
|
@Slf4j
|
||||||
public class JobTaskBatchHandler {
|
public class JobTaskBatchHandler {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -32,7 +39,16 @@ public class JobTaskBatchHandler {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private JobTaskBatchMapper jobTaskBatchMapper;
|
private JobTaskBatchMapper jobTaskBatchMapper;
|
||||||
|
|
||||||
public boolean complete(Long taskBatchId, Integer jobOperationReason) {
|
/**
|
||||||
|
* TODO 参数待优化
|
||||||
|
*
|
||||||
|
* @param workflowNodeId
|
||||||
|
* @param workflowTaskBatchId
|
||||||
|
* @param taskBatchId
|
||||||
|
* @param jobOperationReason
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean complete(Long workflowNodeId, Long workflowTaskBatchId, Long taskBatchId, Integer jobOperationReason) {
|
||||||
|
|
||||||
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
List<JobTask> jobTasks = jobTaskMapper.selectList(
|
||||||
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
|
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
|
||||||
@ -76,6 +92,20 @@ public class JobTaskBatchHandler {
|
|||||||
jobTaskBatch.setOperationReason(jobOperationReason);
|
jobTaskBatch.setOperationReason(jobOperationReason);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (Objects.nonNull(workflowNodeId) && Objects.nonNull(workflowTaskBatchId)) {
|
||||||
|
// 若是工作流则开启下一个任务
|
||||||
|
try {
|
||||||
|
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
|
||||||
|
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
|
||||||
|
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
|
taskExecuteDTO.setParentId(workflowNodeId);
|
||||||
|
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
|
||||||
|
actorRef.tell(taskExecuteDTO, actorRef);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("任务调度执行失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
|
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
|
||||||
new LambdaUpdateWrapper<JobTaskBatch>()
|
new LambdaUpdateWrapper<JobTaskBatch>()
|
||||||
.eq(JobTaskBatch::getId, taskBatchId)
|
.eq(JobTaskBatch::getId, taskBatchId)
|
||||||
|
@ -43,7 +43,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
|
|||||||
// 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理
|
// 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理
|
||||||
int blockStrategy = prepare.getBlockStrategy();
|
int blockStrategy = prepare.getBlockStrategy();
|
||||||
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
|
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
|
||||||
if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
|
if (jobTaskBatchHandler.complete(prepare.getWorkflowNodeId(), prepare.getWorkflowTaskBatchId(), prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
|
||||||
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
|
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
|
||||||
} else {
|
} else {
|
||||||
// 计算超时时间
|
// 计算超时时间
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-12-22 08:57
|
||||||
|
* @since : 2.6.0
|
||||||
|
*/
|
||||||
|
public abstract class AbstractWorkflowPrePareHandler implements WorkflowPrePareHandler {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
|
||||||
|
|
||||||
|
doHandler(workflowTaskPrepareDTO);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO);
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,32 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-12-22 08:59
|
||||||
|
* @since : 2.6.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class TerminalWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
|
||||||
|
private final WorkflowBatchGenerator workflowBatchGenerator;
|
||||||
|
@Override
|
||||||
|
public boolean matches(final Integer status) {
|
||||||
|
return Objects.isNull(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doHandler(final WorkflowTaskPrepareDTO jobPrepareDTO) {
|
||||||
|
log.info("无处理中的数据. workflowId:[{}]", jobPrepareDTO.getWorkflowId());
|
||||||
|
workflowBatchGenerator.generateJobTaskBatch(WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(jobPrepareDTO));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,72 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.timer;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||||
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: xiaowoniu
|
||||||
|
* @date : 2023-09-25
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class WorkflowTimerTask implements TimerTask {
|
||||||
|
|
||||||
|
private WorkflowTimerTaskDTO workflowTimerTaskDTO;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(final Timeout timeout) throws Exception {
|
||||||
|
// 执行任务调度
|
||||||
|
log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId());
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
|
||||||
|
int operationReason = JobOperationReasonEnum.NONE.getReason();
|
||||||
|
handlerTaskBatch(workflowTimerTaskDTO.getWorkflowTaskBatchId(), taskStatus, operationReason);
|
||||||
|
|
||||||
|
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
|
||||||
|
taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
|
||||||
|
taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
|
||||||
|
taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType());
|
||||||
|
taskExecuteDTO.setParentId(SystemConstants.ROOT);
|
||||||
|
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
|
||||||
|
actorRef.tell(taskExecuteDTO, actorRef);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("任务调度执行失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
|
||||||
|
|
||||||
|
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
|
||||||
|
jobTaskBatch.setId(workflowTaskBatchId);
|
||||||
|
jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
|
||||||
|
jobTaskBatch.setTaskBatchStatus(taskStatus);
|
||||||
|
jobTaskBatch.setOperationReason(operationReason);
|
||||||
|
Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch),
|
||||||
|
() -> new EasyRetryServerException("更新任务失败"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -78,7 +78,7 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
|
|
||||||
// 扫描DAG工作流任务数据
|
// 扫描DAG工作流任务数据
|
||||||
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW);
|
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW);
|
||||||
scanJobActorRef.tell(scanTask, scanWorkflowActorRef);
|
scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
||||||
|
@ -1,11 +1,18 @@
|
|||||||
package com.aizuda.easy.retry.server.web.service.impl;
|
package com.aizuda.easy.retry.server.web.service.impl;
|
||||||
|
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import cn.hutool.core.util.HashUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
|
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||||
import com.aizuda.easy.retry.server.web.model.base.PageResult;
|
import com.aizuda.easy.retry.server.web.model.base.PageResult;
|
||||||
|
import com.aizuda.easy.retry.server.web.model.request.JobRequestVO;
|
||||||
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
|
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
|
||||||
import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
|
import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
|
||||||
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
|
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
|
||||||
@ -30,6 +37,7 @@ import com.google.common.graph.GraphBuilder;
|
|||||||
import com.google.common.graph.MutableGraph;
|
import com.google.common.graph.MutableGraph;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
@ -49,7 +57,7 @@ import java.util.stream.Collectors;
|
|||||||
public class WorkflowServiceImpl implements WorkflowService {
|
public class WorkflowServiceImpl implements WorkflowService {
|
||||||
private final WorkflowMapper workflowMapper;
|
private final WorkflowMapper workflowMapper;
|
||||||
private final WorkflowNodeMapper workflowNodeMapper;
|
private final WorkflowNodeMapper workflowNodeMapper;
|
||||||
private final static long root = -1;
|
private final SystemProperties systemProperties;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
@ -57,20 +65,22 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
|
|
||||||
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
|
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
|
||||||
// 添加虚拟头节点
|
// 添加虚拟头节点
|
||||||
graph.addNode(root);
|
graph.addNode(SystemConstants.ROOT);
|
||||||
|
|
||||||
// 组装工作流信息
|
// 组装工作流信息
|
||||||
Workflow workflow = WorkflowConverter.INSTANCE.toWorkflow(workflowRequestVO);
|
Workflow workflow = WorkflowConverter.INSTANCE.toWorkflow(workflowRequestVO);
|
||||||
// TODO 临时设置值
|
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
|
||||||
workflow.setNextTriggerAt(1L);
|
|
||||||
workflow.setFlowInfo(StrUtil.EMPTY);
|
workflow.setFlowInfo(StrUtil.EMPTY);
|
||||||
|
workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
|
||||||
|
% systemProperties.getBucketTotal());
|
||||||
|
workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
|
||||||
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
|
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
|
||||||
|
|
||||||
// 获取DAG节点配置
|
// 获取DAG节点配置
|
||||||
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
|
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
|
||||||
|
|
||||||
// 递归构建图
|
// 递归构建图
|
||||||
buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
|
buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
|
||||||
|
|
||||||
log.info("图构建完成. graph:[{}]", graph);
|
log.info("图构建完成. graph:[{}]", graph);
|
||||||
// 保存图信息
|
// 保存图信息
|
||||||
@ -80,6 +90,13 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
|
||||||
|
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(workflowRequestVO.getTriggerType());
|
||||||
|
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
|
||||||
|
waitStrategyContext.setTriggerInterval(workflowRequestVO.getTriggerInterval());
|
||||||
|
waitStrategyContext.setNextTriggerAt(time);
|
||||||
|
return waitStrategy.computeTriggerTime(waitStrategyContext);
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException {
|
public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException {
|
||||||
|
|
||||||
@ -101,7 +118,7 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
try {
|
try {
|
||||||
MutableGraph<Long> graph = deserializeJsonToGraph(flowInfo);
|
MutableGraph<Long> graph = deserializeJsonToGraph(flowInfo);
|
||||||
// 反序列化构建图
|
// 反序列化构建图
|
||||||
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, root, new HashMap<>(), workflowNodeMap);
|
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
|
||||||
responseVO.setNodeConfig(config);
|
responseVO.setNodeConfig(config);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("反序列化失败. json:[{}]", flowInfo, e);
|
log.error("反序列化失败. json:[{}]", flowInfo, e);
|
||||||
@ -138,13 +155,13 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
|
|
||||||
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
|
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
|
||||||
// 添加虚拟头节点
|
// 添加虚拟头节点
|
||||||
graph.addNode(root);
|
graph.addNode(SystemConstants.ROOT);
|
||||||
|
|
||||||
// 获取DAG节点配置
|
// 获取DAG节点配置
|
||||||
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
|
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
|
||||||
|
|
||||||
// 递归构建图
|
// 递归构建图
|
||||||
buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
|
buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
|
||||||
|
|
||||||
log.info("图构建完成. graph:[{}]", graph);
|
log.info("图构建完成. graph:[{}]", graph);
|
||||||
|
|
||||||
@ -215,7 +232,7 @@ public class WorkflowServiceImpl implements WorkflowService {
|
|||||||
buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
|
buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (parentId != root && mount) {
|
if (parentId != SystemConstants.ROOT && mount) {
|
||||||
previousNodeInfo.setChildNode(currentConfig);
|
previousNodeInfo.setChildNode(currentConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user