feat: 2.6.0

1. 优化条件节点判定
2. 新增回调节点的逻辑
This commit is contained in:
byteblogs168 2024-01-01 23:20:32 +08:00
parent d14566a823
commit d7a53ae3e6
14 changed files with 126 additions and 51 deletions

View File

@ -15,7 +15,7 @@ public class CallbackConfig {
/**
* webhook
*/
private Integer webhook;
private String webhook;
/**
* 请求类型

View File

@ -0,0 +1,22 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author xiaowoniu
* @date 2024-01-01 22:56:28
* @since 2.6.0
*/
@Getter
@AllArgsConstructor
public enum LogicalConditionEnum {
/**
* 逻辑条件
*/
AND(1, ""),
OR(2, "");
private final Integer code;
private final String desc;
}

View File

@ -26,5 +26,8 @@ public class WorkflowNodeTaskExecuteDTO {
private Long parentId;
private String result;
/**
* 调度任务id
*/
private Long taskBatchId;
}

View File

@ -119,7 +119,7 @@ public class JobExecutorActor extends AbstractActor {
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setResult(StrUtil.EMPTY);
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {

View File

@ -100,7 +100,7 @@ public class WorkflowExecutorActor extends AbstractActor {
}
// 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
@ -109,12 +109,15 @@ public class WorkflowExecutorActor extends AbstractActor {
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId());
List<JobTaskBatch> parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId());
// 失败策略处理
if (Objects.nonNull(parentJobTaskBatch) && JobTaskBatchStatusEnum.SUCCESS.getStatus() != parentJobTaskBatch.getTaskBatchStatus()) {
if (!CollectionUtils.isEmpty(parentJobTaskBatchList)
&& parentJobTaskBatchList.stream()
.map(JobTaskBatch::getTaskBatchStatus)
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
// 判断是否继续处理根据失败策略
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
// 失败了阻塞策略
@ -125,7 +128,7 @@ public class WorkflowExecutorActor extends AbstractActor {
// 去掉父节点
workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
Collectors.toList());
Collectors.toList());
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
@ -135,8 +138,8 @@ public class WorkflowExecutorActor extends AbstractActor {
for (WorkflowNode workflowNode : workflowNodes) {
// 批次已经存在就不在重复生成
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(workflowNode.getId());
if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus())) {
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(workflowNode.getId());
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
continue;
}
@ -147,8 +150,8 @@ public class WorkflowExecutorActor extends AbstractActor {
context.setJob(jobMap.get(workflowNode.getJobId()));
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setParentWorkflowNodeId(taskExecute.getParentId());
context.setResult(taskExecute.getResult());
context.setEvaluationResult(evaluationResult);
context.setTaskBatchId(taskExecute.getTaskBatchId());
workflowExecutor.execute(context);

View File

@ -1,7 +1,21 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author xiaowoniu
@ -9,7 +23,10 @@ import org.springframework.stereotype.Component;
* @since 2.6.0
*/
@Component
@RequiredArgsConstructor
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
private final RestTemplate restTemplate;
private final JobTaskMapper jobTaskMapper;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CALLBACK;
@ -17,6 +34,24 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
protected void doExecute(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType());
requestHeaders.set("secret", decisionConfig.getSecret());
// TODO 拼接所有的任务结果值传递给下游节点
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
Map<String, String> uriVariables = new HashMap<>();
uriVariables.put("secret", decisionConfig.getSecret());
restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
new HttpEntity<>("", requestHeaders), Object.class, uriVariables);
// TODO 保存批次
// TODO 保存任务
// TODO 保存日志
}
}

View File

@ -12,8 +12,10 @@ import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.enums.LogicalConditionEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
@ -24,6 +26,7 @@ import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatc
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.EvaluationContext;
@ -32,9 +35,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
/**
* @author xiaowoniu
@ -45,9 +46,6 @@ import java.util.Optional;
@RequiredArgsConstructor
@Slf4j
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
private static final ExpressionParser ENGINE = new SpelExpressionParser();
private final JobTaskBatchGenerator jobTaskBatchGenerator;
private final JobTaskMapper jobTaskMapper;
@ -69,15 +67,36 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
// 多个条件节点直接是或的关系只要一个成功其他节点就取消
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
} else {
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
try {
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(context.getExpressionType());
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new EasyRetryServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
result = (Boolean) Optional.ofNullable(expressionEngine.eval(context.getNodeExpression(), context.getResult())).orElse(Boolean.FALSE);
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
Boolean tempResult = Boolean.TRUE;
for (JobTask jobTask : jobTasks) {
Boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
tempResult = tempResult && execResult;
} else {
tempResult = tempResult || execResult;
if (tempResult) {
break;
}
}
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
}
result = tempResult;
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", context.getNodeExpression(), context.getResult(), e);
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
@ -92,7 +111,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setResult(context.getResult());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
@ -135,15 +154,4 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
actorRef.tell(jobLogDTO, actorRef);
}
protected Boolean doEval(String expression, Map<String, Object> context) {
try {
final EvaluationContext evaluationContext = new StandardEvaluationContext();
context.forEach(evaluationContext::setVariable);
return ENGINE.parseExpression(expression).getValue(evaluationContext, Boolean.class);
} catch (Exception e) {
throw new EasyRetryServerException("SpEL表达式解析异常. expression:[{}] context:[{}]",
expression, JsonUtil.toJsonString(context), e);
}
}
}

View File

@ -27,7 +27,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());

View File

@ -43,18 +43,11 @@ public class WorkflowExecutorContext {
*/
private Job job;
private String nodeExpression;
/**
* 客户端返回的结果
*/
private String result;
/**
* 1SpEl2Aviator 3QL
*/
private Integer expressionType;
/**
* 失败策略 1跳过 2阻塞
*/
@ -70,4 +63,15 @@ public class WorkflowExecutorContext {
*/
private Boolean evaluationResult;
/**
* 调度任务id
*/
private Long taskBatchId;
/**
* 节点信息
*/
private String nodeInfo;
}

View File

@ -67,7 +67,6 @@ public class JobTaskBatchGenerator {
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setResult(StrUtil.EMPTY);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {

View File

@ -86,7 +86,7 @@ public class JobTaskBatchHandler {
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
// 这里取第一个的任务执行结果
taskExecuteDTO.setResult(jobTasks.get(0).getResultMessage());
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {

View File

@ -127,7 +127,7 @@ public class WorkflowDetailResponseVO {
/**
* 定时任务批次信息
*/
private JobBatchResponseVO jobBatch;
private List<JobBatchResponseVO> jobBatchList;
/**
* 子节点

View File

@ -31,6 +31,8 @@ public interface JobBatchResponseVOConverter {
})
JobBatchResponseVO toJobBatchResponseVO(JobBatchResponseDO jobBatchResponseDO);
List<JobBatchResponseVO> jobTaskBatchToJobBatchResponseVOs(List<JobTaskBatch> jobTaskBatchList);
@Mappings({
@Mapping(target = "executionAt", expression = "java(JobBatchResponseVOConverter.toLocalDateTime(jobTaskBatch.getExecutionAt()))")
})

View File

@ -107,19 +107,18 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
.eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, id));
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream()
.collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i, (v1, v2) -> v1));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = alJobTaskBatchList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes);
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> {
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId());
if (Objects.nonNull(jobTaskBatch)) {
JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch);
nodeInfo.setJobBatch(jobBatchResponseVO);
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
}
})
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));