diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index 81c125580..cdc653cee 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -103,7 +103,7 @@ public interface SystemConstants { Long ROOT = -1L; /** - * 根节点 + * 系统内置的条件任务ID */ Long CONDITION_JOB_ID = -1000L; } diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java index 603241678..0f4f6616b 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/JobOperationReasonEnum.java @@ -26,7 +26,7 @@ public enum JobOperationReasonEnum { NOT_EXECUTE_TASK(6, "无可执行任务项"), TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"), MANNER_STOP(8, "手动停止"), - WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "手动停止"), + WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"), ; private final int reason; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java index 6554e3b8b..eb4b44fd4 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java @@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowE import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -35,4 +36,9 @@ public interface WorkflowTaskConverter { WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context); JobTaskBatchGeneratorContext toJobTaskBatchGeneratorContext(WorkflowExecutorContext context); + + @Mappings( + @Mapping(source = "id", target = "workflowNodeId") + ) + WorkflowExecutorContext toWorkflowExecutorContext(WorkflowNode workflowNode); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 03bf5846b..7af044e77 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -19,6 +19,7 @@ import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorFactory; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; @@ -106,7 +107,6 @@ public class WorkflowExecutorActor extends AbstractActor { List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); - // TODO 此次做策略,按照任务节点 条件节点 回调节点做抽象 // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成 for (WorkflowNode workflowNode : workflowNodes) { // 批次已经存在就不在重复生成 @@ -117,9 +117,8 @@ public class WorkflowExecutorActor extends AbstractActor { // 执行DAG中的节点 WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType()); - WorkflowExecutorContext context = new WorkflowExecutorContext(); + WorkflowExecutorContext context = WorkflowTaskConverter.INSTANCE.toWorkflowExecutorContext(workflowNode); context.setJob(jobMap.get(workflowNode.getJobId())); - context.setWorkflowNodeId(workflowNode.getWorkflowId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setParentWorkflowNodeId(taskExecute.getParentId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index abdf2e93c..6b778e296 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow; import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor; import org.springframework.beans.factory.InitializingBean; +import org.springframework.transaction.annotation.Transactional; /** * @author xiaowoniu @@ -11,6 +12,7 @@ import org.springframework.beans.factory.InitializingBean; public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean { @Override + @Transactional public void execute(WorkflowExecutorContext context) { doExecute(context); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java index 3ed522714..e76e01499 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -1,27 +1,39 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow; import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.expression.EvaluationContext; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.Map; +import java.util.Optional; /** * @author xiaowoniu @@ -33,24 +45,26 @@ import java.util.Map; @Slf4j public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { - private final static ExpressionParser ENGINE = new SpelExpressionParser(); + private static final ExpressionParser ENGINE = new SpelExpressionParser(); private final JobTaskBatchGenerator jobTaskBatchGenerator; - + private final JobTaskMapper jobTaskMapper; @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { return WorkflowNodeTypeEnum.CONDITION; } @Override - protected void doExecute(WorkflowExecutorContext context) { + public void doExecute(WorkflowExecutorContext context) { int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus(); int operationReason = JobOperationReasonEnum.NONE.getReason(); + int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); + String message = StrUtil.EMPTY; try { // 根据配置的表达式执行 Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult())); - if (result) { + if (Boolean.TRUE.equals(result)) { // 若是工作流则开启下一个任务 try { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); @@ -67,15 +81,39 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { } catch (Exception e) { taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); + jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); + message = e.getMessage(); } JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context); generatorContext.setTaskBatchStatus(taskBatchStatus); generatorContext.setOperationReason(operationReason); - - // 特殊的job generatorContext.setJobId(SystemConstants.CONDITION_JOB_ID); - jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); + JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); + + // 生成执行任务实例 + JobTask jobTask = new JobTask(); + jobTask.setGroupName(context.getGroupName()); + jobTask.setNamespaceId(context.getNamespaceId()); + jobTask.setJobId(SystemConstants.CONDITION_JOB_ID); + jobTask.setClientInfo(StrUtil.EMPTY); + jobTask.setTaskBatchId(jobTaskBatch.getId()); + jobTask.setArgsType(1); + jobTask.setArgsStr(context.getExpression()); + jobTask.setTaskStatus(jobTaskStatus); + jobTask.setResultMessage(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY)); + Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); + + // 保存执行的日志 + JobLogDTO jobLogDTO = new JobLogDTO(); + jobLogDTO.setMessage(message); + jobLogDTO.setTaskId(jobTask.getId()); + jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID); + jobLogDTO.setGroupName(context.getGroupName()); + jobLogDTO.setNamespaceId(context.getNamespaceId()); + jobLogDTO.setTaskBatchId(jobTaskBatch.getId()); + ActorRef actorRef = ActorGenerator.jobLogActor(); + actorRef.tell(jobLogDTO, actorRef); } protected Boolean doEval(String expression, Map context) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java index bb12cd89f..c2b1e04f8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java @@ -54,4 +54,24 @@ public class WorkflowExecutorContext { */ private String result; + /** + * 1、SpEl、2、Aviator 3、QL + */ + private Integer expressionType; + + /** + * 失败策略 1、跳过 2、阻塞 + */ + private Integer failStrategy; + + /** + * 工作流节点状态 0、关闭、1、开启 + */ + private Integer workflowNodeStatus; + + /** + * 节点表达式 + */ + private String nodeExpression; + } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index e6f796fce..4249097a6 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask; import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; @@ -36,7 +37,7 @@ public class JobTaskBatchGenerator { private JobTaskBatchMapper jobTaskBatchMapper; @Transactional - public void generateJobTaskBatch(JobTaskBatchGeneratorContext context) { + public JobTaskBatch generateJobTaskBatch(JobTaskBatchGeneratorContext context) { // 生成一个新的任务 JobTaskBatch jobTaskBatch = JobTaskConverter.INSTANCE.toJobTaskBatch(context); @@ -56,12 +57,16 @@ public class JobTaskBatchGenerator { Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId())); } catch (DuplicateKeyException ignored) { // 忽略重复的DAG任务 - return; + return jobTaskBatchMapper.selectOne(new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) + .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) + ); + } // 非待处理状态无需进入时间轮中 if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { - return; + return jobTaskBatch; } // 进入时间轮 @@ -75,6 +80,7 @@ public class JobTaskBatchGenerator { JobTimerWheel.register(jobTaskBatch.getId(), new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); + return jobTaskBatch; } } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java index 1a625f91d..8273ca2c0 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/request/WorkflowRequestVO.java @@ -46,10 +46,10 @@ public class WorkflowRequestVO { */ private String description; - /** * DAG节点配置 */ + @NotNull(message = "DAG节点配置不能为空") private NodeConfig nodeConfig; @Data diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java index a58ca50ae..6b02e4a84 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java @@ -118,10 +118,12 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { Map workflowNodeMap = nodeInfos.stream() .peek(nodeInfo -> { - JobTaskBatch taskBatch = jobTaskBatchMap.getOrDefault(nodeInfo.getId(), new JobTaskBatch()); - nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(taskBatch.getExecutionAt())); - nodeInfo.setTaskBatchStatus(taskBatch.getTaskBatchStatus()); - nodeInfo.setOperationReason(taskBatch.getOperationReason()); + JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId()); + if (Objects.nonNull(jobTaskBatch)) { + nodeInfo.setExecutionAt(DateUtils.toLocalDateTime(jobTaskBatch.getExecutionAt())); + nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus()); + nodeInfo.setOperationReason(jobTaskBatch.getOperationReason()); + } }) .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java index db4cf920c..8b2b30ea8 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.HashUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.config.SystemProperties; @@ -232,6 +233,9 @@ public class WorkflowServiceImpl implements WorkflowService { workflowNode.setWorkflowId(workflowId); workflowNode.setGroupName(groupName); workflowNode.setNodeType(nodeConfig.getNodeType()); + if (WorkflowNodeTypeEnum.CONDITION.getType() == nodeConfig.getNodeType()) { + workflowNode.setJobId(SystemConstants.CONDITION_JOB_ID); + } Assert.isTrue(1 == workflowNodeMapper.insert(workflowNode), () -> new EasyRetryServerException("新增工作流节点失败")); // 添加节点 graph.addNode(workflowNode.getId());