feat(sj_1.1.0): 调试决策节点

This commit is contained in:
opensnail 2024-06-20 23:14:18 +08:00
parent 5391fe1d18
commit 758a9875d7
3 changed files with 48 additions and 28 deletions

View File

@ -112,9 +112,6 @@ public class WorkflowExecutorActor extends AbstractActor {
return;
}
// TODO 暂时删除待认证
// Sets.SetView<Long> union = Sets.union(allSuccessors, brotherNode);
// 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
@ -155,23 +152,35 @@ public class WorkflowExecutorActor extends AbstractActor {
}
}
if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) {
return;
}
// 决策节点
if (Objects.nonNull(parentWorkflowNode) && WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType()) {
// 去掉父节点
workflowNodes = workflowNodes.stream()
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()))
.collect(Collectors.toList());
// 获取决策节点子节点
Set<Long> successors = graph.successors(parentWorkflowNode.getId());
workflowNodes = workflowNodes.stream()
// 去掉父节点
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())
// 过滤掉非当前决策节点ParentId的子节点
&& successors.contains(workflowNode.getId())).collect(Collectors.toList());
} else {
if (!brotherNodeIsComplete(taskExecute, brotherNode, jobTaskBatchMap, parentWorkflowNode)) {
return;
}
workflowNodes = workflowNodes.stream()
// 去掉父节点
.filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()))
.collect(Collectors.toList());
// TODO 合并job task的结果到全局上下文中
// 此次的并发数与当时父节点的兄弟节点的数量一致
workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
}
List<Job> jobs = jobMapper.selectBatchIds(StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
// TODO 合并job task的结果到全局上下文中
// 此次的并发数与当时父节点的兄弟节点的数量一致
workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));
// 只会条件节点会使用
Object evaluationResult = null;
for (WorkflowNode workflowNode : workflowNodes) {

View File

@ -148,8 +148,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType());
// TODO 待定是否删除
jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(context.getJobTaskStatus());
jobTask.setResultMessage(String.valueOf(context.getEvaluationResult()));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));

View File

@ -16,12 +16,16 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.Optional;
/**
@ -33,8 +37,7 @@ import java.util.Optional;
@RequiredArgsConstructor
@Slf4j
public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
private final JobTaskMapper jobTaskMapper;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
@ -64,14 +67,24 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) {
try {
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), context.getWfContext())).orElse(Boolean.FALSE);
if (!result) {
// 这里重新加载一次最新的上下文
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, context.getWorkflowTaskBatchId())
);
if (Objects.isNull(workflowTaskBatch)) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
} else {
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new SnailJobServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), workflowTaskBatch.getWfContext())).orElse(Boolean.FALSE);
if (!result) {
operationReason = JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason();
}
}
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getWfContext(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
@ -85,9 +98,9 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
}
}
if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) {
workflowTaskExecutor(context);
}
// if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus && result) {
// workflowTaskExecutor(context);
// }
// 回传执行结果
context.setEvaluationResult(result);