feat: 2.6.0

1. DAG condition node (not yet complete)
byteblogs168 2023-12-26 00:12:05 +08:00
parent 0b60a2ee64
commit cd140cb622
5 changed files with 56 additions and 35 deletions

View File

@@ -481,6 +481,7 @@ CREATE TABLE `workflow_node`
`expression_type` tinyint(4) NOT NULL DEFAULT 0 COMMENT '1 SpEL, 2 Aviator, 3 QL',
`fail_strategy` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'failure strategy: 1 skip, 2 block',
`workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT 'workflow node status: 0 disabled, 1 enabled',
`priority_level` int(11) NOT NULL DEFAULT 1 COMMENT 'priority level',
`node_expression` text DEFAULT NULL COMMENT 'node expression',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
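The new `priority_level` column lets the server order sibling nodes within the same DAG level. A minimal sketch of that ordering, assuming `WorkflowNode` lives in the `po` package alongside `Job` and exposes a `getPriorityLevel()` getter (the entity change below adds the field):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; // assumed package

class PriorityOrderingSketch {
    // Lower priority_level values run first, mirroring the
    // ORDER BY priority_level ASC the server now issues for successor nodes.
    static List<WorkflowNode> orderSiblings(List<WorkflowNode> siblings) {
        return siblings.stream()
                .sorted(Comparator.comparing(WorkflowNode::getPriorityLevel))
                .collect(Collectors.toList());
    }
}
```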

View File

@@ -69,6 +69,11 @@ public class WorkflowNode implements Serializable {
*/
private Integer failStrategy;
/**
* Priority level
*/
private Integer priorityLevel;
/**
* Workflow node status: 0 disabled, 1 enabled
*/

View File

@@ -103,11 +103,15 @@ public class WorkflowExecutorActor extends AbstractActor {
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(successors);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel));
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// Every type of task must create a job_task_batch record; this guarantees one record per node execution and also makes it possible to tell whether the whole DAG has finished
// Only used by condition nodes
Boolean evaluationResult = null;
for (WorkflowNode workflowNode : workflowNodes) {
// If the batch already exists, do not generate it again
if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) {
@@ -121,8 +125,12 @@ public class WorkflowExecutorActor extends AbstractActor {
context.setJob(jobMap.get(workflowNode.getJobId()));
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setParentWorkflowNodeId(taskExecute.getParentId());
context.setResult(taskExecute.getResult());
context.setEvaluationResult(evaluationResult);
workflowExecutor.execute(context);
evaluationResult = context.getEvaluationResult();
}
}
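The loop above threads a single Boolean through all sibling condition nodes: it seeds `evaluationResult` with null, hands it to each executor via the context, and reads back what the executor wrote, so once one node succeeds its later siblings see `true` and cancel themselves. A standalone sketch of that hand-off with OR semantics (`eval` and the printed messages are illustrative, not from the codebase):

```java
import java.util.List;

class ConditionChainSketch {
    // Carry the previous node's evaluation across siblings: once one
    // node evaluates to true, later nodes see carried == true and cancel.
    static void runSiblings(List<String> expressions) {
        Boolean carried = null;
        for (String expression : expressions) {
            if (Boolean.TRUE.equals(carried)) {
                System.out.println("cancel: " + expression);
                carried = Boolean.TRUE; // keep signalling success downstream
            } else {
                boolean result = eval(expression);
                System.out.println("evaluated " + expression + " -> " + result);
                carried = result;
            }
        }
    }

    // Hypothetical stand-in for the real SpEL/Aviator/QL evaluation.
    private static boolean eval(String expression) {
        return Boolean.parseBoolean(expression);
    }

    public static void main(String[] args) {
        runSiblings(List.of("false", "true", "false"));
    }
}
```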

View File

@@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
@@ -12,10 +13,8 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
@@ -24,13 +23,11 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
import java.util.Optional;
@@ -49,6 +46,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
private final JobTaskMapper jobTaskMapper;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CONDITION;
@@ -61,10 +59,25 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
String message = StrUtil.EMPTY;
Boolean result = Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
if (result) {
// Multiple condition nodes are OR-ed together: as soon as one succeeds, the other nodes are cancelled
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
} else {
try {
// Evaluate according to the configured expression
Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult()));
if (Boolean.TRUE.equals(result)) {
result = doEval(context.getNodeExpression(), JsonUtil.parseHashMap(context.getResult()));
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
} catch (Exception e) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
}
if (result) {
// If this is a workflow, trigger the next task
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
@@ -78,12 +91,9 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
log.error("工作流执行失败", e);
}
}
} catch (Exception e) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
// Pass the evaluation result back
context.setEvaluationResult(result);
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
@@ -99,13 +109,14 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
jobTask.setArgsStr(context.getExpression());
jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(jobTaskStatus);
jobTask.setResultMessage(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setResultMessage(String.valueOf(result));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("Failed to insert the job task instance"));
// Save the execution log
JobLogDTO jobLogDTO = new JobLogDTO();
// TODO Handle this once real-time log processing is complete
jobLogDTO.setMessage(message);
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID);
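`doEval` itself is outside this hunk, but the SpEL imports retained above suggest how the `expression_type = 1` case could work. A minimal sketch, assuming the expression's variables come from the parsed client result map (names here are illustrative):

```java
import java.util.Map;

import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

class SpelEvalSketch {
    private static final ExpressionParser PARSER = new SpelExpressionParser();

    // Evaluate a boolean SpEL expression against the client's result map,
    // e.g. expression "#code == 200" with context {"code": 200}.
    static Boolean doEval(String expression, Map<String, Object> variables) {
        StandardEvaluationContext ctx = new StandardEvaluationContext();
        variables.forEach(ctx::setVariable);
        return PARSER.parseExpression(expression).getValue(ctx, Boolean.class);
    }

    public static void main(String[] args) {
        System.out.println(doEval("#code == 200", Map.of("code", 200))); // true
    }
}
```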

View File

@@ -1,12 +1,8 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* @author xiaowoniu
* @date 2023-12-24
@@ -47,7 +43,7 @@ public class WorkflowExecutorContext {
*/
private Job job;
private String expression;
private String nodeExpression;
/**
* Result returned by the client
@@ -70,8 +66,8 @@ public class WorkflowExecutorContext {
private Integer workflowNodeStatus;
/**
* Node expression
* Evaluation result of the condition node
*/
private String nodeExpression;
private Boolean evaluationResult;
}