工作流中调用工作流节点

This commit is contained in:
csc 2025-06-04 22:57:01 +08:00
parent 24d92743e9
commit 9efe1597d3
27 changed files with 235 additions and 4 deletions

View File

@ -225,6 +225,11 @@ public interface SystemConstants {
*/
Long CALLBACK_JOB_ID = -2000L;
/**
* 系统内置的工作流任务ID
*/
Long WORKFLOW_JOB_ID = -3000L;
/**
* 客户端返回的非json对象单值比如 "aa", 123等
*/

View File

@ -18,20 +18,68 @@ import java.util.List;
@Getter
public enum JobOperationReasonEnum {
/*
* 定义任务执行状态枚举
*/
/**
* 任务执行状态
*/
NONE(0, StrUtil.EMPTY),
/**
* 任务执行状态任务执行超时
*/
TASK_EXECUTION_TIMEOUT(1, "Task execution timeout"),
/**
* 任务执行状态没有客户端节点
*/
NOT_CLIENT(2, "No client nodes"),
/**
* 任务执行状态任务已关闭
*/
JOB_CLOSED(3, "JOB closed"),
/**
* 任务执行状态任务被丢弃
*/
JOB_DISCARD(4, "Task discarded"),
/**
* 任务执行状态任务被覆盖
*/
JOB_OVERLAY(5, "Task overridden"),
/**
* 任务执行状态没有可执行的任务项
*/
NOT_EXECUTION_TASK(6, "No executable task items"),
/**
* 任务执行状态任务执行过程中发生意外异常
*/
TASK_EXECUTION_ERROR(7, "Unexpected exception occurred during task execution"),
/**
* 任务执行状态手动停止
*/
MANNER_STOP(8, "Manual stop"),
/**
* 工作流条件节点执行异常
*/
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR(9, "Condition node execution exception"),
/**
* 任务被中断
*/
JOB_TASK_INTERRUPTED(10, "Task interrupted"),
/**
* 工作流回调节点执行异常
*/
WORKFLOW_CALLBACK_NODE_EXECUTION_ERROR(11, "Callback node execution exception"),
/**
* 工作流节点无需操作
*/
WORKFLOW_NODE_NO_REQUIRED(12, "No action required"),
/**
* 工作流节点已关闭跳过执行
*/
WORKFLOW_NODE_CLOSED_SKIP_EXECUTION(13, "Node closed, skipped execution"),
/**
* 工作流决策未通过
*/
WORKFLOW_DECISION_FAILED(14, "Judgment not passed"),

View File

@ -50,6 +50,9 @@ public enum JobTaskBatchStatusEnum {
public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);
/**
* 任务完成 状态 包含 SUCCESS, FAIL, STOP, CANCEL
*/
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status, CANCEL.status);
public static final List<Integer> NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status, CANCEL.status);

View File

@ -8,15 +8,37 @@ import lombok.Getter;
* @author opensnail
* @date 2023-10-02 10:39:22
* @since 2.4.0
*
* 任务类型
* 1. 集群任务 ClusterJobExecutor
* 2. 广播任务 BroadcastTaskJobExecutor
* 3. 静态分片任务 ShardingJobExecutor
* 4. Map 任务 MapJobExecutor
* 5. MapReduce 任务 MapReduceJobExecutor
*/
@AllArgsConstructor
@Getter
public enum JobTaskTypeEnum {
/**
* 集群任务
*/
CLUSTER(1),
/**
* 广播任务
*/
BROADCAST(2),
/**
* 静态分片任务
*/
SHARDING(3),
/**
* Map 任务
*/
MAP(4),
/**
* MapReduce 任务
*/
MAP_REDUCE(5),
;

View File

@ -4,7 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 1任务节点 2条件节点 3回调节点
* 1任务节点 2条件节点 3回调节点 4工作流节点
*
* @author xiaowoniu
* @date 2023-12-24 08:13:43
@ -13,9 +13,21 @@ import lombok.Getter;
@AllArgsConstructor
@Getter
public enum WorkflowNodeTypeEnum {
/**
* 任务节点
*/
JOB_TASK(1, "JOB task"),
/**
* 条件节点
*/
DECISION(2, "Decision node"),
/**
* 回调节点
*/
CALLBACK(3, "Callback node"),
/**
* 工作流节点
*/
WORKFLOW(4, "Workflow node"),
;

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.vo.WorkflowBatchResponseVO;
import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
@ -54,7 +55,8 @@ public interface WorkflowConverter {
@Mappings({
@Mapping(target = "decision", expression = "java(WorkflowConverter.parseDecisionConfig(workflowNode))"),
@Mapping(target = "callback", expression = "java(WorkflowConverter.parseCallbackConfig(workflowNode))"),
@Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))")
@Mapping(target = "jobTask", expression = "java(WorkflowConverter.parseJobTaskConfig(workflowNode))"),
@Mapping(target = "subWorkflow", expression = "java(WorkflowConverter.parseWorkflowConfig(workflowNode))")
})
WorkflowDetailResponseVO.NodeInfo convert(WorkflowNode workflowNode);
@ -110,6 +112,13 @@ public interface WorkflowConverter {
return null;
}
static WorkflowConfig parseWorkflowConfig(WorkflowNode workflowNode) {
if (WorkflowNodeTypeEnum.WORKFLOW.getType() == workflowNode.getNodeType()){
return JsonUtil.parseObject(workflowNode.getNodeInfo(), WorkflowConfig.class);
}
return null;
}
static Set<Long> toNotifyIds(String notifyIds) {
if (StrUtil.isBlank(notifyIds)) {
return new HashSet<>();

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.server.common.dto;
import lombok.Data;
/**
* 工作流配置
*/
@Data
public class WorkflowConfig {
/**
* ID
*/
private Long id;
/**
* 名称
*/
private String name;
}

View File

@ -8,6 +8,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO;
import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
@ -176,7 +177,9 @@ public class WorkflowHandler {
workflowNode.setJobId(jobTask.getJobId());
}else if (WorkflowNodeTypeEnum.WORKFLOW.getType() == nodeConfig.getNodeType()) {
// TODO 工作流结点
throw new SnailJobServerException("Unsupported node type [{}]", nodeConfig.getNodeType());
WorkflowConfig workflow = nodeInfo.getSubWorkflow();
workflowNode.setJobId(SystemConstants.WORKFLOW_JOB_ID);
workflowNode.setNodeInfo(JsonUtil.toJsonString(workflow));
} else {
throw new SnailJobServerException("Unsupported node type [{}]", nodeConfig.getNodeType());
}

View File

@ -230,6 +230,7 @@ public class ActorGenerator {
/**
* Job调度准备阶段actor
*
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobTaskPrepareActor
* @return actor 引用
*/
public static ActorRef jobTaskPrepareActor() {
@ -274,6 +275,7 @@ public class ActorGenerator {
/**
* Job任务执行阶段actor
*
* @see com.aizuda.snailjob.server.job.task.support.dispatch.WorkflowExecutorActor
* @return actor 引用
*/
public static ActorRef workflowTaskExecutorActor() {
@ -298,6 +300,8 @@ public class ActorGenerator {
/**
* Job任务向客户端发起请求阶段actor
*
* @see com.aizuda.snailjob.server.job.task.support.executor.job.RequestClientActor
*
* @return actor 引用
*/
public static ActorRef jobRealTaskExecutorActor() {

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.common.vo;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
import lombok.Data;
import java.util.List;
@ -144,6 +145,11 @@ public class WorkflowDetailResponseVO {
*/
private JobTaskConfig jobTask;
/**
* 工作流配置
*/
private WorkflowConfig subWorkflow;
/**
* 定时任务批次信息
*/

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.common.vo.request;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@ -107,7 +108,7 @@ public class WorkflowRequestVO {
private Integer priorityLevel;
/**
* 子节点
* 子节点下一个节点
*/
private NodeConfig childNode;
@ -130,6 +131,11 @@ public class WorkflowRequestVO {
* 回调配置
*/
private CallbackConfig callback;
/**
* 工作流配置
*/
private WorkflowConfig subWorkflow;
}
/**

View File

@ -220,6 +220,16 @@ public class WorkflowExecutorActor extends AbstractActor {
}
/**
* 填充父工作流节点的执行原因
* 该方法旨在处理父工作流节点的执行原因以决定是否继续执行后续节点
* 它通过检查是否存在不应跳过的执行原因来确定是否应该继续执行
*
* @param allJobTaskBatchList 包含所有作业任务批次的列表
* @param parentJobTaskBatchList 包含父工作流节点相关作业任务批次的列表
* @param parentWorkflowNode 父工作流节点
* @param context 工作流执行的上下文环境
*/
private static void fillParentOperationReason(final List<JobTaskBatch> allJobTaskBatchList,
final List<JobTaskBatch> parentJobTaskBatchList, final WorkflowNode parentWorkflowNode,
final WorkflowExecutorContext context) {

View File

@ -10,6 +10,15 @@ import org.springframework.transaction.annotation.Transactional;
* @author opensnail
* @date 2023-10-03 22:13:04
* @since 2.4.0
*
* 任务类型
* 1. 集群任务 ClusterJobExecutor
* @see com.aizuda.snailjob.server.job.task.support.executor.job.ClusterJobExecutor
* 2. 广播任务 BroadcastTaskJobExecutor
* 3. 静态分片任务 ShardingJobExecutor
* 4. Map 任务 MapJobExecutor
* 5. MapReduce 任务 MapReduceJobExecutor
*
*/
public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean {

View File

@ -17,6 +17,8 @@ import java.util.List;
* @author opensnail
* @date 2023-10-06 10:27:26
* @since 2.4.0
*
* 广播任务执行器
*/
@Component
@Slf4j

View File

@ -18,6 +18,7 @@ import java.util.List;
* @author opensnail
* @date 2023-10-03 22:12:40
* @since 2.4.0
* 集群任务执行器
*/
@Component
public class ClusterJobExecutor extends AbstractJobExecutor {

View File

@ -6,6 +6,8 @@ import org.springframework.stereotype.Component;
/**
* @author: shuguang.zhang
* @date : 2024-06-19
*
* Map 任务
*/
@Component
public class MapJobExecutor extends MapReduceJobExecutor {

View File

@ -16,6 +16,8 @@ import java.util.List;
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*
* MapReduce 任务执行器
*/
@Component
public class MapReduceJobExecutor extends AbstractJobExecutor {

View File

@ -40,6 +40,8 @@ import java.util.Objects;
* @author opensnail
* @date 2023-10-06 16:42:08
* @since 2.4.0
*
* 调用客户端执行任务
*/
@Component(ActorGenerator.REAL_JOB_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

View File

@ -17,6 +17,8 @@ import java.util.List;
* @author opensnail
* @date 2023-10-06 17:33:51
* @since 2.4.0
*
* 分片任务执行器
*/
@Component
@Slf4j

View File

@ -37,6 +37,12 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
* @author xiaowoniu
* @date 2023-12-24 08:15:19
* @since 2.6.0
*
* 抽象工作流执行器
*<br/>
* 1.任务节点 JobTaskWorkflowExecutor
* 2.决策节点 @see com.aizuda.snailjob.server.job.task.support.executor.workflow.DecisionWorkflowExecutor
* 3.回调通知节点 @see com.aizuda.snailjob.server.job.task.support.executor.workflow.CallbackWorkflowExecutor
*/
@Slf4j
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {

View File

@ -34,6 +34,8 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
* @author xiaowoniu
* @date 2023-12-24 08:18:06
* @since 2.6.0
*
* 回调通知节点
*/
@Component
@RequiredArgsConstructor

View File

@ -34,6 +34,8 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
* @author xiaowoniu
* @date 2023-12-24 08:17:11
* @since 2.6.0
*
* 决策节点执行器
*/
@Component
@RequiredArgsConstructor

View File

@ -22,6 +22,8 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
* @author xiaowoniu
* @date 2023-12-24 08:09:14
* @since 2.6.0
*
* 任务节点执行器
*/
@Component
@RequiredArgsConstructor
@ -73,6 +75,9 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
}
/**
* @see com.aizuda.snailjob.server.job.task.support.dispatch.JobTaskPrepareActor
*/
private static void invokeJobTask(final WorkflowExecutorContext context) {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob(), context);

View File

@ -11,6 +11,9 @@ import lombok.Data;
@Data
public class WorkflowExecutorContext {
/**
* 命名空间id
*/
private String namespaceId;
/**

View File

@ -12,6 +12,9 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class WorkflowExecutorFactory {
/**
* Map<节点类型,节点执行类>
*/
private static final ConcurrentHashMap<WorkflowNodeTypeEnum, WorkflowExecutor> CACHE = new ConcurrentHashMap<>();
protected static void registerJobExecutor(WorkflowNodeTypeEnum workflowNodeTypeEnum, WorkflowExecutor executor) {

View File

@ -0,0 +1,39 @@
package com.aizuda.snailjob.server.job.task.support.executor.workflow;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.WORKFLOW;
}
@Override
protected boolean doPreValidate(WorkflowExecutorContext context) {
return true;
}
@Override
protected void afterExecute(WorkflowExecutorContext context) {
}
@Override
protected void beforeExecute(WorkflowExecutorContext context) {
}
@Override
protected void doExecute(WorkflowExecutorContext context) {
context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
}
}

View File

@ -270,6 +270,9 @@ public class WorkflowServiceImpl implements WorkflowService {
return 1 == workflowMapper.updateById(workflow);
}
/**
* 手动触发工作流
*/
@Override
public Boolean trigger(WorkflowTriggerVO triggerVO) {
Workflow workflow = workflowMapper.selectById(triggerVO.getWorkflowId());