From 0c29d86047eb48f3ced76827a7c5d2b40cf64c70 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Mon, 17 Jun 2024 23:34:32 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.1.0):=20=E4=BF=AE=E5=A4=8D=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E5=AD=90=E8=8A=82=E7=82=B9=E4=B8=8D=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/dispatch/JobExecutorActor.java | 17 +++++++------ .../dispatch/WorkflowExecutorActor.java | 25 +++++++++++-------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 3d84440b8..47f795fa5 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -6,9 +6,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.context.SpringContext; -import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum; -import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.enums.*; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.akka.ActorGenerator; @@ -18,7 +16,6 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; -import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; @@ -59,6 +56,9 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Objects; +import static com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum.MAP; +import static com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum.MAP_REDUCE; + /** * @author: opensnail * @date : 2023-09-25 17:41 @@ -138,10 +138,11 @@ public class JobExecutorActor extends AbstractActor { JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); - instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT); - instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY)); - // TODO 此处需要判断任务类型 - instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage()); + if (Lists.newArrayList(MAP_REDUCE.getType(), MAP.getType()).contains(job.getTaskType())) { + instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT); + instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY)); + instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage()); + } List taskList = taskInstance.generate(instanceGenerateContext); if (CollUtil.isEmpty(taskList)) { SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId()); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index c316a2020..157a88fd8 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -40,10 +40,7 @@ import org.springframework.stereotype.Component; import java.text.MessageFormat; import java.time.Duration; import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; /** @@ -101,14 +98,22 @@ public class WorkflowExecutorActor extends AbstractActor { String flowInfo = workflowTaskBatch.getFlowInfo(); MutableGraph graph = MutableGraphCache.getOrDefault(workflowTaskBatch.getId(), flowInfo); - Set successors = graph.successors(taskExecute.getParentId()); - if (CollUtil.isEmpty(successors)) { + Set brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId()); + Sets.SetView setView = Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId())); + // 查到当前节点【ParentId】的所有兄弟节点是否有后继节点,若有则不能直接完成任务 + Set allSuccessors = Sets.newHashSet(); + for (Long nodeId : setView.immutableCopy()) { + allSuccessors.addAll(graph.successors(nodeId)); + } + + // 若所有的兄弟节点的子节点都没有后继节点可以完成次任务 + if (CollUtil.isEmpty(allSuccessors)) { workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch); return; } - Set brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId()); - Sets.SetView union = Sets.union(successors, brotherNode); + // TODO 暂时删除,待认证 +// Sets.SetView union = Sets.union(allSuccessors, brotherNode); // 添加父节点,为了判断父节点的处理状态 List allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() @@ -116,11 +121,11 @@ public class WorkflowExecutorActor extends AbstractActor { JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) .in(JobTaskBatch::getWorkflowNodeId, - Sets.union(union, Sets.newHashSet(taskExecute.getParentId()))) + Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId()))) ); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))) + .in(WorkflowNode::getId, Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId()))) .orderByAsc(WorkflowNode::getPriorityLevel)); Map> jobTaskBatchMap = StreamUtils.groupByKey(allJobTaskBatchList, JobTaskBatch::getWorkflowNodeId);