feat(sj_1.1.0): 修复工作流子节点不触发问题

This commit is contained in:
opensnail 2024-06-17 23:34:32 +08:00
parent 83c771f74e
commit 36ba663fbf
2 changed files with 24 additions and 18 deletions

View File

@ -6,9 +6,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.enums.*;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
@ -18,7 +16,6 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
@ -59,6 +56,9 @@ import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum.MAP;
import static com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum.MAP_REDUCE;
/**
* @author: opensnail
* @date : 2023-09-25 17:41
@ -138,10 +138,11 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT);
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
// TODO 此处需要判断任务类型
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage());
if (Lists.newArrayList(MAP_REDUCE.getType(), MAP.getType()).contains(job.getTaskType())) {
instanceGenerateContext.setTaskName(SystemConstants.MAP_ROOT);
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP.getStage());
}
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollUtil.isEmpty(taskList)) {
SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId());

View File

@ -40,10 +40,7 @@ import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -101,14 +98,22 @@ public class WorkflowExecutorActor extends AbstractActor {
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(workflowTaskBatch.getId(), flowInfo);
Set<Long> successors = graph.successors(taskExecute.getParentId());
if (CollUtil.isEmpty(successors)) {
Set<Long> brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId());
Sets.SetView<Long> setView = Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId()));
// 查到当前节点ParentId的所有兄弟节点是否有后继节点若有则不能直接完成任务
Set<Long> allSuccessors = Sets.newHashSet();
for (Long nodeId : setView.immutableCopy()) {
allSuccessors.addAll(graph.successors(nodeId));
}
// 若所有的兄弟节点的子节点都没有后继节点可以完成次任务
if (CollUtil.isEmpty(allSuccessors)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
Set<Long> brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId());
Sets.SetView<Long> union = Sets.union(successors, brotherNode);
// TODO 暂时删除待认证
// Sets.SetView<Long> union = Sets.union(allSuccessors, brotherNode);
// 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
@ -116,11 +121,11 @@ public class WorkflowExecutorActor extends AbstractActor {
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId,
Sets.union(union, Sets.newHashSet(taskExecute.getParentId())))
Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId())))
);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
.in(WorkflowNode::getId, Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = StreamUtils.groupByKey(allJobTaskBatchList, JobTaskBatch::getWorkflowNodeId);