fix(sj_1.1.0-beta2): 优化回调节点

This commit is contained in:
opensnail 2024-06-30 22:58:02 +08:00
parent 9dee6ac587
commit 3cb31a14d5
4 changed files with 144 additions and 109 deletions

View File

@ -8,6 +8,7 @@ import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.FailStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
@ -212,7 +213,8 @@ public class WorkflowExecutorActor extends AbstractActor {
// 决策当前节点要不要执行
Set<Long> predecessors = graph.predecessors(workflowNode.getId());
boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, workflowNode, parentJobTaskBatchList);
boolean predecessorsComplete = arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap,
workflowNode);
if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) {
continue;
}
@ -230,7 +232,7 @@ public class WorkflowExecutorActor extends AbstractActor {
context.setWfContext(workflowTaskBatch.getWfContext());
// 这里父节点取最新的批次判断状态
if (CollUtil.isNotEmpty(parentJobTaskBatchList)) {
context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason());
fillParentOperationReason(allJobTaskBatchList, parentJobTaskBatchList, parentWorkflowNode, context);
}
workflowExecutor.execute(context);
@ -240,9 +242,33 @@ public class WorkflowExecutorActor extends AbstractActor {
}
private static void fillParentOperationReason(final List<JobTaskBatch> allJobTaskBatchList,
final List<JobTaskBatch> parentJobTaskBatchList, final WorkflowNode parentWorkflowNode,
final WorkflowExecutorContext context) {
JobTaskBatch jobTaskBatch = allJobTaskBatchList.stream()
.filter(batch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(batch.getOperationReason()))
.findFirst().orElse(null);
/*
若当前节点的父节点存在无需处理的节点(比如决策节点的某个未匹中的分支)则需要等待正常的节点来执行此节点若正常节点已经调度过了
此时则没有能触发后继节点继续调度的节点存在了 因此这里将改变parentOperationReason = 0使之能继续往后处理
基于性能的考虑这里在直接在parentJobTaskBatchList列表的头节点插入一个不是跳过的节点这样就可以正常流转了
eg: {"-1":[480],"480":[481,488,490],"481":[482],"482":[483],"483":[484],"484":[485],"485":[486],"486":[487],"487":[497,498],"488":[489],"489":[497,498],"490":[491,493,495],"491":[492],"492":[497,498],"493":[494],"494":[497,498],"495":[496],"496":[497,498],"497":[499],"498":[499],"499":[]}
*/
if (parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)
&& Objects.nonNull(jobTaskBatch)
&& parentWorkflowNode.getNodeType() != WorkflowNodeTypeEnum.DECISION.getType()) {
context.setParentOperationReason(JobOperationReasonEnum.NONE.getReason());
} else {
context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason());
}
}
private boolean arePredecessorsComplete(final WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors,
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode,
List<JobTaskBatch> parentJobTaskBatchList) {
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) {
// 是否存在无需处理的节点
List<JobTaskBatch> isExistedNotSkipJobTaskBatches = new ArrayList<>();
@ -263,37 +289,21 @@ public class WorkflowExecutorActor extends AbstractActor {
boolean isCompleted = jobTaskBatches.stream().anyMatch(
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
if (isCompleted) {
SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}] parentId:[{}]", nodeId, taskExecute.getParentId(),
SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}] parentId:[{}]", nodeId,
taskExecute.getParentId(),
waitExecWorkflowNode.getId());
return Boolean.FALSE;
}
if (CollUtil.isEmpty(isExistedNotSkipJobTaskBatches)) {
isExistedNotSkipJobTaskBatches = jobTaskBatches.stream().filter(
jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())).toList();
jobTaskBatch -> !WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason()))
.toList();
}
}
// 父节点是否存在无需处理的节点若存在一个不是的则需要等待正常的节点处理
// 如果父节点是无需处理则不再继续执行
if (CollUtil.isNotEmpty(parentJobTaskBatchList) &&
parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)
&& CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches)) {
/*
等待正常的节点来执行此节点若正常节点已经调度过了此时则没有能触发后继节点继续调度的节点存在了
因此这里将重新选当前节点的前驱节点中选一个作为父节点来触发使之能够继续往后执行
基于性能的考虑这里在直接在parentJobTaskBatchList列表的头节点插入一个不是跳过的节点这样就可以正常流转了
eg: {"-1":[480],"480":[481,488,490],"481":[482],"482":[483],"483":[484],"484":[485],"485":[486],"486":[487],"487":[497,498],"488":[489],"489":[497,498],"490":[491,493,495],"491":[492],"492":[497,498],"493":[494],"494":[497,498],"495":[496],"496":[497,498],"497":[499],"498":[499],"499":[]}
*/
log.warn("-->>> isExistedNotSkip:[{}] nodeId:[{}] parentId:[{}]", CollUtil.isNotEmpty(isExistedNotSkipJobTaskBatches), waitExecWorkflowNode.getId(), taskExecute.getParentId());
parentJobTaskBatchList.add(0, isExistedNotSkipJobTaskBatches.get(0));
}
return Boolean.TRUE;
}

View File

@ -9,14 +9,12 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.dto.CallbackConfig;
import com.aizuda.snailjob.server.common.dto.JobLogMetaDTO;
import com.aizuda.snailjob.server.common.rpc.okhttp.RequestInterceptor;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.model.dto.CallbackParamsDTO;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.rholder.retry.*;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@ -26,11 +24,14 @@ import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED;
/**
* @author xiaowoniu
* @date 2023-12-24 08:18:06
@ -39,7 +40,8 @@ import java.util.concurrent.TimeUnit;
@Component
@RequiredArgsConstructor
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
private static final Set<Integer> NO_REQUIRED_CONFIG = Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(),
WORKFLOW_DECISION_FAILED.getReason());
private static final String CALLBACK_TIMEOUT = "10";
private final RestTemplate restTemplate;
@ -61,7 +63,12 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
context.setOperationReason(JobOperationReasonEnum.NONE.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
if (NO_REQUIRED_CONFIG.contains(context.getParentOperationReason())) {
// 针对无需处理的批次直接新增一个记录
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
} else if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
@ -69,9 +76,6 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
invokeCallback(context);
}
// ToDo 执行下一个节点
// workflowTaskExecutor(context);
}
private void invokeCallback(WorkflowExecutorContext context) {
@ -160,8 +164,14 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
SnailJobLog.REMOTE.info("节点[{}]回调成功.\n回调参数:{} \n回调结果:[{}] <|>{}<|>",
context.getWorkflowNodeId(), context.getWfContext(), context.getEvaluationResult(), jobLogMetaDTO);
} else if (jobTaskBatch.getTaskBatchStatus() == JobTaskStatusEnum.CANCEL.getStatus()) {
if (NO_REQUIRED_CONFIG.contains(context.getParentOperationReason())) {
SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 当前任务无需处理 <|>{}<|>",
context.getWorkflowNodeId(), jobLogMetaDTO);
} else {
SnailJobLog.REMOTE.warn("节点[{}]取消回调. 取消原因: 任务状态已关闭 <|>{}<|>",
context.getWorkflowNodeId(), jobLogMetaDTO);
}
} else {
SnailJobLog.REMOTE.error("节点[{}]回调失败.\n失败原因:{} <|>{}<|>",
context.getWorkflowNodeId(),

View File

@ -63,8 +63,8 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
// 创建批次和任务节点
invokeCancelJobTask(context);
// 创建批次和任务节点4
invokeCancelJobTask(context, "当前节点无需处理");
} else if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
// 针对无需处理的批次直接新增一个记录
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
@ -72,7 +72,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
// 创建批次和任务节点
invokeCancelJobTask(context);
invokeCancelJobTask(context, "任务已关闭");
} else {
invokeJobTask(context);
}
@ -88,7 +88,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
actorRef.tell(jobTaskPrepare, actorRef);
}
private void invokeCancelJobTask(final WorkflowExecutorContext context) {
private void invokeCancelJobTask(final WorkflowExecutorContext context, String cancelReason) {
JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
JobTask jobTask = generateJobTask(context, jobTaskBatch);
@ -100,7 +100,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
jobLogMetaDTO.setJobId(context.getJobId());
jobLogMetaDTO.setTaskId(jobTask.getId());
SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>",
context.getWorkflowNodeId(), jobLogMetaDTO);
SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: {}. <|>{}<|>",
context.getWorkflowNodeId(), cancelReason, jobLogMetaDTO);
}
}

View File

@ -50,6 +50,7 @@ import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private static final Integer NOT_HANDLE_STATUS = 99;
private static final Integer WORKFLOW_DECISION_FAILED_STATUS = 98;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
@ -87,7 +88,8 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
"batch.create_dt", queryVO.getStartDt(), queryVO.getEndDt())
.eq("batch.deleted", 0)
.orderByDesc("batch.id");
List<WorkflowBatchResponseDO> batchResponseDOList = workflowTaskBatchMapper.selectWorkflowBatchPageList(pageDTO, wrapper);
List<WorkflowBatchResponseDO> batchResponseDOList = workflowTaskBatchMapper.selectWorkflowBatchPageList(pageDTO,
wrapper);
List<WorkflowBatchResponseVO> batchResponseVOList =
WorkflowConverter.INSTANCE.convertListToWorkflowBatchList(batchResponseDOList);
@ -145,11 +147,13 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
jobTaskBatchList = jobTaskBatchList.stream()
.sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus))
.collect(Collectors.toList());
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatchList));
nodeInfo.setJobBatchList(
JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatchList));
// 取第最新的一条状态
JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0);
if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason() == jobTaskBatch.getOperationReason()) {
if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason()
== jobTaskBatch.getOperationReason()) {
// 前端展示使用
nodeInfo.setTaskBatchStatus(WORKFLOW_DECISION_FAILED_STATUS);
} else {
@ -177,6 +181,15 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
for (Long noOperationNodeId : allNoOperationNode) {
WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(noOperationNodeId);
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeInfo.getId());
if (CollUtil.isNotEmpty(jobTaskBatches)) {
jobTaskBatches = jobTaskBatches.stream()
.sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus))
.collect(Collectors.toList());
nodeInfo.setJobBatchList(
JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatches));
} else {
JobBatchResponseVO jobBatchResponseVO = new JobBatchResponseVO();
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
@ -188,6 +201,8 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO));
}
}
try {
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT,