feat: 2.6.0
1. 优化条件节点判定 2. 新增回调节点的逻辑
This commit is contained in:
parent
497455d4ce
commit
639806ef69
@ -15,7 +15,7 @@ public class CallbackConfig {
|
|||||||
/**
|
/**
|
||||||
* webhook
|
* webhook
|
||||||
*/
|
*/
|
||||||
private Integer webhook;
|
private String webhook;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 请求类型
|
* 请求类型
|
||||||
|
@ -0,0 +1,22 @@
|
|||||||
|
package com.aizuda.easy.retry.server.common.enums;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaowoniu
|
||||||
|
* @date 2024-01-01 22:56:28
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum LogicalConditionEnum {
|
||||||
|
/**
|
||||||
|
* 逻辑条件
|
||||||
|
*/
|
||||||
|
AND(1, "并"),
|
||||||
|
OR(2, "或");
|
||||||
|
|
||||||
|
private final Integer code;
|
||||||
|
private final String desc;
|
||||||
|
}
|
@ -26,5 +26,8 @@ public class WorkflowNodeTaskExecuteDTO {
|
|||||||
|
|
||||||
private Long parentId;
|
private Long parentId;
|
||||||
|
|
||||||
private String result;
|
/**
|
||||||
|
* 调度任务id
|
||||||
|
*/
|
||||||
|
private Long taskBatchId;
|
||||||
}
|
}
|
||||||
|
@ -119,7 +119,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
||||||
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
|
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
|
||||||
taskExecuteDTO.setResult(StrUtil.EMPTY);
|
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||||
actorRef.tell(taskExecuteDTO, actorRef);
|
actorRef.tell(taskExecuteDTO, actorRef);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -100,7 +100,7 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 添加父节点,为了判断父节点的处理状态
|
// 添加父节点,为了判断父节点的处理状态
|
||||||
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||||
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus)
|
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus)
|
||||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
|
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
|
||||||
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
|
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
|
||||||
@ -109,12 +109,15 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
|
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
|
||||||
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
|
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
|
||||||
|
|
||||||
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream().collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i));
|
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
|
||||||
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
|
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
|
||||||
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(taskExecute.getParentId());
|
List<JobTaskBatch> parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId());
|
||||||
|
|
||||||
// 失败策略处理
|
// 失败策略处理
|
||||||
if (Objects.nonNull(parentJobTaskBatch) && JobTaskBatchStatusEnum.SUCCESS.getStatus() != parentJobTaskBatch.getTaskBatchStatus()) {
|
if (!CollectionUtils.isEmpty(parentJobTaskBatchList)
|
||||||
|
&& parentJobTaskBatchList.stream()
|
||||||
|
.map(JobTaskBatch::getTaskBatchStatus)
|
||||||
|
.anyMatch(i -> i != JobTaskBatchStatusEnum.SUCCESS.getStatus())) {
|
||||||
// 判断是否继续处理,根据失败策略
|
// 判断是否继续处理,根据失败策略
|
||||||
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
|
WorkflowNode workflowNode = workflowNodeMap.get(taskExecute.getParentId());
|
||||||
// 失败了阻塞策略
|
// 失败了阻塞策略
|
||||||
@ -125,7 +128,7 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
|
|
||||||
// 去掉父节点
|
// 去掉父节点
|
||||||
workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
|
workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(
|
||||||
Collectors.toList());
|
Collectors.toList());
|
||||||
|
|
||||||
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
|
List<Job> jobs = jobMapper.selectBatchIds(workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet()));
|
||||||
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
|
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
|
||||||
@ -135,8 +138,8 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
for (WorkflowNode workflowNode : workflowNodes) {
|
for (WorkflowNode workflowNode : workflowNodes) {
|
||||||
|
|
||||||
// 批次已经存在就不在重复生成
|
// 批次已经存在就不在重复生成
|
||||||
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(workflowNode.getId());
|
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(workflowNode.getId());
|
||||||
if (Objects.nonNull(jobTaskBatch) && JobTaskBatchStatusEnum.COMPLETED.contains(jobTaskBatch.getTaskBatchStatus())) {
|
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,8 +150,8 @@ public class WorkflowExecutorActor extends AbstractActor {
|
|||||||
context.setJob(jobMap.get(workflowNode.getJobId()));
|
context.setJob(jobMap.get(workflowNode.getJobId()));
|
||||||
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
||||||
context.setParentWorkflowNodeId(taskExecute.getParentId());
|
context.setParentWorkflowNodeId(taskExecute.getParentId());
|
||||||
context.setResult(taskExecute.getResult());
|
|
||||||
context.setEvaluationResult(evaluationResult);
|
context.setEvaluationResult(evaluationResult);
|
||||||
|
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
|
|
||||||
workflowExecutor.execute(context);
|
workflowExecutor.execute(context);
|
||||||
|
|
||||||
|
@ -1,7 +1,21 @@
|
|||||||
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
|
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
|
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.http.HttpEntity;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author xiaowoniu
|
* @author xiaowoniu
|
||||||
@ -9,7 +23,10 @@ import org.springframework.stereotype.Component;
|
|||||||
* @since 2.6.0
|
* @since 2.6.0
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||||
|
private final RestTemplate restTemplate;
|
||||||
|
private final JobTaskMapper jobTaskMapper;
|
||||||
@Override
|
@Override
|
||||||
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
public WorkflowNodeTypeEnum getWorkflowNodeType() {
|
||||||
return WorkflowNodeTypeEnum.CALLBACK;
|
return WorkflowNodeTypeEnum.CALLBACK;
|
||||||
@ -17,6 +34,24 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(WorkflowExecutorContext context) {
|
protected void doExecute(WorkflowExecutorContext context) {
|
||||||
|
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
|
||||||
|
|
||||||
|
HttpHeaders requestHeaders = new HttpHeaders();
|
||||||
|
requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType());
|
||||||
|
requestHeaders.set("secret", decisionConfig.getSecret());
|
||||||
|
|
||||||
|
// TODO 拼接所有的任务结果值传递给下游节点
|
||||||
|
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||||
|
.select(JobTask::getResultMessage)
|
||||||
|
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
|
||||||
|
|
||||||
|
Map<String, String> uriVariables = new HashMap<>();
|
||||||
|
uriVariables.put("secret", decisionConfig.getSecret());
|
||||||
|
restTemplate.exchange(decisionConfig.getWebhook(), HttpMethod.POST,
|
||||||
|
new HttpEntity<>("", requestHeaders), Object.class, uriVariables);
|
||||||
|
|
||||||
|
// TODO 保存批次
|
||||||
|
// TODO 保存任务
|
||||||
|
// TODO 保存日志
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,8 +12,10 @@ import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
|
|||||||
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
|
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
|
||||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
|
||||||
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
|
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
|
||||||
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
|
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.LogicalConditionEnum;
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
|
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
|
||||||
@ -24,6 +26,7 @@ import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatc
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.expression.EvaluationContext;
|
import org.springframework.expression.EvaluationContext;
|
||||||
@ -32,9 +35,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
|
|||||||
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.*;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author xiaowoniu
|
* @author xiaowoniu
|
||||||
@ -45,9 +46,6 @@ import java.util.Optional;
|
|||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
|
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||||
|
|
||||||
private static final ExpressionParser ENGINE = new SpelExpressionParser();
|
|
||||||
|
|
||||||
private final JobTaskBatchGenerator jobTaskBatchGenerator;
|
private final JobTaskBatchGenerator jobTaskBatchGenerator;
|
||||||
private final JobTaskMapper jobTaskMapper;
|
private final JobTaskMapper jobTaskMapper;
|
||||||
|
|
||||||
@ -69,15 +67,36 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
// 多个条件节点直接是或的关系,只要一个成功其他节点就取消
|
// 多个条件节点直接是或的关系,只要一个成功其他节点就取消
|
||||||
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
|
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
|
||||||
} else {
|
} else {
|
||||||
|
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
|
||||||
try {
|
try {
|
||||||
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(context.getExpressionType());
|
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
|
||||||
Assert.notNull(realExpressionEngine, () -> new EasyRetryServerException("表达式引擎不存在"));
|
Assert.notNull(realExpressionEngine, () -> new EasyRetryServerException("表达式引擎不存在"));
|
||||||
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
|
||||||
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
|
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
|
||||||
result = (Boolean) Optional.ofNullable(expressionEngine.eval(context.getNodeExpression(), context.getResult())).orElse(Boolean.FALSE);
|
|
||||||
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", context.getNodeExpression(), context.getResult(), result);
|
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||||
|
.select(JobTask::getResultMessage)
|
||||||
|
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
|
||||||
|
Boolean tempResult = Boolean.TRUE;
|
||||||
|
for (JobTask jobTask : jobTasks) {
|
||||||
|
Boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
|
||||||
|
|
||||||
|
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
|
||||||
|
tempResult = tempResult && execResult;
|
||||||
|
} else {
|
||||||
|
tempResult = tempResult || execResult;
|
||||||
|
if (tempResult) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
result = tempResult;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", context.getNodeExpression(), context.getResult(), e);
|
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getResult(), e);
|
||||||
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
|
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
|
||||||
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
|
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
|
||||||
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
|
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
|
||||||
@ -92,7 +111,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
||||||
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
|
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
|
||||||
taskExecuteDTO.setResult(context.getResult());
|
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
|
||||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||||
actorRef.tell(taskExecuteDTO, actorRef);
|
actorRef.tell(taskExecuteDTO, actorRef);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -135,15 +154,4 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
actorRef.tell(jobLogDTO, actorRef);
|
actorRef.tell(jobLogDTO, actorRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Boolean doEval(String expression, Map<String, Object> context) {
|
|
||||||
try {
|
|
||||||
final EvaluationContext evaluationContext = new StandardEvaluationContext();
|
|
||||||
context.forEach(evaluationContext::setVariable);
|
|
||||||
return ENGINE.parseExpression(expression).getValue(evaluationContext, Boolean.class);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new EasyRetryServerException("SpEL表达式解析异常. expression:[{}] context:[{}]",
|
|
||||||
expression, JsonUtil.toJsonString(context), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
|
|||||||
// 生成任务批次
|
// 生成任务批次
|
||||||
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
|
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
|
||||||
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
|
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
|
||||||
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
|
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
|
||||||
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
|
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
|
||||||
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
||||||
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
|
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());
|
||||||
|
@ -43,18 +43,11 @@ public class WorkflowExecutorContext {
|
|||||||
*/
|
*/
|
||||||
private Job job;
|
private Job job;
|
||||||
|
|
||||||
private String nodeExpression;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 客户端返回的结果
|
* 客户端返回的结果
|
||||||
*/
|
*/
|
||||||
private String result;
|
private String result;
|
||||||
|
|
||||||
/**
|
|
||||||
* 1、SpEl、2、Aviator 3、QL
|
|
||||||
*/
|
|
||||||
private Integer expressionType;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 失败策略 1、跳过 2、阻塞
|
* 失败策略 1、跳过 2、阻塞
|
||||||
*/
|
*/
|
||||||
@ -70,4 +63,15 @@ public class WorkflowExecutorContext {
|
|||||||
*/
|
*/
|
||||||
private Boolean evaluationResult;
|
private Boolean evaluationResult;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 调度任务id
|
||||||
|
*/
|
||||||
|
private Long taskBatchId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 节点信息
|
||||||
|
*/
|
||||||
|
private String nodeInfo;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,6 @@ public class JobTaskBatchGenerator {
|
|||||||
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
|
||||||
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
|
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
|
||||||
taskExecuteDTO.setResult(StrUtil.EMPTY);
|
|
||||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||||
actorRef.tell(taskExecuteDTO, actorRef);
|
actorRef.tell(taskExecuteDTO, actorRef);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -86,7 +86,7 @@ public class JobTaskBatchHandler {
|
|||||||
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
|
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
|
||||||
// 这里取第一个的任务执行结果
|
// 这里取第一个的任务执行结果
|
||||||
taskExecuteDTO.setResult(jobTasks.get(0).getResultMessage());
|
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
|
||||||
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
|
||||||
actorRef.tell(taskExecuteDTO, actorRef);
|
actorRef.tell(taskExecuteDTO, actorRef);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -127,7 +127,7 @@ public class WorkflowDetailResponseVO {
|
|||||||
/**
|
/**
|
||||||
* 定时任务批次信息
|
* 定时任务批次信息
|
||||||
*/
|
*/
|
||||||
private JobBatchResponseVO jobBatch;
|
private List<JobBatchResponseVO> jobBatchList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 子节点
|
* 子节点
|
||||||
|
@ -31,6 +31,8 @@ public interface JobBatchResponseVOConverter {
|
|||||||
})
|
})
|
||||||
JobBatchResponseVO toJobBatchResponseVO(JobBatchResponseDO jobBatchResponseDO);
|
JobBatchResponseVO toJobBatchResponseVO(JobBatchResponseDO jobBatchResponseDO);
|
||||||
|
|
||||||
|
List<JobBatchResponseVO> jobTaskBatchToJobBatchResponseVOs(List<JobTaskBatch> jobTaskBatchList);
|
||||||
|
|
||||||
@Mappings({
|
@Mappings({
|
||||||
@Mapping(target = "executionAt", expression = "java(JobBatchResponseVOConverter.toLocalDateTime(jobTaskBatch.getExecutionAt()))")
|
@Mapping(target = "executionAt", expression = "java(JobBatchResponseVOConverter.toLocalDateTime(jobTaskBatch.getExecutionAt()))")
|
||||||
})
|
})
|
||||||
|
@ -107,19 +107,18 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
|
|||||||
.eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
|
.eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
|
||||||
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
|
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
|
||||||
|
|
||||||
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
|
||||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, id));
|
.eq(JobTaskBatch::getWorkflowTaskBatchId, id));
|
||||||
|
|
||||||
Map<Long, JobTaskBatch> jobTaskBatchMap = jobTaskBatchList.stream()
|
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = alJobTaskBatchList.stream()
|
||||||
.collect(Collectors.toMap(JobTaskBatch::getWorkflowNodeId, i -> i, (v1, v2) -> v1));
|
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
|
||||||
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes);
|
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes);
|
||||||
|
|
||||||
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
|
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
|
||||||
.peek(nodeInfo -> {
|
.peek(nodeInfo -> {
|
||||||
JobTaskBatch jobTaskBatch = jobTaskBatchMap.get(nodeInfo.getId());
|
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
|
||||||
if (Objects.nonNull(jobTaskBatch)) {
|
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
|
||||||
JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch);
|
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
|
||||||
nodeInfo.setJobBatch(jobBatchResponseVO);
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
|
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
|
||||||
|
Loading…
Reference in New Issue
Block a user