feat: 2.6.0

1. Optimize the callback node
byteblogs168 2024-01-03 17:55:43 +08:00
parent a382fd4fdd
commit 941a70a267
35 changed files with 129 additions and 64 deletions

View File

@@ -9,6 +9,8 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
+import java.time.Duration;

/**
 * Names of the locally cached distributed locks
 *
@@ -48,6 +50,7 @@ public class CacheLockRecord implements Lifecycle {
        CACHE = CacheBuilder.newBuilder()
            // Set the concurrency level to the number of CPU cores
            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
+           .expireAfterWrite(Duration.ofHours(1))
            .build();
    }
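A hedged side note on the change above: with expireAfterWrite the locally cached lock names are evicted one hour after they are written, so stale entries no longer pile up in memory. A minimal self-contained sketch of that behaviour (the class and method names below are illustrative, not the project's):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.time.Duration;

class LockNameCacheSketch {
    // Illustrative stand-in for the CACHE field in CacheLockRecord
    private static final Cache<String, String> CACHE = CacheBuilder.newBuilder()
            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
            // Entries silently expire one hour after they were written
            .expireAfterWrite(Duration.ofHours(1))
            .build();

    static void remember(String lockName) {
        CACHE.put(lockName, lockName);
    }

    static boolean contains(String lockName) {
        return CACHE.getIfPresent(lockName) != null;
    }
}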

View File

@@ -20,7 +20,7 @@ public class CallbackConfig {
    /**
     * Request type
     */
-   private String contentType;
+   private Integer contentType;

    /**
     * Secret key

View File

@@ -0,0 +1,32 @@
+package com.aizuda.easy.retry.server.common.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.springframework.http.MediaType;
+
+/**
+ * @author: xiaowoniu
+ * @date : 2024-01-03
+ * @since : 2.6.0
+ */
+@AllArgsConstructor
+@Getter
+public enum ContentTypeEnum {
+
+    JSON(1, MediaType.APPLICATION_JSON),
+    FORM(2, MediaType.APPLICATION_FORM_URLENCODED)
+    ;
+
+    private final Integer type;
+    private final MediaType mediaType;
+
+    public static ContentTypeEnum valueOf(Integer type) {
+        for (ContentTypeEnum contentTypeEnum : values()) {
+            if (contentTypeEnum.getType().equals(type)) {
+                return contentTypeEnum;
+            }
+        }
+        return ContentTypeEnum.JSON;
+    }
+}
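For context, a short usage sketch of the new enum: the Integer stored in CallbackConfig.contentType resolves to a Spring MediaType, and unknown values fall back to JSON. The helper class below is illustrative, not part of the commit:

import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum;
import org.springframework.http.HttpHeaders;

class ContentTypeUsageSketch {
    static HttpHeaders headersFor(Integer storedContentType) {
        HttpHeaders headers = new HttpHeaders();
        // 1 -> application/json, 2 -> application/x-www-form-urlencoded, anything else -> JSON fallback
        headers.setContentType(ContentTypeEnum.valueOf(storedContentType).getMediaType());
        return headers;
    }
}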

View File

@@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.Lifecycle;
+import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
@@ -60,6 +61,7 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle
            return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
                .eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
        } else {
+           CacheLockRecord.remove(lockConfig.getLockName());
            return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
                .eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
        }

View File

@@ -53,9 +53,9 @@ public class JobTaskPrepareDTO {
    private boolean onlyTimeoutCheck;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;

    /**
     * Workflow task batch id

View File

@@ -19,7 +19,7 @@ public class JobTimerTaskDTO {
    private Long workflowNodeId;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;
}

View File

@@ -18,8 +18,8 @@ public class TaskExecuteDTO {
    private Long workflowNodeId;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;
}

View File

@@ -20,9 +20,9 @@ public class WorkflowNodeTaskExecuteDTO {
     */
    private Long workflowTaskBatchId;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;

    private Long parentId;

View File

@@ -15,9 +15,9 @@ public class WorkflowTaskPrepareDTO {
    private Long workflowId;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;

    /**
     * Block strategy: 1 discard, 2 overwrite, 3 parallel

View File

@@ -15,7 +15,7 @@ public class WorkflowTimerTaskDTO {
    private Long workflowId;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;
}

View File

@@ -28,9 +28,8 @@ public class WorkflowBlockStrategyContext extends BlockStrategyContext {
     */
    private String flowInfo;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;
}

View File

@@ -90,7 +90,7 @@ public class JobExecutorActor extends AbstractActor {
        LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
        // For auto execution the job must be enabled; manually triggered jobs skip this check
-       if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
+       if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getExecuteStrategy())) {
            queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
        }
@@ -116,7 +116,7 @@ public class JobExecutorActor extends AbstractActor {
        try {
            WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
            taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
-           taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+           taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
            taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
            taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
            ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
@@ -175,8 +175,8 @@ public class JobExecutorActor extends AbstractActor {
    private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
        if (Objects.isNull(job)
-           || JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())
-           || JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType())
+           || JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getExecuteStrategy())
+           || JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getExecuteStrategy())
            // Whether this is a resident job
            || Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
        ) {
@@ -186,7 +186,7 @@ public class JobExecutorActor extends AbstractActor {
        JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
        jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
        jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
-       jobTimerTaskDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+       jobTimerTaskDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
        ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
        WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());

View File

@@ -92,7 +92,7 @@ public class ScanJobTaskActor extends AbstractActor {
        for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) {
            // Run the pre-processing phase
            ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
-           waitExecJob.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+           waitExecJob.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
            actorRef.tell(waitExecJob, actorRef);
        }
    }

View File

@@ -79,7 +79,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
        for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
            // Run the pre-processing phase
            ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
-           waitExecTask.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+           waitExecTask.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
            actorRef.tell(waitExecTask, actorRef);
        }
    }

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
@@ -96,7 +97,9 @@ public class WorkflowExecutorActor extends AbstractActor {
        );

        List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
-           .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
+           .eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
+           .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
+           .orderByAsc(WorkflowNode::getPriorityLevel));

        Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
        Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));

View File

@@ -11,6 +11,8 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
+import java.text.MessageFormat;

/**
 * @author xiaowoniu
 * @date 2023-12-24 08:15:19
@@ -19,6 +21,7 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {

+   private static final String KEY = "workflow_execute_{0}_{1}";

    @Autowired
    private DistributedLockHandler distributedLockHandler;
    @Autowired
@@ -27,7 +30,8 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
    @Override
    @Transactional
    public void execute(WorkflowExecutorContext context) {
-       distributedLockHandler.lockAndProcessAfterUnlockDel("workflow_execute_" + context.getWorkflowNodeId(), "PT5S",
+       distributedLockHandler.lockAndProcessAfterUnlockDel(
+           MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()), "PT5S",
            () -> {
                Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
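The lock key now combines the workflow task batch id with the node id, so two batches running through the same node no longer contend for a single lock. A small sketch of what MessageFormat produces for that pattern (the ids are made up):

import java.text.MessageFormat;

class LockKeySketch {
    private static final String KEY = "workflow_execute_{0}_{1}";

    public static void main(String[] args) {
        // With workflowTaskBatchId = 12 and workflowNodeId = 34 this prints: workflow_execute_12_34
        System.out.println(MessageFormat.format(KEY, 12L, 34L));
    }
}

One hedged caveat: MessageFormat applies locale number formatting to numeric arguments, so very large ids would be rendered with grouping separators (for example 1234 becomes "1,234") unless the pattern uses {0,number,#} or the ids are passed as strings.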

View File

@@ -10,7 +10,9 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
+import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
@@ -26,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@@ -33,6 +36,7 @@ import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;

/**
@@ -45,6 +49,7 @@ import java.util.Optional;
@Slf4j
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {

    private static final String SECRET = "secret";
+   private static final String CALLBACK_TIMEOUT = "10";
    private final RestTemplate restTemplate;
    private final JobTaskMapper jobTaskMapper;
    private final JobTaskBatchGenerator jobTaskBatchGenerator;
@@ -64,15 +69,17 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
        String message = StrUtil.EMPTY;

        HttpHeaders requestHeaders = new HttpHeaders();
-       requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType());
        requestHeaders.set(SECRET, decisionConfig.getSecret());
+       requestHeaders.setContentType(ContentTypeEnum.valueOf(decisionConfig.getContentType()).getMediaType());
+       // Set the callback timeout
+       requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);

        List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
            .select(JobTask::getResultMessage, JobTask::getClientInfo)
            .eq(JobTask::getTaskBatchId, context.getTaskBatchId()));

        List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);

-       String result = StrUtil.EMPTY;
+       String result = null;
        try {
            Map<String, String> uriVariables = new HashMap<>();
            uriVariables.put(SECRET, decisionConfig.getSecret());
@@ -92,7 +99,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
        JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
        generatorContext.setTaskBatchStatus(taskBatchStatus);
        generatorContext.setOperationReason(operationReason);
-       generatorContext.setJobId(SystemConstants.DECISION_JOB_ID);
+       generatorContext.setJobId(SystemConstants.CALLBACK_JOB_ID);
        JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);

        // Generate the job task instance
@@ -105,7 +112,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
        jobTask.setArgsType(1);
        jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
        jobTask.setTaskStatus(jobTaskStatus);
-       jobTask.setResultMessage(result);
+       jobTask.setResultMessage(Optional.ofNullable(result).orElse(StrUtil.EMPTY));
        Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));

        // Save the execution log

View File

@@ -104,7 +104,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
        try {
            WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
            taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
-           taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+           taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
            taskExecuteDTO.setParentId(context.getWorkflowNodeId());
            taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
            ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();

View File

@@ -26,7 +26,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
    protected void doExecute(WorkflowExecutorContext context) {
        // Generate the task batch
        JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
-       jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType());
+       jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
        jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000);
        jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
        jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());

View File

@@ -64,7 +64,7 @@ public class JobTaskBatchGenerator {
        try {
            WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
            taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
-           taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+           taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
            taskExecuteDTO.setParentId(context.getWorkflowNodeId());
            ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
            actorRef.tell(taskExecuteDTO, actorRef);
@@ -102,7 +102,7 @@ public class JobTaskBatchGenerator {
        JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
        jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
        jobTimerTaskDTO.setJobId(context.getJobId());
-       jobTimerTaskDTO.setTriggerType(context.getTriggerType());
+       jobTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
        jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
        jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
        JobTimerWheel.register(jobTaskBatch.getId(),

View File

@@ -41,9 +41,9 @@ public class JobTaskBatchGeneratorContext {
    private Integer taskBatchStatus;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;

    /**
     * Workflow task batch id

View File

@@ -54,7 +54,7 @@ public class WorkflowBatchGenerator {
        WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
        workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
        workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
-       workflowTimerTaskDTO.setTriggerType(context.getTriggerType());
+       workflowTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
        JobTimerWheel.register(workflowTaskBatch.getId(),
            new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
    }

View File

@@ -38,9 +38,9 @@ public class WorkflowTaskBatchGeneratorContext {
    private Integer taskBatchStatus;

    /**
-    * Trigger type: 1 auto, 2 manual
+    * Execution strategy: 1 auto, 2 manual, 3 workflow
     */
-   private Integer triggerType;
+   private Integer executeStrategy;

    /**
     * Flow info

View File

@@ -80,7 +80,7 @@ public class JobTaskBatchHandler {
        try {
            WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
            taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
-           taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+           taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
            taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
            // Take the execution result of the first task here
            taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());

View File

@@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
@@ -84,6 +85,7 @@ public class WorkflowBatchHandler {
        }

        List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
+           .eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
            .in(WorkflowNode::getId, graph.nodes()));
        if (jobTaskBatches.size() < workflowNodes.size()) {
            return false;
@@ -115,40 +117,31 @@
                if (failCount > 0) {
                    taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
-                   operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
                    break;
                }

                if (stopCount > 0) {
-                   taskStatus = JobTaskBatchStatusEnum.STOP.getStatus();
-                   operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
+                   taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
                    break;
                }
            }
        } else {
            for (final Long predecessor : predecessors) {
                List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
                Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
                    .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));

                long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
                long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
+               long cancelCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.CANCEL.getStatus(), 0L);

-               if (failCount > 0) {
+               // A node with any unsuccessful batch fails the whole workflow batch
+               if (failCount > 0 || stopCount > 0 || cancelCount > 0) {
                    taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
-                   operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
-                   break;
-               }
-
-               if (stopCount > 0) {
-                   taskStatus = JobTaskBatchStatusEnum.STOP.getStatus();
-                   operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
                    break;
                }
            }
        }

        if (taskStatus != JobTaskBatchStatusEnum.SUCCESS.getStatus()) {
            break;
        }
@@ -253,7 +246,7 @@
        WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
        taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
        taskExecuteDTO.setWorkflowId(successor);
-       taskExecuteDTO.setTriggerType(1);
+       taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
        taskExecuteDTO.setParentId(parentId);
        ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
        actorRef.tell(taskExecuteDTO, actorRef);
@@ -264,7 +257,7 @@
        // Generate the task batch
        Job job = jobMapper.selectById(jobTaskBatch.getJobId());
        JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
-       jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType());
+       jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
        jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
        jobTaskPrepare.setWorkflowNodeId(successor);
        jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId);
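To make the new aggregation rule above easier to follow: the batch statuses of each predecessor node are counted per status, and any failed, stopped or cancelled batch now marks the whole workflow batch as failed. A hedged, self-contained sketch of that idea (the status codes are illustrative; the real values live in JobTaskBatchStatusEnum):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class NodeStatusSketch {
    // Illustrative status codes, not the project's real constants
    static final int FAIL = 4, STOP = 5, CANCEL = 6;

    /** Returns true when none of the node's batches failed, stopped or were cancelled. */
    static boolean nodeSucceeded(List<Integer> batchStatuses) {
        Map<Integer, Long> countByStatus = batchStatuses.stream()
                .collect(Collectors.groupingBy(status -> status, Collectors.counting()));
        long failCount = countByStatus.getOrDefault(FAIL, 0L);
        long stopCount = countByStatus.getOrDefault(STOP, 0L);
        long cancelCount = countByStatus.getOrDefault(CANCEL, 0L);
        return failCount == 0 && stopCount == 0 && cancelCount == 0;
    }
}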

View File

@@ -41,7 +41,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
        WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
        workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
        workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
-       workflowTimerTaskDTO.setTriggerType(workflowTaskPrepareDTO.getTriggerType());
+       workflowTimerTaskDTO.setExecuteStrategy(workflowTaskPrepareDTO.getExecuteStrategy());
        JobTimerWheel.register(workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
            new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
    }

View File

@@ -31,7 +31,7 @@ public class JobTimerTask implements TimerTask {
        TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
        taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
        taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId());
-       taskExecuteDTO.setTriggerType(jobTimerTaskDTO.getTriggerType());
+       taskExecuteDTO.setExecuteStrategy(jobTimerTaskDTO.getExecuteStrategy());
        taskExecuteDTO.setWorkflowTaskBatchId(jobTimerTaskDTO.getWorkflowTaskBatchId());
        taskExecuteDTO.setWorkflowNodeId(jobTimerTaskDTO.getWorkflowNodeId());
        ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();

View File

@@ -30,7 +30,7 @@ public class ResidentJobTimerTask implements TimerTask {
        // Clear the timer wheel cache
        JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
        JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
-       jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
+       jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
        // Run the pre-processing phase
        ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
        actorRef.tell(jobTaskPrepare, actorRef);

View File

@@ -47,7 +47,7 @@ public class WorkflowTimerTask implements TimerTask {
        WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
        taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
        taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
-       taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType());
+       taskExecuteDTO.setExecuteStrategy(workflowTimerTaskDTO.getExecuteStrategy());
        taskExecuteDTO.setParentId(SystemConstants.ROOT);
        ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
        actorRef.tell(taskExecuteDTO, actorRef);

View File

@@ -36,6 +36,11 @@ public class WorkflowDetailResponseVO {
     */
    private Integer triggerType;

+   /**
+    * Block strategy
+    */
+   private Integer blockStrategy;
+
    /**
     * Trigger interval
     */
@@ -109,6 +114,11 @@ public class WorkflowDetailResponseVO {
     */
    private Integer failStrategy;

+   /**
+    * Task batch status
+    */
+   private Integer taskBatchStatus;
+
    /**
     * Decision config
     */

View File

@@ -47,7 +47,7 @@ public class WorkflowResponseVO {
    /**
     * Task execution time
     */
-   private Long nextTriggerAt;
+   private LocalDateTime nextTriggerAt;

    /**
     * Creation time

View File

@@ -50,6 +50,11 @@ public interface WorkflowConverter {
    List<WorkflowResponseVO> toWorkflowResponseVO(List<Workflow> workflowList);

+   @Mappings({
+       @Mapping(target = "nextTriggerAt", expression = "java(WorkflowConverter.toLocalDateTime(workflow.getNextTriggerAt()))")
+   })
+   WorkflowResponseVO toWorkflowResponseVO(Workflow workflow);
+
    List<WorkflowBatchResponseVO> toWorkflowBatchResponseVO(List<WorkflowBatchResponseDO> workflowBatchResponseList);

    @Mappings({
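The mapping expression above refers to a WorkflowConverter.toLocalDateTime helper that is not part of this diff; presumably it converts the epoch-millisecond trigger time stored on Workflow into the LocalDateTime now exposed by WorkflowResponseVO. A hedged guess at its shape:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

interface WorkflowConverterSketch {
    // Assumed shape of the referenced helper; not taken from this commit
    static LocalDateTime toLocalDateTime(Long nextTriggerAt) {
        if (nextTriggerAt == null) {
            return null;
        }
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(nextTriggerAt), ZoneId.systemDefault());
    }
}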

View File

@@ -230,7 +230,7 @@ public class JobServiceImpl implements JobService {
        JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
        // Set to "now" so the job executes immediately
        jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
-       jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.MANUAL.getType());
+       jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.MANUAL.getType());
        // Create the batch
        jobPrePareHandler.handler(jobTaskPrepare);

View File

@@ -108,7 +108,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
            .eq(WorkflowNode::getWorkflowId, workflow.getId()));
        List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
-           .eq(JobTaskBatch::getWorkflowTaskBatchId, id));
+           .eq(JobTaskBatch::getWorkflowTaskBatchId, id).orderByDesc(JobTaskBatch::getId));

        Map<Long, List<JobTaskBatch>> jobTaskBatchMap = alJobTaskBatchList.stream()
            .collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
@@ -119,6 +119,11 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
                List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
                if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
                    nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
+                   // Take the most recent status
+                   nodeInfo.setTaskBatchStatus(jobTaskBatchList.get(0).getTaskBatchStatus());
+               } else {
+                   // The front end renders this as "waiting for upstream scheduling"
+                   nodeInfo.setTaskBatchStatus(-1);
                }
            })
            .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));

View File

@@ -86,6 +86,7 @@ public class WorkflowServiceImpl implements WorkflowService {
        log.info("图构建完成. graph:[{}]", graph);

        // Save the graph info
+       workflow.setVersion(null);
        workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
        Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new EasyRetryServerException("保存工作流图失败"));
        return true;
@@ -166,20 +167,21 @@ public class WorkflowServiceImpl implements WorkflowService {
        // Get the DAG node config
        NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();

+       int version = workflow.getVersion();
        // Recursively build the graph
        workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
-           workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, workflow.getVersion() + 1);
+           workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1);

        log.info("图构建完成. graph:[{}]", graph);

        // Save the graph info
        workflow = new Workflow();
        workflow.setId(workflowRequestVO.getId());
-       workflow.setVersion(workflow.getVersion() + 1);
+       workflow.setVersion(version);
        workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
        Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper<Workflow>()
            .eq(Workflow::getId, workflow.getId())
-           .eq(Workflow::getVersion, workflow.getVersion())
+           .eq(Workflow::getVersion, version)
        ) > 0, () -> new EasyRetryServerException("更新失败"));

        return Boolean.TRUE;