fix(sj_1.1.0-beta2): 优化条件节点的执行
This commit is contained in:
parent
9c34d2c660
commit
9dee6ac587
@ -43,6 +43,8 @@ import java.time.LocalDateTime;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: xiaowoniu
|
* @author: xiaowoniu
|
||||||
* @date : 2023-12-22 10:34
|
* @date : 2023-12-22 10:34
|
||||||
@ -210,38 +212,10 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
|
|
||||||
// 决策当前节点要不要执行
|
// 决策当前节点要不要执行
|
||||||
Set<Long> predecessors = graph.predecessors(workflowNode.getId());
|
Set<Long> predecessors = graph.predecessors(workflowNode.getId());
|
||||||
boolean predecessorsComplete = arePredecessorsComplete(predecessors, jobTaskBatchMap, workflowNode);
|
boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, workflowNode, parentJobTaskBatchList);
|
||||||
if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) {
|
if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// if (!SystemConstants.ROOT.equals(taskExecute.getParentId())) {
|
|
||||||
// boolean isConinue = true;
|
|
||||||
// for (final Long predecessor : predecessors) {
|
|
||||||
// if (SystemConstants.ROOT.equals(predecessor)) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
// List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(predecessor);
|
|
||||||
// // 说明此节点未执行, 继续等待执行完成
|
|
||||||
// if (CollUtil.isEmpty(jobTaskBatches)) {
|
|
||||||
// SnailJobLog.LOCAL.info("批次为空存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId());
|
|
||||||
// isConinue = Boolean.FALSE;
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// boolean isCompleted = jobTaskBatches.stream().anyMatch(
|
|
||||||
// jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
|
|
||||||
// if (isCompleted) {
|
|
||||||
// SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId());
|
|
||||||
// isConinue = Boolean.FALSE;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // TODO 理论上都不会执行应该 return
|
|
||||||
// if (!isConinue) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
|
|
||||||
// 执行DAG中的节点
|
// 执行DAG中的节点
|
||||||
WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
|
WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
|
||||||
@ -266,14 +240,12 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean arePredecessorsComplete(Set<Long> predecessors,
|
private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors,
|
||||||
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) {
|
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode,
|
||||||
|
List<JobTaskBatch> parentJobTaskBatchList) {
|
||||||
// 决策节点不需要等待其他的兄弟节点是否完成,一个完成直接流转到后继节点
|
|
||||||
// if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
|
|
||||||
// return Boolean.TRUE;
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
// 是否存在无需处理的节点
|
||||||
|
List<JobTaskBatch> isExistedNotSkipJobTaskBatches = new ArrayList<>();
|
||||||
// 判断所有节点是否都完成
|
// 判断所有节点是否都完成
|
||||||
for (final Long nodeId : predecessors) {
|
for (final Long nodeId : predecessors) {
|
||||||
if (SystemConstants.ROOT.equals(nodeId)) {
|
if (SystemConstants.ROOT.equals(nodeId)) {
|
||||||
@ -291,9 +263,35 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
boolean isCompleted = jobTaskBatches.stream().anyMatch(
|
boolean isCompleted = jobTaskBatches.stream().anyMatch(
|
||||||
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
|
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
|
||||||
if (isCompleted) {
|
if (isCompleted) {
|
||||||
SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", nodeId, waitExecWorkflowNode.getId());
|
SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}] parentId:[{}]", nodeId, taskExecute.getParentId(),
|
||||||
|
waitExecWorkflowNode.getId());
|
||||||
return Boolean.FALSE;
|
return Boolean.FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(isExistedNotSkipJobTaskBatches)) {
|
||||||
|
isExistedNotSkipJobTaskBatches = jobTaskBatches.stream().filter(
|
||||||
|
jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())).toList();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// 父节点是否存在无需处理的节点,若存在一个不是的则需要等待正常的节点处理
|
||||||
|
// 如果父节点是无需处理则不再继续执行
|
||||||
|
if (CollUtil.isNotEmpty(parentJobTaskBatchList) &&
|
||||||
|
parentJobTaskBatchList.stream()
|
||||||
|
.map(JobTaskBatch::getOperationReason)
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)
|
||||||
|
&& CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches)) {
|
||||||
|
/*
|
||||||
|
等待正常的节点来执行此节点,若正常节点已经调度过了,此时则没有能触发后继节点继续调度的节点存在了。
|
||||||
|
因此这里将重新选当前节点的前驱节点中选一个作为父节点来触发,使之能够继续往后执行
|
||||||
|
基于性能的考虑这里在直接在parentJobTaskBatchList列表的头节点插入一个不是跳过的节点,这样就可以正常流转了
|
||||||
|
eg: {"-1":[480],"480":[481,488,490],"481":[482],"482":[483],"483":[484],"484":[485],"485":[486],"486":[487],"487":[497,498],"488":[489],"489":[497,498],"490":[491,493,495],"491":[492],"492":[497,498],"493":[494],"494":[497,498],"495":[496],"496":[497,498],"497":[499],"498":[499],"499":[]}
|
||||||
|
*/
|
||||||
|
log.warn("-->>> isExistedNotSkip:[{}] nodeId:[{}] parentId:[{}]", CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches), waitExecWorkflowNode.getId(), taskExecute.getParentId());
|
||||||
|
parentJobTaskBatchList.add(0, isExistedNotSkipJobTaskBatches.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
return Boolean.TRUE;
|
return Boolean.TRUE;
|
||||||
|
@ -15,6 +15,7 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED;
|
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED;
|
||||||
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION;
|
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION;
|
||||||
@ -30,6 +31,9 @@ import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKF
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||||
|
|
||||||
|
private static final Set<Integer> NO_REQUIRED_CONFIG = Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(),
|
||||||
|
WORKFLOW_DECISION_FAILED.getReason());
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
||||||
return WorkflowNodeTypeEnum.JOB_TASK;
|
return WorkflowNodeTypeEnum.JOB_TASK;
|
||||||
@ -42,23 +46,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void afterExecute(WorkflowExecutorContext context) {
|
protected void afterExecute(WorkflowExecutorContext context) {
|
||||||
if (!Sets.newHashSet(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason(), WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason())
|
|
||||||
.contains(context.getOperationReason())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
|
|
||||||
JobTask jobTask = generateJobTask(context, jobTaskBatch);
|
|
||||||
|
|
||||||
JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
|
|
||||||
jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
|
|
||||||
jobLogMetaDTO.setGroupName(context.getGroupName());
|
|
||||||
jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
|
|
||||||
jobLogMetaDTO.setJobId(context.getJobId());
|
|
||||||
jobLogMetaDTO.setTaskId(jobTask.getId());
|
|
||||||
|
|
||||||
SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>",
|
|
||||||
context.getWorkflowNodeId(), jobLogMetaDTO);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -69,16 +57,22 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
@Override
|
@Override
|
||||||
protected void doExecute(WorkflowExecutorContext context) {
|
protected void doExecute(WorkflowExecutorContext context) {
|
||||||
|
|
||||||
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
|
if (NO_REQUIRED_CONFIG.contains(context.getParentOperationReason())) {
|
||||||
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
|
||||||
context.setOperationReason(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
|
|
||||||
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
|
|
||||||
|
|
||||||
} else if (Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()).contains( context.getParentOperationReason())) {
|
|
||||||
// 针对无需处理的批次直接新增一个记录
|
// 针对无需处理的批次直接新增一个记录
|
||||||
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
||||||
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
|
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
|
||||||
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
|
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
|
||||||
|
|
||||||
|
// 创建批次和任务节点
|
||||||
|
invokeCancelJobTask(context);
|
||||||
|
} else if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
|
||||||
|
// 针对无需处理的批次直接新增一个记录
|
||||||
|
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
|
||||||
|
context.setOperationReason(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
|
||||||
|
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
|
||||||
|
|
||||||
|
// 创建批次和任务节点
|
||||||
|
invokeCancelJobTask(context);
|
||||||
} else {
|
} else {
|
||||||
invokeJobTask(context);
|
invokeJobTask(context);
|
||||||
}
|
}
|
||||||
@ -93,4 +87,20 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
||||||
actorRef.tell(jobTaskPrepare, actorRef);
|
actorRef.tell(jobTaskPrepare, actorRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void invokeCancelJobTask(final WorkflowExecutorContext context) {
|
||||||
|
|
||||||
|
JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
|
||||||
|
JobTask jobTask = generateJobTask(context, jobTaskBatch);
|
||||||
|
|
||||||
|
JobLogMetaDTO jobLogMetaDTO = new JobLogMetaDTO();
|
||||||
|
jobLogMetaDTO.setNamespaceId(context.getNamespaceId());
|
||||||
|
jobLogMetaDTO.setGroupName(context.getGroupName());
|
||||||
|
jobLogMetaDTO.setTaskBatchId(jobTaskBatch.getId());
|
||||||
|
jobLogMetaDTO.setJobId(context.getJobId());
|
||||||
|
jobLogMetaDTO.setTaskId(jobTask.getId());
|
||||||
|
|
||||||
|
SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>",
|
||||||
|
context.getWorkflowNodeId(), jobLogMetaDTO);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user