From 9dee6ac58783c5074fc1bc450c2aa8f4298f68f2 Mon Sep 17 00:00:00 2001
From: opensnail <598092184@qq.com>
Date: Sun, 30 Jun 2024 14:03:51 +0800
Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.0-beta2):=20=E4=BC=98=E5=8C=96?=
 =?UTF-8?q?=E6=9D=A1=E4=BB=B6=E8=8A=82=E7=82=B9=E7=9A=84=E6=89=A7=E8=A1=8C?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../dispatch/WorkflowExecutorActor.java       | 72 +++++++++----------
 .../workflow/JobTaskWorkflowExecutor.java     | 54 ++++++++------
 2 files changed, 67 insertions(+), 59 deletions(-)

diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java
index 3e3d4a0b0..3e9c6bf4e 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java
@@ -43,6 +43,8 @@ import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
+
 /**
  * @author: xiaowoniu
  * @date : 2023-12-22 10:34
@@ -210,38 +212,10 @@ public class WorkflowExecutorActor extends AbstractActor {
 
             // 决策当前节点要不要执行
             Set<Long> predecessors = graph.predecessors(workflowNode.getId());
-            boolean predecessorsComplete = arePredecessorsComplete(predecessors, jobTaskBatchMap, workflowNode);
+            boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, workflowNode, parentJobTaskBatchList);
             if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) {
                 continue;
             }
-//            if (!SystemConstants.ROOT.equals(taskExecute.getParentId())) {
-//                boolean isConinue = true;
-//                for (final Long predecessor : predecessors) {
-//                    if (SystemConstants.ROOT.equals(predecessor)) {
-//                        continue;
-//                    }
-//                    List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(predecessor);
-//                    // 说明此节点未执行, 继续等待执行完成
-//                    if (CollUtil.isEmpty(jobTaskBatches)) {
-//                        SnailJobLog.LOCAL.info("批次为空存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId());
-//                        isConinue = Boolean.FALSE;
-//                        continue;
-//                    }
-//
-//                    boolean isCompleted = jobTaskBatches.stream().anyMatch(
-//                        jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
-//                    if (isCompleted) {
-//                        SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId());
-//                        isConinue = Boolean.FALSE;
-//                    }
-//                }
-//
-//                // TODO 理论上都不会执行应该 return
-//                if (!isConinue) {
-//                    continue;
-//                }
-//
-//            }
 
             // 执行DAG中的节点
             WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
@@ -266,14 +240,12 @@ public class WorkflowExecutorActor extends AbstractActor {
 
     }
 
-    private boolean arePredecessorsComplete(Set<Long> predecessors,
-        Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) {
-
-        // 决策节点不需要等待其他的兄弟节点是否完成,一个完成直接流转到后继节点
-//        if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
-//            return Boolean.TRUE;
-//        }
+    private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors,
+        Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode,
+        List<JobTaskBatch> parentJobTaskBatchList) {
 
+        // Predecessor batches whose operation reason is not in WORKFLOW_SUCCESSOR_SKIP_EXECUTION (kept from the first predecessor that has any)
+        List<JobTaskBatch> isExistedNotSkipJobTaskBatches = new ArrayList<>();
         // 判断所有节点是否都完成
         for (final Long nodeId : predecessors) {
             if (SystemConstants.ROOT.equals(nodeId)) {
@@ -291,9 +263,35 @@ public class WorkflowExecutorActor extends AbstractActor {
             boolean isCompleted = jobTaskBatches.stream().anyMatch(
                 jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
             if (isCompleted) {
-                SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", nodeId, waitExecWorkflowNode.getId());
+                SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}] parentId:[{}]", nodeId, waitExecWorkflowNode.getId(),
+                    taskExecute.getParentId());
                 return Boolean.FALSE;
             }
+
+            if (CollUtil.isEmpty(isExistedNotSkipJobTaskBatches)) {
+                isExistedNotSkipJobTaskBatches = jobTaskBatches.stream().filter(
+                    jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())).toList();
+
+            }
+
+        }
+
+        // The triggering parent carries a "skip successor" reason while at least one predecessor does not:
+        // a skipped parent must not drive this node on its own, a normal predecessor has to.
+        if (CollUtil.isNotEmpty(parentJobTaskBatchList) &&
+            parentJobTaskBatchList.stream()
+                .map(JobTaskBatch::getOperationReason)
+                .filter(Objects::nonNull)
+                .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)
+            && CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches)) {
+            /*
+              Normally we would wait for the non-skipped node to trigger this node, but if it has already been scheduled nothing is left to trigger the successor.
+              So one of this node's predecessors is re-selected to act as the triggering parent, allowing the flow to continue.
+              For performance, a non-skipped batch is simply inserted at the head of parentJobTaskBatchList (this mutates the caller's list) so the flow proceeds normally.
+              eg: {"-1":[480],"480":[481,488,490],"481":[482],"482":[483],"483":[484],"484":[485],"485":[486],"486":[487],"487":[497,498],"488":[489],"489":[497,498],"490":[491,493,495],"491":[492],"492":[497,498],"493":[494],"494":[497,498],"495":[496],"496":[497,498],"497":[499],"498":[499],"499":[]}
+             */
+            log.warn("-->>> isExistedNotSkip:[{}] nodeId:[{}] parentId:[{}]", CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches), waitExecWorkflowNode.getId(), taskExecute.getParentId());
+            parentJobTaskBatchList.add(0, isExistedNotSkipJobTaskBatches.get(0));
         }
 
         return Boolean.TRUE;
diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java
index 6ee289e0f..bae540e51 100644
--- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java
+++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java
@@ -15,6 +15,7 @@ import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Component;
 
 import java.util.Objects;
+import java.util.Set;
 
 import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED;
 import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION;
@@ -30,6 +31,9 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
 @RequiredArgsConstructor
 public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
 
+    private static final Set<Integer> NO_REQUIRED_CONFIG = Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(),
+            WORKFLOW_DECISION_FAILED.getReason());
+
     @Override
     public WorkflowNodeTypeEnum getWorkflowNodeType() {
         return WorkflowNodeTypeEnum.JOB_TASK;
@@ -42,23 +46,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
 
     @Override
     protected void afterExecute(WorkflowExecutorContext context) {
-        if (!Sets.newHashSet(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason(), WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason())
-                .contains(context.getOperationReason())) {
-            return;
-        }
 
-        JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
-        JobTask jobTask = generateJobTask(context, jobTaskBatch);
-
-        JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
-        jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
-        jobLogMetaDTO.setGroupName(context.getGroupName());
-        jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
-        jobLogMetaDTO.setJobId(context.getJobId());
-        jobLogMetaDTO.setTaskId(jobTask.getId());
-
-        SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>",
-                context.getWorkflowNodeId(), jobLogMetaDTO);
     }
 
     @Override
@@ -69,16 +57,22 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
     @Override
     protected void doExecute(WorkflowExecutorContext context) {
 
-        if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
-            context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
-            context.setOperationReason(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
-            context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
-
-        } else if (Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()).contains( context.getParentOperationReason())) {
+        if (NO_REQUIRED_CONFIG.contains(context.getParentOperationReason())) {
             // 针对无需处理的批次直接新增一个记录
             context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
             context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
             context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
+
+            // Create the cancelled batch and task records
+            invokeCancelJobTask(context);
+        } else if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
+            // Node status is NO: mark batch and task as CANCEL with reason WORKFLOW_NODE_CLOSED_SKIP_EXECUTION
+            context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
+            context.setOperationReason(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
+            context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
+
+            // Create the cancelled batch and task records
+            invokeCancelJobTask(context);
         } else {
             invokeJobTask(context);
         }
@@ -93,4 +87,20 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
         ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
         actorRef.tell(jobTaskPrepare, actorRef);
     }
+
+    private void invokeCancelJobTask(final WorkflowExecutorContext context) {
+        // Generate the batch and task for the cancelled node, then report the cancellation to the remote log against them
+        JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
+        JobTask jobTask = generateJobTask(context, jobTaskBatch);
+
+        JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
+        jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
+        jobLogMetaDTO.setGroupName(context.getGroupName());
+        jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
+        jobLogMetaDTO.setJobId(context.getJobId());
+        jobLogMetaDTO.setTaskId(jobTask.getId());
+
+        SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>",
+            context.getWorkflowNodeId(), jobLogMetaDTO);
+    }
 }