From 758a9875d73731a7419ddaf4ea54a0f3c2e6fb80 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Thu, 20 Jun 2024 23:14:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.1.0):=20=E8=B0=83=E8=AF=95=E5=86=B3?= =?UTF-8?q?=E7=AD=96=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch/WorkflowExecutorActor.java | 39 ++++++++++++------- .../workflow/AbstractWorkflowExecutor.java | 2 - .../workflow/DecisionWorkflowExecutor.java | 35 +++++++++++------ 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index ab27869d..c2cdaaca 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -112,9 +112,6 @@ public class WorkflowExecutorActor extends AbstractActor { return; } - // TODO 暂时删除,待认证 -// Sets.SetView union = Sets.union(allSuccessors, brotherNode); - // 添加父节点,为了判断父节点的处理状态 List allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, @@ -155,23 +152,35 @@ public class WorkflowExecutorActor extends AbstractActor { } } - if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { - return; - } + // 决策节点 + if (Objects.nonNull(parentWorkflowNode) && WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { - // 去掉父节点 - workflowNodes = workflowNodes.stream() - .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) - .collect(Collectors.toList()); + // 获取决策节点子节点 + Set successors = graph.successors(parentWorkflowNode.getId()); + workflowNodes = workflowNodes.stream() + // 去掉父节点 + .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()) + // 过滤掉非当前决策节点【ParentId】的子节点 + && successors.contains(workflowNode.getId())).collect(Collectors.toList()); + } else { + if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { + return; + } + + workflowNodes = workflowNodes.stream() + // 去掉父节点 + .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) + .collect(Collectors.toList()); + + // TODO 合并job task的结果到全局上下文中 + // 此次的并发数与当时父节点的兄弟节点的数量一致 + workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch, + StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId)); + } List jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)); Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); - // TODO 合并job task的结果到全局上下文中 - // 此次的并发数与当时父节点的兄弟节点的数量一致 - workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch, - StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId)); - // 只会条件节点会使用 Object evaluationResult = null; for (WorkflowNode workflowNode : workflowNodes) { diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index 2bb1d608..9ec31773 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -148,8 +148,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init jobTask.setClientInfo(StrUtil.EMPTY); jobTask.setTaskBatchId(jobTaskBatch.getId()); jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType()); - // TODO 待定是否删除 - jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY)); jobTask.setTaskStatus(context.getJobTaskStatus()); jobTask.setResultMessage(String.valueOf(context.getEvaluationResult())); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败")); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index 61b478fc..9ebd4b51 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -16,12 +16,16 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent; import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Objects; import java.util.Optional; /** @@ -33,8 +37,7 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { - private final JobTaskMapper jobTaskMapper; - + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; @Override public WorkflowNodeTypeEnum getWorkflowNodeType() { @@ -64,14 +67,24 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class); if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) { try { - ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType()); - Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在")); - ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine); - ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler); - result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), context.getWfContext())).orElse(Boolean.FALSE); - if (!result) { + // 这里重新加载一次最新的上下文 + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, context.getWorkflowTaskBatchId()) + ); + if (Objects.isNull(workflowTaskBatch)) { operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason(); + } else { + ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType()); + Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在")); + ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine); + ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler); + result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), workflowTaskBatch.getWfContext())).orElse(Boolean.FALSE); + if (!result) { + operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason(); + } } + } catch (Exception e) { log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); @@ -85,9 +98,9 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { } } - if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) { - workflowTaskExecutor(context); - } +// if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) { +// workflowTaskExecutor(context); +// } // 回传执行结果 context.setEvaluationResult(result);