Node for calling a workflow from within a workflow

csc 2025-06-23 01:30:12 +08:00
parent 92d64617e4
commit 21e18a1029
14 changed files with 102 additions and 15 deletions

View File

@@ -84,7 +84,7 @@ public enum JobOperationReasonEnum {
     /**
      * Manual call
      */
-    MANUAL_CALL(15, "Manual call"),
+    MANUAL_TRIGGER(15, "Manual call"),
     /**
      * Called from within a workflow
      */

View File

@@ -14,10 +14,26 @@ import lombok.Getter;
 @Getter
 @AllArgsConstructor
 public enum JobTaskExecutorSceneEnum {
+    /**
+     * Automatically triggered job
+     */
     AUTO_JOB(1, SyetemTaskTypeEnum.JOB),
+    /**
+     * Manually triggered job
+     */
     MANUAL_JOB(2, SyetemTaskTypeEnum.JOB),
+    /**
+     * Automatically triggered workflow
+     */
     AUTO_WORKFLOW(3, SyetemTaskTypeEnum.WORKFLOW),
+    /**
+     * Manually triggered workflow
+     */
     MANUAL_WORKFLOW(4, SyetemTaskTypeEnum.WORKFLOW),
+    /**
+     * Workflow triggered from within another workflow
+     */
+    WORKFLOW_WORKFLOW(5, SyetemTaskTypeEnum.WORKFLOW),
     ;

     private final Integer type;
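
For illustration only (not part of this commit): the new WORKFLOW_WORKFLOW scene reuses the existing value/task-type pairing, so mapping a raw scene value back to its SyetemTaskTypeEnum remains a plain lookup over the enum constants. The helper below is a sketch; getTaskType() is an assumed accessor name for the enum's second field, which this hunk does not show.

import java.util.Arrays;

class JobTaskExecutorSceneLookup {

    // Illustrative helper: resolve the SyetemTaskTypeEnum behind a raw scene value.
    // getTaskType() is an assumed getter for the enum's second field (not shown in this hunk).
    static SyetemTaskTypeEnum taskTypeOf(Integer type) {
        return Arrays.stream(JobTaskExecutorSceneEnum.values())
                .filter(scene -> scene.getType().equals(type))
                .findFirst()
                .map(JobTaskExecutorSceneEnum::getTaskType)
                .orElseThrow(() -> new IllegalArgumentException("Unknown executor scene: " + type));
    }
}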

View File

@@ -19,6 +19,9 @@ public class WorkflowNodeTaskExecuteDTO {
      */
     private Integer taskExecutorScene;

+    /**
+     * Id of the previous (parent) node
+     */
     private Long parentId;

     /**
/** /**

View File

@@ -1,5 +1,6 @@
 package com.aizuda.snailjob.server.job.task.dto;

+import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
 import lombok.Data;

 /**
@@ -15,7 +16,7 @@ public class WorkflowTaskPrepareDTO {
     private Long workflowId;

     /**
-     * Execution strategy: 1 auto, 2 manual, 3 workflow
+     * Execution strategy: 1 auto, 2 manual, 3 auto_workflow, 4 manual_workflow
      */
     private Integer taskExecutorScene;
@@ -64,6 +65,11 @@ public class WorkflowTaskPrepareDTO {
      */
     private long nextTriggerAt;

+    /**
+     * Operation reason
+     */
+    private Integer operationReason;
+
     /**
      * Task execution time
      */
@@ -78,4 +84,9 @@ public class WorkflowTaskPrepareDTO {
      * Workflow context
      */
     private String wfContext;
+
+    /**
+     * Parent workflow context
+     */
+    private WorkflowExecutorContext parentWfContext;
 }

View File

@@ -93,7 +93,7 @@ public class WorkflowExecutorActor extends AbstractActor {
     private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) {
         WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
-        Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("Task does not exist"));
+        Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("Task does not exist,WorkflowTaskBatchId:" + taskExecute.getWorkflowTaskBatchId()));

         if (SystemConstants.ROOT.equals(taskExecute.getParentId())
             && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {

View File

@@ -1,14 +1,29 @@
 package com.aizuda.snailjob.server.job.task.support.executor.workflow;

+import cn.hutool.core.lang.Assert;
 import com.aizuda.snailjob.common.core.constant.SystemConstants;
 import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
 import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
 import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
 import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
+import com.aizuda.snailjob.server.common.dto.WorkflowConfig;
+import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
+import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
+import com.aizuda.snailjob.server.common.util.DateUtils;
+import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler;
+import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
+import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
+import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
 import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
+import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Component;
@@ -18,6 +33,13 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
 @RequiredArgsConstructor
 public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {

+    private final JobTaskBatchMapper jobTaskBatchMapper;
+    private final WorkflowBatchHandler workflowBatchHandler;
+    private final JobTaskBatchHandler jobTaskBatchHandler;
+    private final JobMapper jobMapper;
+    private final WorkflowMapper workflowMapper;
+    private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
+
     @Override
     public WorkflowNodeTypeEnum getWorkflowNodeType() {
         return WorkflowNodeTypeEnum.WORKFLOW;
@@ -30,6 +52,15 @@ public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {
     @Override
     protected void afterExecute(WorkflowExecutorContext context) {
+
+    }
+
+    @Override
+    protected void beforeExecute(WorkflowExecutorContext context) {
+        context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
+        context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
+        context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
+
         JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);

         JobTask jobTask = generateJobTask(context, jobTaskBatch);
@@ -53,15 +84,21 @@ public class WorkflowWorkflowExecutor extends AbstractWorkflowExecutor {
         }
     }

-    @Override
-    protected void beforeExecute(WorkflowExecutorContext context) {
-    }
-
     @Override
     protected void doExecute(WorkflowExecutorContext context) {
-        context.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
-        context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
-        context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
+        WorkflowConfig nodeInfo = JsonUtil.parseObject(context.getNodeInfo(), WorkflowConfig.class);
+        Workflow workflow = workflowMapper.selectById(nodeInfo.getId());
+        Assert.notNull(workflow, () -> new SnailJobServerException("workflow can not be null."));
+
+        WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
+        // Next trigger time: "now" means execute immediately
+        prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
+        // Execution strategy: workflow triggered from within a workflow
+        prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.WORKFLOW_WORKFLOW.getType());
+        // Operation reason: called by a workflow
+        prepareDTO.setOperationReason(JobOperationReasonEnum.WORKFLOW_CALLED.getReason());
+        prepareDTO.setParentWfContext(context);
+
+        terminalWorkflowPrepareHandler.handler(prepareDTO);
     }
 }
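
For illustration only (not part of this commit): a WORKFLOW-type node is expected to carry the id of the workflow it should launch in its node-info JSON; doExecute() above relies only on WorkflowConfig#getId. The snippet below sketches that parse step with a hypothetical payload, assuming nothing about WorkflowConfig beyond the getId() call already shown.

import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.dto.WorkflowConfig;

class WorkflowNodeInfoSketch {

    // Hypothetical node-info payload; the real WorkflowConfig schema is not shown in this diff.
    static final String NODE_INFO = "{\"id\": 2001}";

    // Same parse-and-validate idea as doExecute() above: extract the id of the child workflow to trigger.
    static Long resolveTargetWorkflowId(String nodeInfo) {
        WorkflowConfig config = JsonUtil.parseObject(nodeInfo, WorkflowConfig.class);
        return config.getId();
    }
}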

View File

@@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.job.task.support.generator.batch
 import cn.hutool.core.lang.Assert;
 import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
 import com.aizuda.snailjob.server.common.util.DateUtils;
 import com.aizuda.snailjob.server.job.task.dto.WorkflowTimerTaskDTO;
@@ -28,6 +29,10 @@ import java.util.Optional;
 public class WorkflowBatchGenerator {
     private final WorkflowTaskBatchMapper workflowTaskBatchMapper;

+    /**
+     * Generate the workflow task batch, insert it into the database,
+     * and start executing the workflow
+     */
     public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) {

         // Generate the task batch
@@ -35,6 +40,7 @@
         workflowTaskBatch.setTaskBatchStatus(Optional.ofNullable(context.getTaskBatchStatus()).orElse(JobTaskBatchStatusEnum.WAITING.getStatus()));
         workflowTaskBatch.setOperationReason(context.getOperationReason());
         workflowTaskBatch.setWfContext(context.getWfContext());
+        workflowTaskBatch.setExtAttrs(JsonUtil.toJsonString(context.getParentWorkflowContext()));
         Assert.isTrue(1 == workflowTaskBatchMapper.insert(workflowTaskBatch), () -> new SnailJobServerException("Adding new scheduling task failed. [{}]", context.getWorkflowId()));
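
For illustration only (not part of this commit): with the setExtAttrs() call above, the parent workflow's WorkflowExecutorContext is stored on the child batch as JSON, so reading it back is a plain JSON round-trip. The sketch below assumes WorkflowTaskBatch exposes getExtAttrs() as the counterpart of setExtAttrs(), and that WorkflowTaskBatchMapper lives alongside the sibling mappers imported earlier in this diff.

import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;

class ParentContextReadSketch {

    // Sketch only: recover the parent workflow context persisted by the change above.
    // getExtAttrs() is an assumed accessor; import paths mirror the sibling classes in this diff.
    static WorkflowExecutorContext readParentContext(WorkflowTaskBatchMapper mapper, Long childBatchId) {
        WorkflowTaskBatch childBatch = mapper.selectById(childBatchId);
        return JsonUtil.parseObject(childBatch.getExtAttrs(), WorkflowExecutorContext.class);
    }
}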

View File

@@ -1,5 +1,6 @@
 package com.aizuda.snailjob.server.job.task.support.generator.batch;

+import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
 import lombok.Data;

 /**
@@ -38,7 +39,7 @@ public class WorkflowTaskBatchGeneratorContext {
     private Integer taskBatchStatus;

     /**
-     * Execution strategy: 1 auto, 2 manual, 3 workflow
+     * Execution strategy: 1 auto, 2 manual, 3 auto_workflow, 4 manual_workflow
      */
     private Integer taskExecutorScene;
@@ -52,5 +53,9 @@ public class WorkflowTaskBatchGeneratorContext {
      */
     private String wfContext;
+
+    /**
+     * Parent workflow execution context
+     */
+    private WorkflowExecutorContext parentWorkflowContext;
 }

View File

@@ -99,6 +99,9 @@ public class WorkflowBatchHandler {
         return isNeedProcess;
     }

+    /**
+     * Workflow execution completed
+     */
     public boolean complete(Long workflowTaskBatchId) {
         return complete(workflowTaskBatchId, null);
     }
@@ -186,7 +189,7 @@ public class WorkflowBatchHandler {
         WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
         jobTaskBatch.setId(workflowTaskBatchId);
         jobTaskBatch.setTaskBatchStatus(taskStatus);
-        jobTaskBatch.setOperationReason(operationReason);
+        // jobTaskBatch.setOperationReason(operationReason);
         workflowTaskBatchMapper.updateById(jobTaskBatch);
     }

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
 import static com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum.COMPLETED;

 /**
- * Handles tasks that are in the completed {@link JobTaskBatchStatusEnum::COMPLETED} state
+ * Handles tasks that are in the {@link JobTaskBatchStatusEnum::COMPLETED} (completed) state
  *
  * @author opensnail
  * @date 2023-10-02 10:16:28

View File

@@ -32,6 +32,9 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
     private final WorkflowBatchHandler workflowBatchHandler;

+    /**
+     * Handles tasks that are running
+     */
     @Override
     public boolean matches(Integer status) {
         return Objects.nonNull(status) && JobTaskBatchStatusEnum.RUNNING.getStatus() == status;

View File

@@ -14,7 +14,7 @@ import java.time.Duration;
 import java.util.Objects;

 /**
- * Handles tasks that are in the {@link JobTaskBatchStatusEnum::WAIT} state
+ * Handles tasks that are in the {@link JobTaskBatchStatusEnum::WAITING} (pending) state
  *
  * @author xiaowoniu
  * @date 2023-10-05 18:29:22

View File

@@ -33,7 +33,7 @@ public class WorkflowTimerTask implements TimerTask<String> {
         WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
         taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
-        taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene());
+        taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene()); // Execution strategy
         taskExecuteDTO.setParentId(SystemConstants.ROOT);

         ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
         actorRef.tell(taskExecuteDTO, actorRef);

View File

@@ -6,6 +6,7 @@ import cn.hutool.core.lang.Pair;
 import cn.hutool.core.util.HashUtil;
 import cn.hutool.core.util.StrUtil;
 import com.aizuda.snailjob.common.core.constant.SystemConstants;
+import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
 import com.aizuda.snailjob.common.core.enums.StatusEnum;
 import com.aizuda.snailjob.common.core.expression.ExpressionEngine;
 import com.aizuda.snailjob.common.core.expression.ExpressionFactory;
@@ -293,6 +294,8 @@ public class WorkflowServiceImpl implements WorkflowService {
         // Setting "now" means execute immediately
         prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
         prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
+        // Operation reason: manual trigger
+        prepareDTO.setOperationReason(JobOperationReasonEnum.MANUAL_TRIGGER.getReason());
         String tmpWfContext = triggerVO.getTmpWfContext();
         if (StrUtil.isNotBlank(tmpWfContext) && !JsonUtil.isEmptyJson(tmpWfContext)) {
             prepareDTO.setWfContext(tmpWfContext);