fix(sj_1.1.0-beta2): 修复工作流节点未执行问题

This commit is contained in:
opensnail 2024-06-30 00:07:51 +08:00
parent dc320b1332
commit 9c34d2c660
7 changed files with 143 additions and 60 deletions

View File

@ -46,7 +46,7 @@ public class JobExecutorResultActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
SnailJobLog.LOCAL.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
try {
Assert.notNull(result.getTaskId(), ()-> new SnailJobServerException("taskId can not be null"));
Assert.notNull(result.getJobId(), ()-> new SnailJobServerException("jobId can not be null"));

View File

@ -64,7 +64,7 @@ public class WorkflowExecutorActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
log.debug("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute));
log.info("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute));
try {
doExecutor(taskExecute);
@ -72,8 +72,9 @@ public class WorkflowExecutorActor extends AbstractActor {
} catch (Exception e) {
SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SpringContext.getContext().publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId()));
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SpringContext.getContext()
.publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId()));
} finally {
getContext().stop(getSelf());
}
@ -84,14 +85,17 @@ public class WorkflowExecutorActor extends AbstractActor {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
Assert.notNull(workflowTaskBatch, () -> new SnailJobServerException("任务不存在"));
if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason());
if (SystemConstants.ROOT.equals(taskExecute.getParentId())
&& JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus()) {
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(),
JobOperationReasonEnum.NONE.getReason());
Workflow workflow = workflowMapper.selectById(workflowTaskBatch.getWorkflowId());
JobTimerWheel.clearCache(MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getWorkflowTaskBatchId()));
JobTimerWheel.clearCache(
MessageFormat.format(WorkflowTimerTask.IDEMPOTENT_KEY_PREFIX, taskExecute.getWorkflowTaskBatchId()));
JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()),
Duration.ofSeconds(workflow.getExecutorTimeout()));
Duration.ofSeconds(workflow.getExecutorTimeout()));
}
// 获取DAG图
@ -103,9 +107,18 @@ public class WorkflowExecutorActor extends AbstractActor {
// 查到当前节点ParentId的所有兄弟节点是否有后继节点若有则不能直接完成任务
Set<Long> allSuccessors = Sets.newHashSet();
for (Long nodeId : setView.immutableCopy()) {
allSuccessors.addAll(graph.successors(nodeId));
Set<Long> successors = graph.successors(nodeId);
if (CollUtil.isNotEmpty(successors)) {
for (final Long successor : successors) {
// 寻找当前的节点的所有前序节点
allSuccessors.addAll(graph.predecessors(successor));
}
allSuccessors.addAll(successors);
}
}
log.warn("父节点:[{}] 所有的节点:[{}]", taskExecute.getParentId(), allSuccessors);
// 若所有的兄弟节点的子节点都没有后继节点可以完成次任务
if (CollUtil.isEmpty(allSuccessors)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
@ -114,35 +127,36 @@ public class WorkflowExecutorActor extends AbstractActor {
// 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId,
Sets.union(brotherNode, Sets.newHashSet(taskExecute.getParentId())))
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId,
Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId())))
);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
.in(WorkflowNode::getId, Sets.union(allSuccessors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = StreamUtils.groupByKey(allJobTaskBatchList, JobTaskBatch::getWorkflowNodeId);
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = StreamUtils.groupByKey(allJobTaskBatchList,
JobTaskBatch::getWorkflowNodeId);
Map<Long, WorkflowNode> workflowNodeMap = StreamUtils.toIdentityMap(workflowNodes, WorkflowNode::getId);
List<JobTaskBatch> parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId());
// 如果父节点是无需处理则不再继续执行
if (CollUtil.isNotEmpty(parentJobTaskBatchList) &&
parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
// if (CollUtil.isNotEmpty(parentJobTaskBatchList) &&
// parentJobTaskBatchList.stream()
// .map(JobTaskBatch::getOperationReason)
// .filter(Objects::nonNull)
// .anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains)) {
// workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
// return;
// }
WorkflowNode parentWorkflowNode = workflowNodeMap.get(taskExecute.getParentId());
// 失败策略处理
if (CollUtil.isNotEmpty(parentJobTaskBatchList)
&& parentJobTaskBatchList.stream()
&& parentJobTaskBatchList.stream()
.map(JobTaskBatch::getTaskBatchStatus)
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
@ -153,29 +167,31 @@ public class WorkflowExecutorActor extends AbstractActor {
}
// 决策节点
if (Objects.nonNull(parentWorkflowNode) && WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
if (Objects.nonNull(parentWorkflowNode)
&& WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
// 获取决策节点子节点
Set<Long> successors = graph.successors(parentWorkflowNode.getId());
workflowNodes = workflowNodes.stream()
// 去掉父节点
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())
// 过滤掉非当前决策节点ParentId的子节点
&& successors.contains(workflowNode.getId())).collect(Collectors.toList());
// 去掉父节点
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())
// 过滤掉非当前决策节点ParentId的子节点
&& successors.contains(workflowNode.getId())).collect(Collectors.toList());
} else {
if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) {
return;
}
// TODO 不通过兄弟节点去控制是否执行后续节点
// if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) {
// return;
// }
workflowNodes = workflowNodes.stream()
// 去掉父节点
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()))
.collect(Collectors.toList());
// 去掉父节点
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()))
.collect(Collectors.toList());
// TODO 合并job task的结果到全局上下文中
// 此次的并发数与当时父节点的兄弟节点的数量一致
workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
}
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
@ -183,6 +199,7 @@ public class WorkflowExecutorActor extends AbstractActor {
// 只会条件节点会使用
Object evaluationResult = null;
log.info("待执行的节点为. workflowNodes:[{}]", StreamUtils.toList(workflowNodes, WorkflowNode::getId));
for (WorkflowNode workflowNode : workflowNodes) {
// 批次已经存在就不在重复生成
@ -191,6 +208,41 @@ public class WorkflowExecutorActor extends AbstractActor {
continue;
}
// 决策当前节点要不要执行
Set<Long> predecessors = graph.predecessors(workflowNode.getId());
boolean predecessorsComplete = arePredecessorsComplete(predecessors, jobTaskBatchMap, workflowNode);
if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) {
continue;
}
// if (!SystemConstants.ROOT.equals(taskExecute.getParentId())) {
// boolean isConinue = true;
// for (final Long predecessor : predecessors) {
// if (SystemConstants.ROOT.equals(predecessor)) {
// continue;
// }
// List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(predecessor);
// // 说明此节点未执行, 继续等待执行完成
// if (CollUtil.isEmpty(jobTaskBatches)) {
// SnailJobLog.LOCAL.info("批次为空存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId());
// isConinue = Boolean.FALSE;
// continue;
// }
//
// boolean isCompleted = jobTaskBatches.stream().anyMatch(
// jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
// if (isCompleted) {
// SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", predecessor, workflowNode.getId());
// isConinue = Boolean.FALSE;
// }
// }
//
// // TODO 理论上都不会执行应该 return
// if (!isConinue) {
// continue;
// }
//
// }
// 执行DAG中的节点
WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode.getNodeType());
@ -202,6 +254,11 @@ public class WorkflowExecutorActor extends AbstractActor {
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
context.setWfContext(workflowTaskBatch.getWfContext());
// 这里父节点取最新的批次判断状态
if (CollUtil.isNotEmpty(parentJobTaskBatchList)) {
context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason());
}
workflowExecutor.execute(context);
evaluationResult = context.getEvaluationResult();
@ -209,31 +266,32 @@ public class WorkflowExecutorActor extends AbstractActor {
}
private boolean brotherNodeIsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> brotherNode,
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode parentWorkflowNode) {
if (SystemConstants.ROOT.equals(taskExecute.getParentId())) {
return Boolean.TRUE;
}
private boolean arePredecessorsComplete(Set<Long> predecessors,
Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode) {
// 决策节点不需要等待其他的兄弟节点是否完成一个完成直接流转到后继节点
if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
return Boolean.TRUE;
}
// if (WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
// return Boolean.TRUE;
// }
// 判断所有节点是否都完成
for (final Long nodeId : brotherNode) {
for (final Long nodeId : predecessors) {
if (SystemConstants.ROOT.equals(nodeId)) {
continue;
}
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeId);
// 说明此节点未执行, 继续等待执行完成
if (CollUtil.isEmpty(jobTaskBatches)) {
SnailJobLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId);
SnailJobLog.LOCAL.info("批次为空存在未完成的兄弟节点. [{}] 待执行节点:[{}]", nodeId,
waitExecWorkflowNode.getId());
return Boolean.FALSE;
}
boolean isCompleted = jobTaskBatches.stream().anyMatch(
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
if (isCompleted) {
SnailJobLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId);
SnailJobLog.LOCAL.info("存在未完成的兄弟节点. [{}] 待执行节点:[{}]", nodeId, waitExecWorkflowNode.getId());
return Boolean.FALSE;
}
}
@ -250,7 +308,7 @@ public class WorkflowExecutorActor extends AbstractActor {
jobTaskBatch.setOperationReason(operationReason);
jobTaskBatch.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
() -> new SnailJobServerException("更新任务失败"));
() -> new SnailJobServerException("更新任务失败"));
}

View File

@ -21,6 +21,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -28,6 +29,9 @@ import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED;
/**
* @author xiaowoniu
* @date 2023-12-24 08:17:11
@ -59,7 +63,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
Boolean result = (Boolean) Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
if (result) {
if (result || (Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()).contains( context.getParentOperationReason()))) {
// 多个条件节点直接是或的关系只要一个成功其他节点就取消且是无需处理状态
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
jobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus();

View File

@ -10,11 +10,17 @@ import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_DECISION_FAILED;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED;
import static com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION;
/**
* @author xiaowoniu
* @date 2023-12-24 08:09:14
@ -36,7 +42,8 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
protected void afterExecute(WorkflowExecutorContext context) {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.YES.getStatus())) {
if (!Sets.newHashSet(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason(), WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason())
.contains(context.getOperationReason())) {
return;
}
@ -51,7 +58,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
jobLogMetaDTO.setTaskId(jobTask.getId());
SnailJobLog.REMOTE.warn("节点[{}]已取消任务执行. 取消原因: 任务已关闭. <|>{}<|>",
context.getWorkflowNodeId(), jobLogMetaDTO);
context.getWorkflowNodeId(), jobLogMetaDTO);
}
@Override
@ -64,11 +71,14 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
context.setOperationReason(WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
// 执行下一个节点
workflowTaskExecutor(context);
} else if (Sets.newHashSet(WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason()).contains( context.getParentOperationReason())) {
// 针对无需处理的批次直接新增一个记录
context.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
context.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason());
context.setJobTaskStatus(JobTaskStatusEnum.CANCEL.getStatus());
} else {
invokeJobTask(context);
}

View File

@ -38,6 +38,16 @@ public class WorkflowExecutorContext {
*/
private Long parentWorkflowNodeId;
/**
* TODO 父节点批次状态
*/
private Integer parentJobTaskStatus;
/**
* 父节点批次操作原因状态
*/
private Integer parentOperationReason;
/**
* 任务属性
*/
@ -107,4 +117,5 @@ public class WorkflowExecutorContext {
* 工作流全局上下文
*/
private String wfContext;
}

View File

@ -48,7 +48,7 @@ public class OfflineNodeSchedule extends AbstractSchedule implements Lifecycle {
.le(ServerNode::getExpireAt, endTime));
if (CollUtil.isNotEmpty(serverNodes)) {
// 先删除DB中需要下线的机器
serverNodeMapper.deleteBatchIds(StreamUtils.toSet(serverNodes, ServerNode::getId));
serverNodeMapper.deleteByIds(StreamUtils.toSet(serverNodes, ServerNode::getId));
}
Set<RegisterNodeInfo> allPods = CacheRegisterTable.getAllPods();

View File

@ -190,7 +190,7 @@ public class WorkflowHandler {
graph, version);
} else {
if (WorkflowNodeTypeEnum.DECISION.getType() == nodeConfig.getNodeType()) {
throw new SnailJobServerException("决策节点不能作为叶子节点");
throw new SnailJobServerException("决策节点或者决策节点的后继节点不能作为叶子节点");
}
// 叶子节点记录一下