feat: 2.6.0
1. DAG条件节点未完成
This commit is contained in:
		
							parent
							
								
									742a5b7a8c
								
							
						
					
					
						commit
						21318eb6d6
					
				@ -481,6 +481,7 @@ CREATE TABLE `workflow_node`
 | 
				
			|||||||
    `expression_type`      tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
 | 
					    `expression_type`      tinyint(4) NOT NULL DEFAULT 0 COMMENT '1、SpEl、2、Aviator 3、QL',
 | 
				
			||||||
    `fail_strategy`        tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
 | 
					    `fail_strategy`        tinyint(4) NOT NULL DEFAULT 0 COMMENT '失败策略 1、跳过 2、阻塞',
 | 
				
			||||||
    `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
 | 
					    `workflow_node_status` tinyint(4) NOT NULL DEFAULT 1 COMMENT '工作流节点状态 0、关闭、1、开启',
 | 
				
			||||||
 | 
					    `priority_level`        int(11)    NOT NULL DEFAULT 1 COMMENT '优先级',
 | 
				
			||||||
    `node_expression`      text                 DEFAULT NULL COMMENT '节点表达式',
 | 
					    `node_expression`      text                 DEFAULT NULL COMMENT '节点表达式',
 | 
				
			||||||
    `create_dt`            datetime    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 | 
					    `create_dt`            datetime    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 | 
				
			||||||
    `update_dt`            datetime    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
 | 
					    `update_dt`            datetime    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
 | 
				
			||||||
 | 
				
			|||||||
@ -69,6 +69,11 @@ public class WorkflowNode implements Serializable {
 | 
				
			|||||||
     */
 | 
					     */
 | 
				
			||||||
    private Integer failStrategy;
 | 
					    private Integer failStrategy;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /**
 | 
				
			||||||
 | 
					     * 优先级
 | 
				
			||||||
 | 
					     */
 | 
				
			||||||
 | 
					    private Integer priorityLevel;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
     * 工作流节点状态 0、关闭、1、开启
 | 
					     * 工作流节点状态 0、关闭、1、开启
 | 
				
			||||||
     */
 | 
					     */
 | 
				
			||||||
 | 
				
			|||||||
@ -103,11 +103,15 @@ public class WorkflowExecutorActor extends AbstractActor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
 | 
					        Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(successors);
 | 
					        List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
 | 
				
			||||||
 | 
					                .in(WorkflowNode::getId, successors).orderByAsc(WorkflowNode::getPriorityLevel));
 | 
				
			||||||
        List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
 | 
					        List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
 | 
				
			||||||
        Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
 | 
					        Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成
 | 
					        // 不管什么任务都需要创建一个 job_task_batch记录 保障一个节点执行创建一次,同时可以判断出DAG是否全部执行完成
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // 只会条件节点会使用
 | 
				
			||||||
 | 
					        Boolean evaluationResult = null;
 | 
				
			||||||
        for (WorkflowNode workflowNode : workflowNodes) {
 | 
					        for (WorkflowNode workflowNode : workflowNodes) {
 | 
				
			||||||
            // 批次已经存在就不在重复生成
 | 
					            // 批次已经存在就不在重复生成
 | 
				
			||||||
            if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) {
 | 
					            if (Objects.nonNull(jobTaskBatchMap.get(workflowNode.getId()))) {
 | 
				
			||||||
@ -121,8 +125,12 @@ public class WorkflowExecutorActor extends AbstractActor {
 | 
				
			|||||||
            context.setJob(jobMap.get(workflowNode.getJobId()));
 | 
					            context.setJob(jobMap.get(workflowNode.getJobId()));
 | 
				
			||||||
            context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
 | 
					            context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
 | 
				
			||||||
            context.setParentWorkflowNodeId(taskExecute.getParentId());
 | 
					            context.setParentWorkflowNodeId(taskExecute.getParentId());
 | 
				
			||||||
 | 
					            context.setResult(taskExecute.getResult());
 | 
				
			||||||
 | 
					            context.setEvaluationResult(evaluationResult);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            workflowExecutor.execute(context);
 | 
					            workflowExecutor.execute(context);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            evaluationResult = context.getEvaluationResult();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import akka.actor.ActorRef;
 | 
					import akka.actor.ActorRef;
 | 
				
			||||||
import cn.hutool.core.lang.Assert;
 | 
					import cn.hutool.core.lang.Assert;
 | 
				
			||||||
 | 
					import cn.hutool.core.lang.Opt;
 | 
				
			||||||
import cn.hutool.core.util.StrUtil;
 | 
					import cn.hutool.core.util.StrUtil;
 | 
				
			||||||
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
 | 
					import com.aizuda.easy.retry.common.core.constant.SystemConstants;
 | 
				
			||||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
 | 
					import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
 | 
				
			||||||
@ -12,10 +13,8 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
 | 
				
			|||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
 | 
					import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
 | 
				
			||||||
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
 | 
					import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
 | 
				
			||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
 | 
					import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
 | 
				
			||||||
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
 | 
					 | 
				
			||||||
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
 | 
					import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
 | 
				
			||||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
 | 
					import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
 | 
				
			||||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
 | 
					 | 
				
			||||||
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
 | 
					import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
 | 
				
			||||||
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
 | 
					import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
 | 
				
			||||||
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
 | 
					import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
 | 
				
			||||||
@ -24,13 +23,11 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
 | 
				
			|||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
 | 
					import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
 | 
				
			||||||
import lombok.RequiredArgsConstructor;
 | 
					import lombok.RequiredArgsConstructor;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
					 | 
				
			||||||
import org.springframework.expression.EvaluationContext;
 | 
					import org.springframework.expression.EvaluationContext;
 | 
				
			||||||
import org.springframework.expression.ExpressionParser;
 | 
					import org.springframework.expression.ExpressionParser;
 | 
				
			||||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
 | 
					import org.springframework.expression.spel.standard.SpelExpressionParser;
 | 
				
			||||||
import org.springframework.expression.spel.support.StandardEvaluationContext;
 | 
					import org.springframework.expression.spel.support.StandardEvaluationContext;
 | 
				
			||||||
import org.springframework.stereotype.Component;
 | 
					import org.springframework.stereotype.Component;
 | 
				
			||||||
import org.springframework.transaction.annotation.Transactional;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.util.Map;
 | 
					import java.util.Map;
 | 
				
			||||||
import java.util.Optional;
 | 
					import java.util.Optional;
 | 
				
			||||||
@ -49,6 +46,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private final JobTaskBatchGenerator jobTaskBatchGenerator;
 | 
					    private final JobTaskBatchGenerator jobTaskBatchGenerator;
 | 
				
			||||||
    private final JobTaskMapper jobTaskMapper;
 | 
					    private final JobTaskMapper jobTaskMapper;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public WorkflowNodeTypeEnum getWorkflowNodeType() {
 | 
					    public WorkflowNodeTypeEnum getWorkflowNodeType() {
 | 
				
			||||||
        return WorkflowNodeTypeEnum.CONDITION;
 | 
					        return WorkflowNodeTypeEnum.CONDITION;
 | 
				
			||||||
@ -61,10 +59,25 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
 | 
				
			|||||||
        int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
 | 
					        int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
 | 
				
			||||||
        String message = StrUtil.EMPTY;
 | 
					        String message = StrUtil.EMPTY;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Boolean result = Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (result) {
 | 
				
			||||||
 | 
					            // 多个条件节点直接是或的关系,只要一个成功其他节点就取消
 | 
				
			||||||
 | 
					            taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
            try {
 | 
					            try {
 | 
				
			||||||
                // 根据配置的表达式执行
 | 
					                // 根据配置的表达式执行
 | 
				
			||||||
            Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult()));
 | 
					                result = doEval(context.getNodeExpression(), JsonUtil.parseHashMap(context.getResult()));
 | 
				
			||||||
            if (Boolean.TRUE.equals(result)) {
 | 
					                log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
 | 
				
			||||||
 | 
					            } catch (Exception e) {
 | 
				
			||||||
 | 
					                taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
 | 
				
			||||||
 | 
					                operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
 | 
				
			||||||
 | 
					                jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
 | 
				
			||||||
 | 
					                message = e.getMessage();
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (result) {
 | 
				
			||||||
            // 若是工作流则开启下一个任务
 | 
					            // 若是工作流则开启下一个任务
 | 
				
			||||||
            try {
 | 
					            try {
 | 
				
			||||||
                WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
 | 
					                WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
 | 
				
			||||||
@ -78,12 +91,9 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
 | 
				
			|||||||
                log.error("工作流执行失败", e);
 | 
					                log.error("工作流执行失败", e);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        } catch (Exception e) {
 | 
					
 | 
				
			||||||
            taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
 | 
					        // 回传执行结果
 | 
				
			||||||
            operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
 | 
					        context.setEvaluationResult(result);
 | 
				
			||||||
            jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
 | 
					 | 
				
			||||||
            message = e.getMessage();
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
 | 
					        JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
 | 
				
			||||||
        generatorContext.setTaskBatchStatus(taskBatchStatus);
 | 
					        generatorContext.setTaskBatchStatus(taskBatchStatus);
 | 
				
			||||||
@ -99,13 +109,14 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
 | 
				
			|||||||
        jobTask.setClientInfo(StrUtil.EMPTY);
 | 
					        jobTask.setClientInfo(StrUtil.EMPTY);
 | 
				
			||||||
        jobTask.setTaskBatchId(jobTaskBatch.getId());
 | 
					        jobTask.setTaskBatchId(jobTaskBatch.getId());
 | 
				
			||||||
        jobTask.setArgsType(1);
 | 
					        jobTask.setArgsType(1);
 | 
				
			||||||
        jobTask.setArgsStr(context.getExpression());
 | 
					        jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
 | 
				
			||||||
        jobTask.setTaskStatus(jobTaskStatus);
 | 
					        jobTask.setTaskStatus(jobTaskStatus);
 | 
				
			||||||
        jobTask.setResultMessage(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
 | 
					        jobTask.setResultMessage(String.valueOf(result));
 | 
				
			||||||
        Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
 | 
					        Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // 保存执行的日志
 | 
					        // 保存执行的日志
 | 
				
			||||||
        JobLogDTO jobLogDTO = new JobLogDTO();
 | 
					        JobLogDTO jobLogDTO = new JobLogDTO();
 | 
				
			||||||
 | 
					        // TODO 等实时日志处理完毕后,再处理
 | 
				
			||||||
        jobLogDTO.setMessage(message);
 | 
					        jobLogDTO.setMessage(message);
 | 
				
			||||||
        jobLogDTO.setTaskId(jobTask.getId());
 | 
					        jobLogDTO.setTaskId(jobTask.getId());
 | 
				
			||||||
        jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID);
 | 
					        jobLogDTO.setJobId(SystemConstants.CONDITION_JOB_ID);
 | 
				
			||||||
 | 
				
			|||||||
@ -1,12 +1,8 @@
 | 
				
			|||||||
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
 | 
					package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 | 
					import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 | 
				
			||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
 | 
					 | 
				
			||||||
import lombok.Data;
 | 
					import lombok.Data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.util.List;
 | 
					 | 
				
			||||||
import java.util.Map;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * @author xiaowoniu
 | 
					 * @author xiaowoniu
 | 
				
			||||||
 * @date 2023-12-24
 | 
					 * @date 2023-12-24
 | 
				
			||||||
@ -47,7 +43,7 @@ public class WorkflowExecutorContext {
 | 
				
			|||||||
     */
 | 
					     */
 | 
				
			||||||
    private Job job;
 | 
					    private Job job;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private String expression;
 | 
					    private String nodeExpression;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
     * 客户端返回的结果
 | 
					     * 客户端返回的结果
 | 
				
			||||||
@ -70,8 +66,8 @@ public class WorkflowExecutorContext {
 | 
				
			|||||||
    private Integer workflowNodeStatus;
 | 
					    private Integer workflowNodeStatus;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
     * 节点表达式
 | 
					     * 条件节点的判定结果
 | 
				
			||||||
     */
 | 
					     */
 | 
				
			||||||
    private String nodeExpression;
 | 
					    private Boolean evaluationResult;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user