feat: 2.6.0

1. 优化工作流处理
This commit is contained in:
byteblogs168 2024-01-07 22:55:50 +08:00
parent 6ec1e30e4d
commit 7c9f317dd3
34 changed files with 488 additions and 258 deletions

View File

@ -48,6 +48,8 @@ public class ExpressionInvocationHandler implements InvocationHandler {
context.put(paramNameArr[i], methodArgs[i]);
}
return method.invoke(expression, context);
// 替换参数
args[1] = new Object[]{context};
return method.invoke(expressionEngine, args);
}
}

View File

@ -24,9 +24,12 @@ public enum JobOperationReasonEnum {
NOT_EXECUTE_TASK(6, "无可执行任务项"),
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"),
MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"),
JOB_TASK_INTERRUPTED(9, "任务中断"),
WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR(8, "条件节点执行异常"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(9, "条件节点执行异常"),
JOB_TASK_INTERRUPTED(10, "任务中断"),
WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR(11, "回调节点执行异常"),
WORKFLOW_NODE_NO_OPERATION_REQUIRED(12, "无需处理"),
WORKFLOW_NODE_EXECUTOR_ERROR_SKIP(13, "节点处理失败并跳过"),
;

View File

@ -52,4 +52,6 @@ public enum JobTaskBatchStatusEnum {
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status, CANCEL.status);
public static final List<Integer> NOT_SUCCESS = Arrays.asList(FAIL.status, STOP.status, CANCEL.status);
}

View File

@ -35,6 +35,11 @@ public enum JobTaskStatusEnum {
* 任务停止
*/
STOP(5),
/**
* 取消
*/
CANCEL(6),
;
private final int status;

View File

@ -26,7 +26,7 @@ public class SpELExpressionEngine extends AbstractExpressionEngine {
try {
final EvaluationContext evaluationContext = new StandardEvaluationContext();
context.forEach(evaluationContext::setVariable);
return ENGINE.parseExpression(expression).getValue(evaluationContext, String.class);
return ENGINE.parseExpression(expression).getValue(evaluationContext, Object.class);
} catch (Exception e) {
throw new EasyRetryCommonException("SpEL表达式解析异常. expression:[{}] context:[{}]",
expression, JsonUtil.toJsonString(context), e);

View File

@ -27,4 +27,9 @@ public class DecisionConfig {
*/
private Integer logicalCondition;
/**
* 是否为其他情况
*/
private Integer defaultDecision;
}

View File

@ -16,7 +16,7 @@ import lombok.Getter;
public enum JobExecuteStrategyEnum {
AUTO(1, "自动执行"),
MANUAL(2, "手动执行"),
WORKFLOW(2, "DAG执行"),
WORKFLOW(3, "DAG执行"),
;
private final Integer type;

View File

@ -17,6 +17,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@ -73,7 +74,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
// 删除本地缓存的消费桶的信息
DistributeInstance.INSTANCE.clearConsumerBucket();
if(CollectionUtils.isEmpty(podIpSet)) {
if (CollectionUtils.isEmpty(podIpSet)) {
return;
}
@ -117,13 +118,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
}
private void refreshExpireAtCache(List<ServerNode> remotePods) {
// 刷新最新的节点注册信息
for (ServerNode node : remotePods) {
Optional.ofNullable(CacheRegisterTable.getServerNode(node.getGroupName(), node.getNamespaceId(), node.getHostId())).ifPresent(registerNodeInfo -> {
registerNodeInfo.setExpireAt(node.getExpireAt());
});
}
// 重新刷新缓存
refreshCache(remotePods);
}
private void refreshCache(List<ServerNode> remotePods) {

View File

@ -3,9 +3,14 @@ package com.aizuda.easy.retry.server.job.task.support.cache;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.graph.MutableGraph;
import org.springframework.util.CollectionUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -45,5 +50,38 @@ public class MutableGraphCache {
return getOrDefault(workflowBatchId, "");
}
/**
 * Collect every leaf node of the cached workflow graph, i.e. every node
 * that has no successors (no outgoing edges).
 *
 * @param workflowBatchId workflow batch id used as the graph cache key
 * @param jsonGraph JSON-serialized graph used to rebuild the entry on a cache miss
 * @return ids of all terminal nodes of the DAG
 */
public static List<Long> getLeaves(Long workflowBatchId, String jsonGraph) {
MutableGraph<Long> graph = getOrDefault(workflowBatchId, jsonGraph);
List<Long> leafNodes = Lists.newArrayList();
// A node with an empty successor set is a terminal (leaf) node.
graph.nodes().stream()
.filter(node -> CollectionUtils.isEmpty(graph.successors(node)))
.forEach(leafNodes::add);
return leafNodes;
}
/**
 * Return the full set of transitive descendants of {@code parentId} in the
 * given graph ({@code parentId} itself is not included).
 *
 * @param graph workflow node graph
 * @param parentId node whose descendants are collected
 * @return every node reachable from {@code parentId} via successor edges
 */
public static Set<Long> getAllDescendants(MutableGraph<Long> graph, Long parentId) {
Set<Long> collected = new HashSet<>();
getAllDescendantsHelper(graph, parentId, collected);
return collected;
}
/**
 * Depth-first accumulation of all transitive successors of {@code parentId}
 * into {@code descendants}.
 *
 * <p>Fix: the previous version recursed into every successor unconditionally,
 * which re-traverses shared sub-graphs (exponential on diamond-shaped DAGs)
 * and never terminates if the graph ever contains a cycle. Using the boolean
 * result of {@link Set#add} as a visited-check yields the identical result
 * set while visiting each node at most once.
 *
 * @param graph workflow node graph
 * @param parentId node currently being expanded
 * @param descendants accumulator receiving every reachable node
 */
private static void getAllDescendantsHelper(MutableGraph<Long> graph, Long parentId, Set<Long> descendants) {
for (Long successor : graph.successors(parentId)) {
// Recurse only on first sight of a node; Set.add returns false for duplicates.
if (descendants.add(successor)) {
getAllDescendantsHelper(graph, successor, descendants);
}
}
}
}

View File

@ -13,6 +13,7 @@ import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -152,7 +153,7 @@ public class JobExecutorActor extends AbstractActor {
@Override
public void afterCompletion(int status) {
// 清除时间轮的缓存
JobTimerWheel.clearCache(taskExecute.getTaskBatchId());
JobTimerWheel.clearCache(TaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId());
//方法内容
doHandlerResidentTask(job, taskExecute);
}
@ -206,7 +207,7 @@ public class JobExecutorActor extends AbstractActor {
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}

View File

@ -5,7 +5,6 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
@ -97,14 +96,23 @@ public class WorkflowExecutorActor extends AbstractActor {
);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
.orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));
List<JobTaskBatch> parentJobTaskBatchList = jobTaskBatchMap.get(taskExecute.getParentId());
// 如果父节点是无需处理则不再继续执行
if (!CollectionUtils.isEmpty(parentJobTaskBatchList) &&
parentJobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(i -> i == JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason())) {
workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
return;
}
// 失败策略处理
if (!CollectionUtils.isEmpty(parentJobTaskBatchList)
&& parentJobTaskBatchList.stream()
@ -126,7 +134,7 @@ public class WorkflowExecutorActor extends AbstractActor {
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, i -> i));
// 只会条件节点会使用
Boolean evaluationResult = null;
Object evaluationResult = null;
for (WorkflowNode workflowNode : workflowNodes) {
// 批次已经存在就不在重复生成

View File

@ -1,9 +1,26 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
@ -11,7 +28,10 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.Optional;
/**
* @author xiaowoniu
@ -26,6 +46,12 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
private DistributedLockHandler distributedLockHandler;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private JobTaskBatchGenerator jobTaskBatchGenerator;
@Autowired
private WorkflowBatchHandler workflowBatchHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
@Override
@Transactional
@ -43,13 +69,84 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return;
}
if (!preValidate(context)) {
return;
}
beforeExecute(context);
doExecute(context);
afterExecute(context);
});
}
/**
 * Pre-execution gate shared by all workflow node executors.
 *
 * <p>If the node is disabled (status NO) it is short-circuited: a CANCEL
 * task batch tagged "no operation required" is recorded, the workflow batch
 * is completed, and {@code false} is returned so the node body never runs.
 * Otherwise validation is delegated to the concrete executor.
 *
 * @param context execution context of the current workflow node
 * @return {@code true} when the node should actually execute
 */
protected boolean preValidate(WorkflowExecutorContext context) {
// Node disabled — record the skip and finish the batch instead of executing.
if (Objects.equals(context.getWorkflowNodeStatus(), StatusEnum.NO.getStatus())) {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason());
generatorContext.setJobId(context.getJobId());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
try {
// Mark the surrounding workflow batch complete so it does not hang on this node.
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
} catch (IOException e) {
throw new EasyRetryServerException("工作流完成处理异常", e);
}
return false;
}
// Node is enabled — apply node-type specific validation.
return doPreValidate(context);
}
/** Node-type specific validation; returning {@code false} aborts execution of the node. */
protected abstract boolean doPreValidate(WorkflowExecutorContext context);
/** Hook invoked after {@link #doExecute}; typically persists batches/tasks and logs. */
protected abstract void afterExecute(WorkflowExecutorContext context);
/** Hook invoked before {@link #doExecute}; may prepare node-specific state. */
protected abstract void beforeExecute(WorkflowExecutorContext context);
/** Core behavior of the workflow node; results are conveyed back via the context. */
protected abstract void doExecute(WorkflowExecutorContext context);
/**
 * Create and persist a job task batch for this workflow node, copying the
 * batch status / operation reason already placed on the context.
 *
 * @param context execution context carrying batch status and operation reason
 * @return the generated {@link JobTaskBatch}
 */
protected JobTaskBatch generateJobTaskBatch(WorkflowExecutorContext context) {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
return jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
}
/**
 * Asynchronously trigger execution of this node's successors by sending a
 * {@link WorkflowNodeTaskExecuteDTO} to the workflow task executor actor.
 *
 * <p>Failures are logged and swallowed on purpose: dispatching downstream
 * nodes is best-effort here and must not roll back the caller's transaction.
 *
 * @param context execution context identifying the batch and current node
 */
protected void workflowTaskExecutor(WorkflowExecutorContext context) {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
// The current node becomes the parent of the nodes triggered next.
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
// NOTE(review): the actor is passed as its own sender — presumably no reply
// is expected; confirm against the actor's protocol.
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("工作流执行失败", e);
}
}
/**
 * Create and persist the single {@link JobTask} instance backing this
 * workflow node's batch, copying identity fields from the context.
 *
 * @param context execution context (group, namespace, job, status, results)
 * @param jobTaskBatch batch the new task belongs to
 * @return the persisted task
 * @throws EasyRetryServerException if the insert does not affect exactly one row
 */
protected JobTask generateJobTask(WorkflowExecutorContext context, JobTaskBatch jobTaskBatch) {
// Build the execution task instance.
JobTask jobTask = new JobTask();
jobTask.setGroupName(context.getGroupName());
jobTask.setNamespaceId(context.getNamespaceId());
jobTask.setJobId(context.getJobId());
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
// Upstream result becomes this task's arguments; empty string when absent.
jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(context.getJobTaskStatus());
// String.valueOf also maps a null evaluation result to the text "null".
jobTask.setResultMessage(String.valueOf(context.getEvaluationResult()));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
return jobTask;
}
@Override
public void afterPropertiesSet() throws Exception {
WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);

View File

@ -1,7 +1,6 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
@ -13,11 +12,8 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.model.dto.CallbackParamsDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -28,7 +24,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@ -36,8 +31,6 @@ import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
* @author xiaowoniu
@ -52,15 +45,20 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
private static final String CALLBACK_TIMEOUT = "10";
private final RestTemplate restTemplate;
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override
public WorkflowNodeTypeEnum getWorkflowNodeType() {
return WorkflowNodeTypeEnum.CALLBACK;
}
@Override
protected void beforeExecute(WorkflowExecutorContext context) {
}
@Override
protected void doExecute(WorkflowExecutorContext context) {
CallbackConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), CallbackConfig.class);
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
@ -91,34 +89,39 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
} catch (Exception e) {
log.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), context.getResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
operationReason = JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason);
generatorContext.setJobId(SystemConstants.CALLBACK_JOB_ID);
JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus) {
workflowTaskExecutor(context);
}
// 生成执行任务实例
JobTask jobTask = new JobTask();
jobTask.setGroupName(context.getGroupName());
jobTask.setNamespaceId(context.getNamespaceId());
jobTask.setJobId(SystemConstants.CALLBACK_JOB_ID);
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(jobTaskStatus);
jobTask.setResultMessage(Optional.ofNullable(result).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
context.setTaskBatchStatus(taskBatchStatus);
context.setOperationReason(operationReason);
context.setJobTaskStatus(jobTaskStatus);
context.setEvaluationResult(result);
context.setLogMessage(message);
}
@Override
protected boolean doPreValidate(WorkflowExecutorContext context) {
// Callback nodes have no extra preconditions beyond the shared preValidate gate.
return true;
}
@Override
protected void afterExecute(WorkflowExecutorContext context) {
JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
JobTask jobTask = generateJobTask(context, jobTaskBatch);
// 保存执行的日志
JobLogDTO jobLogDTO = new JobLogDTO();
// TODO 等实时日志处理完毕后再处理
jobLogDTO.setMessage(message);
jobLogDTO.setMessage(context.getLogMessage());
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.CALLBACK_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName());

View File

@ -4,25 +4,17 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.enums.*;
import com.aizuda.easy.retry.common.core.expression.ExpressionEngine;
import com.aizuda.easy.retry.common.core.expression.ExpressionFactory;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.LogicalConditionEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
@ -31,7 +23,9 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* @author xiaowoniu
@ -42,7 +36,6 @@ import java.util.*;
@RequiredArgsConstructor
@Slf4j
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
private final JobTaskMapper jobTaskMapper;
@Override
@ -50,6 +43,11 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
return WorkflowNodeTypeEnum.DECISION;
}
@Override
protected void beforeExecute(WorkflowExecutorContext context) {
}
@Override
public void doExecute(WorkflowExecutorContext context) {
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
@ -57,89 +55,83 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
int jobTaskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
String message = StrUtil.EMPTY;
Boolean result = Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
Boolean result = (Boolean) Optional.ofNullable(context.getEvaluationResult()).orElse(Boolean.FALSE);
if (result) {
// 多个条件节点直接是或的关系只要一个成功其他节点就取消
// 多个条件节点直接是或的关系只要一个成功其他节点就取消且是无需处理状态
taskBatchStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
jobTaskStatus = JobTaskStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason();
} else {
boolean tempResult = Boolean.TRUE;
DecisionConfig decisionConfig = JsonUtil.parseObject(context.getNodeInfo(), DecisionConfig.class);
try {
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new EasyRetryServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
if (StatusEnum.NO.getStatus().equals(decisionConfig.getDefaultDecision())) {
try {
ExpressionEngine realExpressionEngine = ExpressionTypeEnum.valueOf(decisionConfig.getExpressionType());
Assert.notNull(realExpressionEngine, () -> new EasyRetryServerException("表达式引擎不存在"));
ExpressionInvocationHandler invocationHandler = new ExpressionInvocationHandler(realExpressionEngine);
ExpressionEngine expressionEngine = ExpressionFactory.getExpressionEngine(invocationHandler);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
Boolean tempResult = Boolean.TRUE;
for (JobTask jobTask : jobTasks) {
Boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
tempResult = tempResult && execResult;
} else {
tempResult = tempResult || execResult;
if (tempResult) {
break;
for (JobTask jobTask : jobTasks) {
boolean execResult = (Boolean) Optional.ofNullable(expressionEngine.eval(decisionConfig.getNodeExpression(), jobTask.getResultMessage())).orElse(Boolean.FALSE);
if (Objects.equals(decisionConfig.getLogicalCondition(), LogicalConditionEnum.AND.getCode())) {
tempResult = tempResult && execResult;
} else {
tempResult = tempResult || execResult;
if (tempResult) {
break;
}
}
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
}
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
result = tempResult;
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
result = tempResult;
} catch (Exception e) {
log.error("执行条件表达式解析异常. 表达式:[{}],参数: [{}]", decisionConfig.getNodeExpression(), context.getResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();
message = e.getMessage();
}
}
if (result) {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("工作流执行失败", e);
}
if (JobTaskBatchStatusEnum.SUCCESS.getStatus() == taskBatchStatus) {
workflowTaskExecutor(context);
}
// 回传执行结果
context.setEvaluationResult(result);
context.setTaskBatchStatus(taskBatchStatus);
context.setOperationReason(operationReason);
context.setJobTaskStatus(jobTaskStatus);
context.setLogMessage(message);
}
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason);
generatorContext.setJobId(SystemConstants.DECISION_JOB_ID);
JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
@Override
protected boolean doPreValidate(WorkflowExecutorContext context) {
return true;
}
// 生成执行任务实例
JobTask jobTask = new JobTask();
jobTask.setGroupName(context.getGroupName());
jobTask.setNamespaceId(context.getNamespaceId());
jobTask.setJobId(SystemConstants.DECISION_JOB_ID);
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(jobTaskStatus);
jobTask.setResultMessage(String.valueOf(result));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
@Override
protected void afterExecute(WorkflowExecutorContext context) {
JobTaskBatch jobTaskBatch = generateJobTaskBatch(context);
JobTask jobTask = generateJobTask(context, jobTaskBatch);
// 保存执行的日志
JobLogDTO jobLogDTO = new JobLogDTO();
// TODO 等实时日志处理完毕后再处理
jobLogDTO.setMessage(message);
jobLogDTO.setMessage(context.getLogMessage());
jobLogDTO.setTaskId(jobTask.getId());
jobLogDTO.setJobId(SystemConstants.DECISION_JOB_ID);
jobLogDTO.setGroupName(context.getGroupName());
@ -148,5 +140,4 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef);
}
}

View File

@ -1,20 +1,33 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
/**
* @author xiaowoniu
* @date 2023-12-24 08:09:14
* @since 2.6.0
*/
@Component
@RequiredArgsConstructor
public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
@Override
@ -22,12 +35,27 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
return WorkflowNodeTypeEnum.JOB_TASK;
}
@Override
protected boolean doPreValidate(WorkflowExecutorContext context) {
// Job-task nodes need no extra validation beyond the shared preValidate gate.
return true;
}
@Override
protected void afterExecute(WorkflowExecutorContext context) {
// Intentionally empty: batch/task persistence is handled by the job pipeline itself.
}
@Override
protected void beforeExecute(WorkflowExecutorContext context) {
// Intentionally empty: no preparation required before dispatching the job.
}
@Override
protected void doExecute(WorkflowExecutorContext context) {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000);
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTaskPrepare.setParentWorkflowNodeId(context.getParentWorkflowNodeId());

View File

@ -61,7 +61,7 @@ public class WorkflowExecutorContext {
/**
* 条件节点的判定结果
*/
private Boolean evaluationResult;
private Object evaluationResult;
/**
* 调度任务id
@ -73,5 +73,23 @@ public class WorkflowExecutorContext {
*/
private String nodeInfo;
/**
* 任务批次状态
*/
private Integer taskBatchStatus;
/**
* 操作原因
*/
private Integer operationReason;
/**
* 任务状态
*/
private Integer jobTaskStatus;
/**
* 日志信息
*/
private String logMessage;
}

View File

@ -30,6 +30,8 @@ public class ExpressionInvocationHandler implements InvocationHandler {
if (StrUtil.isNotBlank(params)) {
contextMap = JsonUtil.parseHashMap(params);
}
return method.invoke(expressionEngine, contextMap);
args[1] = new Object[]{contextMap};
return method.invoke(expressionEngine, args);
}
}

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -51,7 +52,8 @@ public class JobTaskBatchGenerator {
jobTaskBatch.setCreateDt(LocalDateTime.now());
// 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
if (Objects.isNull(context.getOperationReason()) && Objects.isNull(context.getTaskBatchStatus()) &&
CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
@ -105,7 +107,7 @@ public class JobTaskBatchGenerator {
jobTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
JobTimerWheel.register(jobTaskBatch.getId(),
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
return jobTaskBatch;

View File

@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -55,7 +56,7 @@ public class WorkflowBatchGenerator {
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
workflowTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
JobTimerWheel.register(workflowTaskBatch.getId(),
JobTimerWheel.register(TaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -6,7 +6,6 @@ import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -87,65 +86,35 @@ public class WorkflowBatchHandler {
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
.in(WorkflowNode::getId, graph.nodes()));
if (jobTaskBatches.size() < workflowNodes.size()) {
return false;
}
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream()
.collect(Collectors.toMap(WorkflowNode::getId, workflowNode -> workflowNode));
Map<Long, List<JobTaskBatch>> map = jobTaskBatches.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getParentWorkflowNodeId));
Map<Long, List<JobTaskBatch>> currentWorkflowNodeMap = jobTaskBatches.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
// 判定最后的工作流批次状态
int taskStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
for (final JobTaskBatch jobTaskBatch : jobTaskBatches) {
Set<Long> predecessors = graph.predecessors(jobTaskBatch.getWorkflowNodeId());
WorkflowNode workflowNode = workflowNodeMap.get(jobTaskBatch.getWorkflowNodeId());
// 条件节点是或的关系一个成功就代表成功
if (WorkflowNodeTypeEnum.DECISION.getType() == workflowNode.getNodeType()) {
for (final Long predecessor : predecessors) {
List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
long successCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.SUCCESS.getStatus(), 0L);
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
if (successCount > 0) {
break;
}
if (failCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
break;
}
if (stopCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
break;
}
}
} else {
for (final Long predecessor : predecessors) {
List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
long cancelCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.CANCEL.getStatus(), 0L);
// 一个节点没有成功则认为失败
if (failCount > 0 || stopCount > 0 || cancelCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
break;
}
// 判定所有的叶子节点是否完成
List<Long> leaves = MutableGraphCache.getLeaves(workflowTaskBatchId, flowInfo);
for (Long leaf : leaves) {
List<JobTaskBatch> jobTaskBatchList = currentWorkflowNodeMap.getOrDefault(leaf, Lists.newArrayList());
if (CollectionUtils.isEmpty(jobTaskBatchList)) {
boolean isNeedProcess = checkLeafCompleted(graph, currentWorkflowNodeMap, graph.predecessors(leaf));
// 说明当前叶子节点需要处理但是未处理返回false
if (isNeedProcess) {
return false;
}
}
if (taskStatus != JobTaskBatchStatusEnum.SUCCESS.getStatus()) {
break;
// 判定叶子节点的状态
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) {
// 只要叶子节点不是无需处理的都是失败
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
}
}
}
}
handlerTaskBatch(workflowTaskBatchId, taskStatus, operationReason);
@ -154,6 +123,34 @@ public class WorkflowBatchHandler {
}
/**
 * Recursively determines whether a leaf node still requires processing, by
 * walking the given parent (ancestor) node ids and inspecting the job task
 * batches that have already executed on each of them.
 *
 * <p>A parent batch whose operation reason is
 * {@code WORKFLOW_NODE_NO_OPERATION_REQUIRED} marks the subtree below it as
 * "nothing to do", yielding {@code false}; any other executed batch marks the
 * leaf as still needing processing ({@code true}).
 *
 * @param graph                  the workflow DAG (node ids)
 * @param currentWorkflowNodeMap executed batches grouped by workflow node id
 * @param parentIds              the parent node ids of the node being checked
 * @return {@code true} if the leaf still needs to be processed
 */
private static boolean checkLeafCompleted(MutableGraph<Long> graph, Map<Long,
List<JobTaskBatch>> currentWorkflowNodeMap, Set<Long> parentIds) {
// Whether the child node still needs to be processed
boolean isNeedProcess = true;
for (Long nodeId : parentIds) {
List<JobTaskBatch> jobTaskBatchList = currentWorkflowNodeMap.get(nodeId);
if (CollectionUtils.isEmpty(jobTaskBatchList)) {
// No batch has executed on this ancestor yet: recurse further up the graph
// looking for an ancestor that did execute.
// NOTE(review): `isNeedProcess || ...` short-circuits — when the accumulator
// is already true (its initial value) the recursive call is skipped, so the
// outcome can depend on the iteration order of the Set. Presumably
// unintended; verify against the caller's expectations.
isNeedProcess = isNeedProcess || checkLeafCompleted(graph, currentWorkflowNodeMap, graph.predecessors(nodeId));
continue;
}
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
// A batch marked "no operation required" means the downstream child nodes
// need no processing either, so the flag flips to false.
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason() == jobTaskBatch.getOperationReason()) {
isNeedProcess = false;
continue;
}
// NOTE(review): unconditional overwrite — the last batch in the list wins;
// confirm this precedence over earlier "no operation required" batches is
// intended.
isNeedProcess = true;
}
}
return isNeedProcess;
}
private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
@ -215,7 +212,7 @@ public class WorkflowBatchHandler {
.orElseGet(() -> workflowTaskBatchMapper.selectById(workflowTaskBatchId));
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph =MutableGraphCache.getOrDefault(workflowTaskBatchId, flowInfo);
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(workflowTaskBatchId, flowInfo);
Set<Long> successors = graph.successors(SystemConstants.ROOT);
if (CollectionUtils.isEmpty(successors)) {
return;
@ -232,7 +229,15 @@ public class WorkflowBatchHandler {
checkWorkflowExecutor(SystemConstants.ROOT, workflowTaskBatchId, graph, jobTaskBatchMap);
}
private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph<Long> graph, Map<Long, JobTaskBatch> jobTaskBatchMap) {
private void checkWorkflowExecutor(Long parentId, Long workflowTaskBatchId, MutableGraph<Long> graph, Map<Long, JobTaskBatch> jobTaskBatchMap) {
// 判定条件节点是否已经执行完成
JobTaskBatch parentJobTaskBatch = jobTaskBatchMap.get(parentId);
if (Objects.nonNull(parentJobTaskBatch) &&
JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason()
== parentJobTaskBatch.getOperationReason()) {
return;
}
Set<Long> successors = graph.successors(parentId);
if (CollectionUtils.isEmpty(successors)) {
@ -245,12 +250,11 @@ public class WorkflowBatchHandler {
// 重新尝试执行, 重新生成任务批次
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
taskExecuteDTO.setWorkflowId(successor);
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
taskExecuteDTO.setParentId(parentId);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
continue;
break;
}
if (NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus())) {
@ -258,18 +262,19 @@ public class WorkflowBatchHandler {
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setWorkflowNodeId(successor);
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId);
jobTaskPrepare.setParentWorkflowNodeId(parentId);
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);
continue;
break;
}
// 已经是终态的需要递归遍历后继节点是否正常执行
checkWorkflowExecutor(successor, workflowTaskBatchId, graph, jobTaskBatchMap);
}
}
}

View File

@ -1,9 +1,11 @@
package com.aizuda.easy.retry.server.job.task.support.idempotent;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
@ -14,8 +16,9 @@ import java.util.concurrent.TimeUnit;
* @since 2.4.0
*/
public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
private static final String KEY_FORMAT = "{0}_{1}_{2}";
private static final Cache<Long, Long> cache;
private static final Cache<String, Long> cache;
static {
cache = CacheBuilder.newBuilder()
@ -27,7 +30,7 @@ public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
@Override
public boolean set(Long key, Long value) {
cache.put(key, value);
cache.put(getKey(key, value), value);
return Boolean.TRUE;
}
@ -38,12 +41,16 @@ public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
@Override
public boolean isExist(Long key, Long value) {
return cache.asMap().containsKey(key);
return cache.asMap().containsKey(getKey(key, value));
}
@Override
public boolean clear(Long key, Long value) {
cache.invalidate(key);
cache.invalidate(getKey(key, value));
return Boolean.TRUE;
}
/**
 * Builds the composite cache key for a (key, value) pair, e.g. {@code "1_42"}.
 *
 * <p>Why not {@code MessageFormat.format(KEY_FORMAT, key, value)}: the pattern
 * {@code "{0}_{1}_{2}"} declares three placeholders but only two arguments were
 * supplied, so every key carried a literal {@code "_{2}"} suffix; MessageFormat
 * also applies locale-dependent digit grouping to Long arguments (e.g.
 * {@code "1,000"}). Plain concatenation avoids both issues and stays unique per
 * pair. Keys are internal to this in-memory cache, and all reads/writes route
 * through this method, so the key scheme change is self-consistent.
 *
 * @param key   first key component (task type)
 * @param value second key component (unique id)
 * @return composite cache key
 */
private static String getKey(Long key, Long value) {
    return key + "_" + value;
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.job;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -32,7 +33,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(jobPrepareDTO.getTaskBatchId())) {
if (!JobTimerWheel.isExisted(TaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) {
log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 进入时间轮
@ -40,7 +41,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId());
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());
JobTimerWheel.register(jobPrepareDTO.getTaskBatchId(),
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
@ -33,7 +34,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
log.info("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(workflowTaskPrepareDTO.getWorkflowTaskBatchId())) {
if (!JobTimerWheel.isExisted(TaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId())) {
log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
// 进入时间轮
@ -42,7 +43,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
workflowTimerTaskDTO.setExecuteStrategy(workflowTaskPrepareDTO.getExecuteStrategy());
JobTimerWheel.register(workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
JobTimerWheel.register(TaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -41,9 +41,9 @@ public class JobTimerWheel implements Lifecycle {
timer.start();
}
public static void register(Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
if (!isExisted(uniqueId)) {
if (!isExisted(taskType, uniqueId)) {
delay = delay < 0 ? 0 : delay;
log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId);
try {
@ -55,12 +55,12 @@ public class JobTimerWheel implements Lifecycle {
}
}
public static boolean isExisted(Long uniqueId) {
return idempotent.isExist(uniqueId, uniqueId);
/**
 * Checks whether the given (task type, unique id) pair is already tracked by
 * the idempotent store backing the timer wheel.
 *
 * @param taskType the task type discriminator (JOB / WORKFLOW)
 * @param uniqueId the batch id to look up
 * @return {@code true} if the pair is already registered
 */
public static boolean isExisted(Integer taskType, Long uniqueId) {
    return idempotent.isExist(taskType.longValue(), uniqueId);
}
public static void clearCache(Long uniqueId) {
idempotent.clear(uniqueId, uniqueId);
/**
 * Removes the (task type, unique id) pair from the idempotent store so the
 * batch can be re-registered on the timer wheel.
 *
 * @param taskType the task type discriminator (JOB / WORKFLOW)
 * @param uniqueId the batch id to evict
 */
public static void clearCache(Integer taskType, Long uniqueId) {
    idempotent.clear(taskType.longValue(), uniqueId);
}
@Override

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -28,7 +29,7 @@ public class ResidentJobTimerTask implements TimerTask {
public void run(Timeout timeout) throws Exception {
try {
// 清除时间轮的缓存
JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
JobTimerWheel.clearCache(TaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
// 执行预处理阶段

View File

@ -163,13 +163,17 @@ public class WorkflowHandler {
// 添加边
graph.putEdge(parentId, workflowNode.getId());
}
log.warn("workflowNodeId:[{}] parentIds:[{}]",
log.info("workflowNodeId:[{}] parentIds:[{}]",
workflowNode.getId(), JsonUtil.toJsonString(parentIds));
WorkflowRequestVO.NodeConfig childNode = nodeInfo.getChildNode();
if (Objects.nonNull(childNode) && !CollectionUtils.isEmpty(childNode.getConditionNodes())) {
buildGraph(Lists.newArrayList(workflowNode.getId()), deque, groupName, workflowId, childNode,
graph, version);
} else {
if (WorkflowNodeTypeEnum.DECISION.getType() == nodeConfig.getNodeType()) {
throw new EasyRetryServerException("决策节点不能作为叶子节点");
}
// 叶子节点记录一下
deque.add(workflowNode.getId());
}

View File

@ -2,7 +2,9 @@ package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
@ -29,16 +31,15 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatc
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -50,6 +51,7 @@ import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private static final Integer NOT_HANDLE_STATUS = 99;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowMapper workflowMapper;
private final WorkflowNodeMapper workflowNodeMapper;
@ -114,6 +116,10 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes);
String flowInfo = workflowTaskBatch.getFlowInfo();
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(id, flowInfo);
Set<Long> allNoOperationNode = Sets.newHashSet();
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> {
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
@ -121,16 +127,36 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
// 取第最新的一条状态
nodeInfo.setTaskBatchStatus(jobTaskBatchList.get(0).getTaskBatchStatus());
} else {
// 前端显示待上游调度
nodeInfo.setTaskBatchStatus(-1);
if (jobTaskBatchList.stream()
.map(JobTaskBatch::getOperationReason)
.filter(Objects::nonNull)
.anyMatch(i -> i == JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason())) {
// 当前节点下面的所有节点都是无需处理的节点
Set<Long> allDescendants = MutableGraphCache.getAllDescendants(graph, nodeInfo.getId());
allNoOperationNode.addAll(allDescendants);
} else {
// 删除被误添加的节点
allNoOperationNode.remove(nodeInfo.getId());
}
}
})
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
String flowInfo = workflowTaskBatch.getFlowInfo();
for (Long noOperationNodeId : allNoOperationNode) {
WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(noOperationNodeId);
JobBatchResponseVO jobBatchResponseVO = new JobBatchResponseVO();
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
jobBatchResponseVO.setJobId(jobTask.getJobId());
}
// 只为前端展示提供
nodeInfo.setTaskBatchStatus(NOT_HANDLE_STATUS);
jobBatchResponseVO.setTaskBatchStatus(NOT_HANDLE_STATUS);
jobBatchResponseVO.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason());
nodeInfo.setJobBatchList(Lists.newArrayList(jobBatchResponseVO));
}
try {
MutableGraph<Long> graph = MutableGraphCache.getOrDefault(id, flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
responseVO.setNodeConfig(config);

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title>
<script type="module" crossorigin src="./assets/uHyOSDtN.js"></script>
<link rel="stylesheet" crossorigin href="./assets/20ce6E84.css">
<script type="module" crossorigin src="./assets/JMm1tHoX.js"></script>
<link rel="stylesheet" crossorigin href="./assets/7BncgBnW.css">
</head>
<body>

View File

@ -33,8 +33,6 @@ function plugin (Vue) {
return permissionList.find((val) => {
return val.permissionId === permission
}).actionList.findIndex((val) => {
console.log(val)
console.log(action)
return val === action
}) > -1
}

View File

@ -61,13 +61,13 @@
:rowKey="(record) => record.id"
:columns="columns"
:data="loadData"
:scroll="{ x: 1800 }"
:scroll="{ x: 1600 }"
>
<span slot="serial" slot-scope="text, record">
{{ record.id }}
</span>
<span slot="workflowName" slot-scope="text, record">
<a href="#" @click="handlerOpenDrawer(record)">{{ text }}</a>
<span slot="workflowName" slot-scope="text">
{{ text }}
</span>
<span slot="workflowStatus" slot-scope="text">
<a-tag :color="workflowStatus[text].color">
@ -129,7 +129,7 @@
>
<a href="javascript:;" v-if="record.workflowStatus === 0">删除</a>
</a-popconfirm>
<a-divider type="vertical" />
<a-divider type="vertical" v-if="record.workflowStatus === 0"/>
<a-popconfirm
title="是否复制此工作流?"
ok-text="复制"
@ -144,18 +144,6 @@
</span>
</s-table>
<Drawer
title="任务详情"
placement="right"
:width="800"
:visibleAmplify="true"
:visible="openDrawer"
@closeDrawer="onClose"
@handlerAmplify="handleInfo"
>
<job-info ref="jobInfoRef" :showHeader="false" :column="2"/>
</Drawer>
</a-card>
</template>
@ -199,22 +187,20 @@ export default {
fixed: 'left'
},
{
title: '任务名称',
title: '工作流名称',
dataIndex: 'workflowName',
scopedSlots: { customRender: 'workflowName' },
ellipsis: true,
fixed: 'left'
scopedSlots: { customRender: 'workflowName' }
},
{
title: '组名称',
dataIndex: 'groupName',
width: '10%'
width: '15%'
},
{
title: '触发时间',
dataIndex: 'nextTriggerAt',
width: '10%',
ellipsis: true
ellipsis: true,
width: '10%'
},
{
title: '状态',
@ -240,8 +226,7 @@ export default {
{
title: '更新时间',
dataIndex: 'updateDt',
sorter: true,
width: '10%'
sorter: true
},
{
title: '操作',