From 9c34d2c660d30ea31cc68ed36297df231dca216e Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sun, 30 Jun 2024 00:07:51 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E6=B5=81=E8=8A=82=E7=82=B9=E6=9C=AA=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch/JobExecutorResultActor.java | 2 +- .../dispatch/WorkflowExecutorActor.java | 160 ++++++++++++------ .../workflow/DecisionWorkflowExecutor.java | 6 +- .../workflow/JobTaskWorkflowExecutor.java | 20 ++- .../workflow/WorkflowExecutorContext.java | 11 ++ .../starter/schedule/OfflineNodeSchedule.java | 2 +- .../web/service/handler/WorkflowHandler.java | 2 +- 7 files changed, 143 insertions(+), 60 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index f250c69d..2f228ffe 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -46,7 +46,7 @@ public class JobExecutorResultActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().match(JobExecutorResultDTO.class, result -> { - SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); + SnailJobLog.LOCAL.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); try { Assert.notNull(result.getTaskId(), ()-> new SnailJobServerException("taskId can not be null")); Assert.notNull(result.getJobId(), ()-> new SnailJobServerException("jobId can not be null")); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index c2cdaaca..3e3d4a0b 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -64,7 +64,7 @@ public class WorkflowExecutorActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> { - log.debug("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute)); + log.info("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute)); try { doExecutor(taskExecute); @@ -72,8 +72,9 @@ public class WorkflowExecutorActor extends AbstractActor { } catch (Exception e) { SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e); handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), - JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason()); - SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId())); + JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason()); + SpringContext.getContext() + .publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId())); } finally { getContext().stop(getSelf()); } @@ -84,14 +85,17 @@ public class WorkflowExecutorActor extends AbstractActor { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId()); Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("任务不存在")); - if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) { - handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason()); + if (SystemConstants.ROOT.equals(taskExecute.getParentId()) + && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) { + handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), + JobOperationReasonEnum.NONE.getReason()); Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId()); - JobTimerWheel.clearCache(MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getWorkflowTaskBatchId())); + JobTimerWheel.clearCache( + MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getWorkflowTaskBatchId())); JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), - Duration.ofSeconds(workflow.getExecutorTimeout())); + Duration.ofSeconds(workflow.getExecutorTimeout())); } // 获取DAG图 @@ -103,9 +107,18 @@ public class WorkflowExecutorActor extends AbstractActor { // 查到当前节点【ParentId】的所有兄弟节点是否有后继节点,若有则不能直接完成任务 Set allSuccessors = Sets.newHashSet(); for (Long nodeId : setView.immutableCopy()) { - allSuccessors.addAll(graph.successors(nodeId)); + Set successors = graph.successors(nodeId); + if (CollUtil.isNotEmpty(successors)) { + for (final Long successor : successors) { + // 寻找当前的节点的所有前序节点 + allSuccessors.addAll(graph.predecessors(successor)); + } + allSuccessors.addAll(successors); + } } + log.warn("父节点:[{}] 所有的节点:[{}]", taskExecute.getParentId(), allSuccessors); + // 若所有的兄弟节点的子节点都没有后继节点可以完成次任务 if (CollUtil.isEmpty(allSuccessors)) { workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); @@ -114,35 +127,36 @@ public class WorkflowExecutorActor extends AbstractActor { // 添加父节点,为了判断父节点的处理状态 List allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() - .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, - JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId) - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) - .in(JobTaskBatch::getWorkflowNodeId, - Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId()))) + .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, + JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) + .in(JobTaskBatch::getWorkflowNodeId, + Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId()))) ); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .in(WorkflowNode::getId, Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId()))) - .orderByAsc(WorkflowNode::getPriorityLevel)); + .in(WorkflowNode::getId, Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId()))) + .orderByAsc(WorkflowNode::getPriorityLevel)); - Map> jobTaskBatchMap = StreamUtils.groupByKey(allJobTaskBatchList, JobTaskBatch::getWorkflowNodeId); + Map> jobTaskBatchMap = StreamUtils.groupByKey(allJobTaskBatchList, + JobTaskBatch::getWorkflowNodeId); Map workflowNodeMap = StreamUtils.toIdentityMap(workflowNodes, WorkflowNode::getId); List parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId()); // 如果父节点是无需处理则不再继续执行 - if (CollUtil.isNotEmpty(parentJobTaskBatchList) && - parentJobTaskBatchList.stream() - .map(JobTaskBatch::getOperationReason) - .filter(Objects::nonNull) - .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) { - workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); - return; - } +// if (CollUtil.isNotEmpty(parentJobTaskBatchList) && +// parentJobTaskBatchList.stream() +// .map(JobTaskBatch::getOperationReason) +// .filter(Objects::nonNull) +// .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) { +// workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); +// return; +// } WorkflowNode parentWorkflowNode = workflowNodeMap.get(taskExecute.getParentId()); // 失败策略处理 if (CollUtil.isNotEmpty(parentJobTaskBatchList) - && parentJobTaskBatchList.stream() + && parentJobTaskBatchList.stream() .map(JobTaskBatch::getTaskBatchStatus) .anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) { @@ -153,29 +167,31 @@ public class WorkflowExecutorActor extends AbstractActor { } // 决策节点 - if (Objects.nonNull(parentWorkflowNode) && WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { + if (Objects.nonNull(parentWorkflowNode) + && WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { // 获取决策节点子节点 Set successors = graph.successors(parentWorkflowNode.getId()); workflowNodes = workflowNodes.stream() - // 去掉父节点 - .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()) - // 过滤掉非当前决策节点【ParentId】的子节点 - && successors.contains(workflowNode.getId())).collect(Collectors.toList()); + // 去掉父节点 + .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()) + // 过滤掉非当前决策节点【ParentId】的子节点 + && successors.contains(workflowNode.getId())).collect(Collectors.toList()); } else { - if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { - return; - } + // TODO 不通过兄弟节点去控制是否执行后续节点 +// if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) { +// return; +// } workflowNodes = workflowNodes.stream() - // 去掉父节点 - .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) - .collect(Collectors.toList()); + // 去掉父节点 + .filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())) + .collect(Collectors.toList()); // TODO 合并job task的结果到全局上下文中 // 此次的并发数与当时父节点的兄弟节点的数量一致 workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch, - StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId)); + StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId)); } List jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)); @@ -183,6 +199,7 @@ public class WorkflowExecutorActor extends AbstractActor { // 只会条件节点会使用 Object evaluationResult = null; + log.info("待执行的节点为. workflowNodes:[{}]", StreamUtils.toList(workflowNodes, WorkflowNode::getId)); for (WorkflowNode workflowNode : workflowNodes) { // 批次已经存在就不在重复生成 @@ -191,6 +208,41 @@ public class WorkflowExecutorActor extends AbstractActor { continue; } + // 决策当前节点要不要执行 + Set predecessors = graph.predecessors(workflowNode.getId()); + boolean predecessorsComplete = arePredecessorsComplete(predecessors, jobTaskBatchMap, workflowNode); + if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) { + continue; + } +// if (!SystemConstants.ROOT.equals(taskExecute.getParentId())) { +// boolean isConinue = true; +// for (final Long predecessor : predecessors) { +// if (SystemConstants.ROOT.equals(predecessor)) { +// continue; +// } +// List jobTaskBatches = jobTaskBatchMap.get(predecessor); +// // 说明此节点未执行, 继续等待执行完成 +// if (CollUtil.isEmpty(jobTaskBatches)) { +// SnailJobLog.LOCAL.info("批次为空存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId()); +// isConinue = Boolean.FALSE; +// continue; +// } +// +// boolean isCompleted = jobTaskBatches.stream().anyMatch( +// jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())); +// if (isCompleted) { +// SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId()); +// isConinue = Boolean.FALSE; +// } +// } +// +// // TODO 理论上都不会执行应该 return +// if (!isConinue) { +// continue; +// } +// +// } + // 执行DAG中的节点 WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType()); @@ -202,6 +254,11 @@ public class WorkflowExecutorActor extends AbstractActor { context.setTaskBatchId(taskExecute.getTaskBatchId()); context.setTaskExecutorScene(taskExecute.getTaskExecutorScene()); context.setWfContext(workflowTaskBatch.getWfContext()); + // 这里父节点取最新的批次判断状态 + if (CollUtil.isNotEmpty(parentJobTaskBatchList)) { + context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason()); + } + workflowExecutor.execute(context); evaluationResult = context.getEvaluationResult(); @@ -209,31 +266,32 @@ public class WorkflowExecutorActor extends AbstractActor { } - private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set brotherNode, - Map> jobTaskBatchMap, WorkflowNode parentWorkflowNode) { - - if (SystemConstants.ROOT.equals(taskExecute.getParentId())) { - return Boolean.TRUE; - } + private boolean arePredecessorsComplete(Set predecessors, + Map> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) { // 决策节点不需要等待其他的兄弟节点是否完成,一个完成直接流转到后继节点 - if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { - return Boolean.TRUE; - } +// if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) { +// return Boolean.TRUE; +// } // 判断所有节点是否都完成 - for (final Long nodeId : brotherNode) { + for (final Long nodeId : predecessors) { + if (SystemConstants.ROOT.equals(nodeId)) { + continue; + } + List jobTaskBatches = jobTaskBatchMap.get(nodeId); // 说明此节点未执行, 继续等待执行完成 if (CollUtil.isEmpty(jobTaskBatches)) { - SnailJobLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId); + SnailJobLog.LOCAL.info("批次为空存在未完成的兄弟节点. [{}] 待执行节点:[{}]", nodeId, + waitExecWorkflowNode.getId()); return Boolean.FALSE; } boolean isCompleted = jobTaskBatches.stream().anyMatch( - jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())); + jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())); if (isCompleted) { - SnailJobLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId); + SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", nodeId, waitExecWorkflowNode.getId()); return Boolean.FALSE; } } @@ -250,7 +308,7 @@ public class WorkflowExecutorActor extends AbstractActor { jobTaskBatch.setOperationReason(operationReason); jobTaskBatch.setUpdateDt(LocalDateTime.now()); Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch), - () -> new SnailJobServerException("更新任务失败")); + () -> new SnailJobServerException("更新任务失败")); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java index cd9d32c9..00533559 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/DecisionWorkflowExecutor.java @@ -21,6 +21,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Sets; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -28,6 +29,9 @@ import org.springframework.stereotype.Component; import java.util.Objects; import java.util.Optional; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED; + /** * @author xiaowoniu * @date 2023-12-24 08:17:11 @@ -59,7 +63,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor { Boolean result = (Boolean) Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE); - if (result) { + if (result || (Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()).contains( context.getParentOperationReason()))) { // 多个条件节点直接是或的关系,只要一个成功其他节点就取消且是无需处理状态 taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); jobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus(); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java index 8bf8d834..6ee289e0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java @@ -10,11 +10,17 @@ import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.google.common.collect.Sets; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.Objects; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED; +import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION; + /** * @author xiaowoniu * @date 2023-12-24 08:09:14 @@ -36,7 +42,8 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { @Override protected void afterExecute(WorkflowExecutorContext context) { - if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.YES.getStatus())) { + if (!Sets.newHashSet(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason(), WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()) + .contains(context.getOperationReason())) { return; } @@ -51,7 +58,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { jobLogMetaDTO.setTaskId(jobTask.getId()); SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>", - context.getWorkflowNodeId(), jobLogMetaDTO); + context.getWorkflowNodeId(), jobLogMetaDTO); } @Override @@ -64,11 +71,14 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) { context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); - context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason()); + context.setOperationReason(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason()); context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus()); - // 执行下一个节点 - workflowTaskExecutor(context); + } else if (Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()).contains( context.getParentOperationReason())) { + // 针对无需处理的批次直接新增一个记录 + context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); + context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason()); + context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus()); } else { invokeJobTask(context); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java index bd3491c0..8c7b4e03 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/WorkflowExecutorContext.java @@ -38,6 +38,16 @@ public class WorkflowExecutorContext { */ private Long parentWorkflowNodeId; + /** + * TODO 父节点批次状态 + */ + private Integer parentJobTaskStatus; + + /** + * 父节点批次操作原因状态 + */ + private Integer parentOperationReason; + /** * 任务属性 */ @@ -107,4 +117,5 @@ public class WorkflowExecutorContext { * 工作流全局上下文 */ private String wfContext; + } diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/OfflineNodeSchedule.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/OfflineNodeSchedule.java index 9e777670..6eb46d16 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/OfflineNodeSchedule.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/OfflineNodeSchedule.java @@ -48,7 +48,7 @@ public class OfflineNodeSchedule extends AbstractSchedule implements Lifecycle { .le(ServerNode::getExpireAt, endTime)); if (CollUtil.isNotEmpty(serverNodes)) { // 先删除DB中需要下线的机器 - serverNodeMapper.deleteBatchIds(StreamUtils.toSet(serverNodes, ServerNode::getId)); + serverNodeMapper.deleteByIds(StreamUtils.toSet(serverNodes, ServerNode::getId)); } Set allPods = CacheRegisterTable.getAllPods(); diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/WorkflowHandler.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/WorkflowHandler.java index 7782d778..ca8239d2 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/WorkflowHandler.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/WorkflowHandler.java @@ -190,7 +190,7 @@ public class WorkflowHandler { graph, version); } else { if (WorkflowNodeTypeEnum.DECISION.getType() == nodeConfig.getNodeType()) { - throw new SnailJobServerException("决策节点不能作为叶子节点"); + throw new SnailJobServerException("决策节点或者决策节点的后继节点不能作为叶子节点"); } // 叶子节点记录一下