diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 52d2cb1c..26c41e5d 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -481,6 +481,7 @@ CREATE TABLE `workflow_node` `expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL', `fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞', `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启', + `priority_level` int(11) NOT NULL DEFAULT 1 COMMENT '优先级', `node_expression` text DEFAULT NULL COMMENT '节点表达式', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java index 7f389369..979ef806 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/WorkflowNode.java @@ -69,6 +69,11 @@ public class WorkflowNode implements Serializable { */ private Integer failStrategy; + /** + * 优先级 + */ + private Integer priorityLevel; + /** * 工作流节点状态 0、关闭、1、开启 */ diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 7af044e7..54bb7353 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -103,11 +103,15 @@ public class WorkflowExecutorActor extends AbstractActor { Map jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i)); - List workflowNodes = workflowNodeMapper.selectBatchIds(successors); + List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() + .in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel)); List jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet())); Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i)); // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成 + + // 只会条件节点会使用 + Boolean evaluationResult = null; for (WorkflowNode workflowNode : workflowNodes) { // 批次已经存在就不在重复生成 if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) { @@ -121,8 +125,12 @@ public class WorkflowExecutorActor extends AbstractActor { context.setJob(jobMap.get(workflowNode.getJobId())); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setParentWorkflowNodeId(taskExecute.getParentId()); + context.setResult(taskExecute.getResult()); + context.setEvaluationResult(evaluationResult); workflowExecutor.execute(context); + + evaluationResult = context.getEvaluationResult(); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java index e76e0149..ccc9d4f9 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow; import akka.actor.ActorRef; import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.Opt; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; @@ -12,10 +13,8 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; -import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; @@ -24,13 +23,11 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.expression.EvaluationContext; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import java.util.Map; import java.util.Optional; @@ -49,6 +46,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { private final JobTaskBatchGenerator jobTaskBatchGenerator; private final JobTaskMapper jobTaskMapper; + @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { return WorkflowNodeTypeEnum.CONDITION; @@ -61,30 +59,42 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus(); String message = StrUtil.EMPTY; - try { - // 根据配置的表达式执行 - Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult())); - if (Boolean.TRUE.equals(result)) { - // 若是工作流则开启下一个任务 - try { - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); - taskExecuteDTO.setParentId(context.getWorkflowNodeId()); - taskExecuteDTO.setResult(context.getResult()); - ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - } catch (Exception e) { - log.error("工作流执行失败", e); - } + Boolean result = Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE); + + if (result) { + // 多个条件节点直接是或的关系,只要一个成功其他节点就取消 + taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); + } else { + try { + // 根据配置的表达式执行 + result = doEval(context.getNodeExpression(), JsonUtil.parseHashMap(context.getResult())); + log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result); + } catch (Exception e) { + taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); + operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); + jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); + message = e.getMessage(); } - } catch (Exception e) { - taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); - operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason(); - jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); - message = e.getMessage(); } + if (result) { + // 若是工作流则开启下一个任务 + try { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); + taskExecuteDTO.setParentId(context.getWorkflowNodeId()); + taskExecuteDTO.setResult(context.getResult()); + ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { + log.error("工作流执行失败", e); + } + } + + // 回传执行结果 + context.setEvaluationResult(result); + JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context); generatorContext.setTaskBatchStatus(taskBatchStatus); generatorContext.setOperationReason(operationReason); @@ -99,13 +109,14 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { jobTask.setClientInfo(StrUtil.EMPTY); jobTask.setTaskBatchId(jobTaskBatch.getId()); jobTask.setArgsType(1); - jobTask.setArgsStr(context.getExpression()); + jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY)); jobTask.setTaskStatus(jobTaskStatus); - jobTask.setResultMessage(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY)); + jobTask.setResultMessage(String.valueOf(result)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); // 保存执行的日志 JobLogDTO jobLogDTO = new JobLogDTO(); + // TODO 等实时日志处理完毕后,再处理 jobLogDTO.setMessage(message); jobLogDTO.setTaskId(jobTask.getId()); jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java index c2b1e04f..3ee5a1f9 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/WorkflowExecutorContext.java @@ -1,12 +1,8 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; -import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import lombok.Data; -import java.util.List; -import java.util.Map; - /** * @author xiaowoniu * @date 2023-12-24 @@ -47,7 +43,7 @@ public class WorkflowExecutorContext { */ private Job job; - private String expression; + private String nodeExpression; /** * 客户端返回的结果 @@ -70,8 +66,8 @@ public class WorkflowExecutorContext { private Integer workflowNodeStatus; /** - * 节点表达式 + * 条件节点的判定结果 */ - private String nodeExpression; + private Boolean evaluationResult; }