diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheLockRecord.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheLockRecord.java index f56daf34..c0009ac3 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheLockRecord.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheLockRecord.java @@ -9,6 +9,8 @@ import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import java.time.Duration; + /** * 缓存本地的分布式锁的名称 * @@ -48,6 +50,7 @@ public class CacheLockRecord implements Lifecycle { CACHE = CacheBuilder.newBuilder() // 设置并发级别为cpu核心数 .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterWrite(Duration.ofHours(1)) .build(); } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/CallbackConfig.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/CallbackConfig.java index 62ee0606..3c71eb8f 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/CallbackConfig.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/CallbackConfig.java @@ -20,7 +20,7 @@ public class CallbackConfig { /** * 请求类型 */ - private String contentType; + private Integer contentType; /** * 秘钥 diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/ContentTypeEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/ContentTypeEnum.java new file mode 100644 index 00000000..3cdc3856 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/ContentTypeEnum.java @@ -0,0 +1,32 @@ +package com.aizuda.easy.retry.server.common.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.springframework.http.MediaType; + +/** + * @author: xiaowoniu + * @date : 2024-01-03 + * @since : 2.6.0 + */ +@AllArgsConstructor +@Getter +public enum ContentTypeEnum { + + JSON(1, MediaType.APPLICATION_JSON), + FORM(2, MediaType.APPLICATION_FORM_URLENCODED) + ; + + private final Integer type; + private final MediaType mediaType; + + public static ContentTypeEnum valueOf(Integer type) { + for (ContentTypeEnum contentTypeEnum : values()) { + if (contentTypeEnum.getType().equals(type)) { + return contentTypeEnum; + } + } + + return ContentTypeEnum.JSON; + } +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java index 8cf03ebe..b86bf30b 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/lock/JdbcLockProvider.java @@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.common.lock; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.cache.CacheLockRecord; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.LockConfig; import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum; @@ -60,6 +61,7 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper() .eq(DistributedLock::getName, lockConfig.getLockName())) > 0; } else { + CacheLockRecord.remove(lockConfig.getLockName()); return distributedLockMapper.delete(new LambdaUpdateWrapper() .eq(DistributedLock::getName, lockConfig.getLockName())) > 0; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java index a7cda835..94d60ab5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTaskPrepareDTO.java @@ -53,9 +53,9 @@ public class JobTaskPrepareDTO { private boolean onlyTimeoutCheck; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; /** * 工作流任务批次id diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java index 82bf7e54..6c654e06 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobTimerTaskDTO.java @@ -19,7 +19,7 @@ public class JobTimerTaskDTO { private Long workflowNodeId; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/TaskExecuteDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/TaskExecuteDTO.java index 23f96b6f..79c8962f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/TaskExecuteDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/TaskExecuteDTO.java @@ -18,8 +18,8 @@ public class TaskExecuteDTO { private Long workflowNodeId; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java index 3c8d79aa..b0c0387c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowNodeTaskExecuteDTO.java @@ -20,9 +20,9 @@ public class WorkflowNodeTaskExecuteDTO { */ private Long workflowTaskBatchId; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; private Long parentId; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java index 6a179c0b..be88a99f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java @@ -15,9 +15,9 @@ public class WorkflowTaskPrepareDTO { private Long workflowId; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; /** * 阻塞策略 1、丢弃 2、覆盖 3、并行 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java index 0be435a9..da66601a 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTimerTaskDTO.java @@ -15,7 +15,7 @@ public class WorkflowTimerTaskDTO { private Long workflowId; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java index fbfd413a..24fee1d7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/block/workflow/WorkflowBlockStrategyContext.java @@ -28,9 +28,8 @@ public class WorkflowBlockStrategyContext extends BlockStrategyContext { */ private String flowInfo; - /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 645b98cd..af57eba7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -90,7 +90,7 @@ public class JobExecutorActor extends AbstractActor { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); // 自动地校验任务必须是开启状态,手动触发无需校验 - if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getTriggerType())) { + if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getExecuteStrategy())) { queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus()); } @@ -116,7 +116,7 @@ public class JobExecutorActor extends AbstractActor { try { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); - taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId()); taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId()); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); @@ -175,8 +175,8 @@ public class JobExecutorActor extends AbstractActor { private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { if (Objects.isNull(job) - || JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType()) - || JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType()) + || JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getExecuteStrategy()) + || JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getExecuteStrategy()) // 是否是常驻任务 || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) ) { @@ -186,7 +186,7 @@ public class JobExecutorActor extends AbstractActor { JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); - jobTimerTaskDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + jobTimerTaskDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index 83a2fcad..5b99281b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -92,7 +92,7 @@ public class ScanJobTaskActor extends AbstractActor { for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) { // 执行预处理阶段 ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); - waitExecJob.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + waitExecJob.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); actorRef.tell(waitExecJob, actorRef); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java index a76fc2df..e170c78b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java @@ -79,7 +79,7 @@ public class ScanWorkflowTaskActor extends AbstractActor { for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) { // 执行预处理阶段 ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor(); - waitExecTask.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + waitExecTask.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); actorRef.tell(waitExecTask, actorRef); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java index 8351917d..f798d3ab 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; @@ -96,7 +97,9 @@ public class WorkflowExecutorActor extends AbstractActor { ); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel)); + .eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus()) + .in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))) + .orderByAsc(WorkflowNode::getPriorityLevel)); Map> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId)); Map workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java index 9a9d5392..58453714 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/AbstractWorkflowExecutor.java @@ -11,6 +11,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; +import java.text.MessageFormat; + /** * @author xiaowoniu * @date 2023-12-24 08:15:19 @@ -19,6 +21,7 @@ import org.springframework.transaction.annotation.Transactional; @Slf4j public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean { + private static final String KEY = "workflow_execute_{0}_{1}"; @Autowired private DistributedLockHandler distributedLockHandler; @Autowired @@ -27,7 +30,8 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init @Override @Transactional public void execute(WorkflowExecutorContext context) { - distributedLockHandler.lockAndProcessAfterUnlockDel("workflow_execute_" + context.getWorkflowNodeId(), "PT5S", + distributedLockHandler.lockAndProcessAfterUnlockDel( + MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()), "PT5S", () -> { Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java index 5336ae45..8e7450a4 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/CallbackWorkflowExecutor.java @@ -10,7 +10,9 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.client.RequestInterceptor; import com.aizuda.easy.retry.server.common.dto.CallbackConfig; +import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; @@ -26,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; @@ -33,6 +36,7 @@ import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; /** @@ -45,6 +49,7 @@ import java.util.Optional; @Slf4j public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { private static final String SECRET = "secret"; + private static final String CALLBACK_TIMEOUT = "10"; private final RestTemplate restTemplate; private final JobTaskMapper jobTaskMapper; private final JobTaskBatchGenerator jobTaskBatchGenerator; @@ -64,15 +69,17 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { String message = StrUtil.EMPTY; HttpHeaders requestHeaders = new HttpHeaders(); - requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType()); requestHeaders.set(SECRET, decisionConfig.getSecret()); + requestHeaders.setContentType(ContentTypeEnum.valueOf(decisionConfig.getContentType()).getMediaType()); + // 设置回调超时时间 + requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT); List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() .select(JobTask::getResultMessage, JobTask::getClientInfo) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); List callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks); - String result = StrUtil.EMPTY; + String result = null; try { Map uriVariables = new HashMap<>(); uriVariables.put(SECRET, decisionConfig.getSecret()); @@ -92,7 +99,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context); generatorContext.setTaskBatchStatus(taskBatchStatus); generatorContext.setOperationReason(operationReason); - generatorContext.setJobId(SystemConstants.DECISION_JOB_ID); + generatorContext.setJobId(SystemConstants.CALLBACK_JOB_ID); JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); // 生成执行任务实例 @@ -105,7 +112,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor { jobTask.setArgsType(1); jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY)); jobTask.setTaskStatus(jobTaskStatus); - jobTask.setResultMessage(result); + jobTask.setResultMessage(Optional.ofNullable(result).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); // 保存执行的日志 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java index bce97e87..47880ec2 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/ConditionWorkflowExecutor.java @@ -104,7 +104,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { try { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); taskExecuteDTO.setParentId(context.getWorkflowNodeId()); taskExecuteDTO.setTaskBatchId(context.getTaskBatchId()); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java index 4e7dcf40..d6ccb237 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/workflow/JobTaskWorkflowExecutor.java @@ -26,7 +26,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor { protected void doExecute(WorkflowExecutorContext context) { // 生成任务批次 JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob()); - jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType()); + jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType()); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000); jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId()); jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 6adb08fe..1152fb03 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -64,7 +64,7 @@ public class JobTaskBatchGenerator { try { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); taskExecuteDTO.setParentId(context.getWorkflowNodeId()); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); actorRef.tell(taskExecuteDTO, actorRef); @@ -102,7 +102,7 @@ public class JobTaskBatchGenerator { JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId()); jobTimerTaskDTO.setJobId(context.getJobId()); - jobTimerTaskDTO.setTriggerType(context.getTriggerType()); + jobTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy()); jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId()); JobTimerWheel.register(jobTaskBatch.getId(), diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java index 5255ce5d..ca4aed29 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/JobTaskBatchGeneratorContext.java @@ -41,9 +41,9 @@ public class JobTaskBatchGeneratorContext { private Integer taskBatchStatus; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; /** * 工作流任务批次id diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java index 7441ed39..01c6856f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowBatchGenerator.java @@ -54,7 +54,7 @@ public class WorkflowBatchGenerator { WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO(); workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId()); workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId()); - workflowTimerTaskDTO.setTriggerType(context.getTriggerType()); + workflowTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy()); JobTimerWheel.register(workflowTaskBatch.getId(), new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java index 1dbf4c99..d75496ba 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/batch/WorkflowTaskBatchGeneratorContext.java @@ -38,9 +38,9 @@ public class WorkflowTaskBatchGeneratorContext { private Integer taskBatchStatus; /** - * 触发类似 1、auto 2、manual + * 执行策略 1、auto 2、manual 3、workflow */ - private Integer triggerType; + private Integer executeStrategy; /** * 流程信息 diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index 2ed88140..244aeb65 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -80,7 +80,7 @@ public class JobTaskBatchHandler { try { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId()); - taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId()); // 这里取第一个的任务执行结果 taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java index 0670c807..de583ec8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/WorkflowBatchHandler.java @@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum; @@ -84,6 +85,7 @@ public class WorkflowBatchHandler { } List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() + .eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus()) .in(WorkflowNode::getId, graph.nodes())); if (jobTaskBatches.size() < workflowNodes.size()) { return false; @@ -115,40 +117,31 @@ public class WorkflowBatchHandler { if (failCount > 0) { taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); - operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); break; } if (stopCount > 0) { - taskStatus = JobTaskBatchStatusEnum.STOP.getStatus(); - operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); + taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); break; } } } else { - for (final Long predecessor : predecessors) { List jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList()); Map statusCountMap = jobTaskBatcheList.stream() .collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting())); long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L); long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L); - if (failCount > 0) { + long cancelCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.CANCEL.getStatus(), 0L); + // 一个节点没有成功则认为失败 + if (failCount > 0 || stopCount > 0 || cancelCount > 0) { taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); - operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); - break; - } - - if (stopCount > 0) { - taskStatus = JobTaskBatchStatusEnum.STOP.getStatus(); - operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason(); break; } } } - if (taskStatus != JobTaskBatchStatusEnum.SUCCESS.getStatus()) { break; } @@ -253,7 +246,7 @@ public class WorkflowBatchHandler { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId); taskExecuteDTO.setWorkflowId(successor); - taskExecuteDTO.setTriggerType(1); + taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType()); taskExecuteDTO.setParentId(parentId); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); actorRef.tell(taskExecuteDTO, actorRef); @@ -264,7 +257,7 @@ public class WorkflowBatchHandler { // 生成任务批次 Job job = jobMapper.selectById(jobTaskBatch.getJobId()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); - jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType()); + jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType()); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setWorkflowNodeId(successor); jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java index 936f89db..64d73589 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/prepare/workflow/WaiWorkflowPrepareHandler.java @@ -41,7 +41,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler { WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO(); workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId()); workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId()); - workflowTimerTaskDTO.setTriggerType(workflowTaskPrepareDTO.getTriggerType()); + workflowTimerTaskDTO.setExecuteStrategy(workflowTaskPrepareDTO.getExecuteStrategy()); JobTimerWheel.register(workflowTaskPrepareDTO.getWorkflowTaskBatchId(), new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index d1ccbfcf..90415e2f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -31,7 +31,7 @@ public class JobTimerTask implements TimerTask { TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId()); - taskExecuteDTO.setTriggerType(jobTimerTaskDTO.getTriggerType()); + taskExecuteDTO.setExecuteStrategy(jobTimerTaskDTO.getExecuteStrategy()); taskExecuteDTO.setWorkflowTaskBatchId(jobTimerTaskDTO.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowNodeId(jobTimerTaskDTO.getWorkflowNodeId()); ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java index b5d8478d..5373e1db 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/ResidentJobTimerTask.java @@ -30,7 +30,7 @@ public class ResidentJobTimerTask implements TimerTask { // 清除时间轮的缓存 JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); - jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.AUTO.getType()); + jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType()); // 执行预处理阶段 ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); actorRef.tell(jobTaskPrepare, actorRef); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java index 581ad2bc..e58cf03c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/WorkflowTimerTask.java @@ -47,7 +47,7 @@ public class WorkflowTimerTask implements TimerTask { WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId()); - taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType()); + taskExecuteDTO.setExecuteStrategy(workflowTimerTaskDTO.getExecuteStrategy()); taskExecuteDTO.setParentId(SystemConstants.ROOT); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); actorRef.tell(taskExecuteDTO, actorRef); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java index 6854781e..7fe8ff3b 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowDetailResponseVO.java @@ -36,6 +36,11 @@ public class WorkflowDetailResponseVO { */ private Integer triggerType; + /** + * 阻塞策略 + */ + private Integer blockStrategy; + /** * 触发间隔 */ @@ -109,6 +114,11 @@ public class WorkflowDetailResponseVO { */ private Integer failStrategy; + /** + * 任务批次状态 + */ + private Integer taskBatchStatus; + /** * 判定配置 */ diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowResponseVO.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowResponseVO.java index 02dd0221..6768092e 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowResponseVO.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/model/response/WorkflowResponseVO.java @@ -47,7 +47,7 @@ public class WorkflowResponseVO { /** * 任务执行时间 */ - private Long nextTriggerAt; + private LocalDateTime nextTriggerAt; /** * 创建时间 diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java index 17cc3d40..bcdab64c 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/convert/WorkflowConverter.java @@ -50,6 +50,11 @@ public interface WorkflowConverter { List toWorkflowResponseVO(List workflowList); + @Mappings({ + @Mapping(target = "nextTriggerAt", expression = "java(WorkflowConverter.toLocalDateTime(workflow.getNextTriggerAt()))") + }) + WorkflowResponseVO toWorkflowResponseVO(Workflow workflow); + List toWorkflowBatchResponseVO(List workflowBatchResponseList); @Mappings({ diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 96b10147..204c11e5 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -230,7 +230,7 @@ public class JobServiceImpl implements JobService { JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); // 设置now表示立即执行 jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); - jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.MANUAL.getType()); + jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.MANUAL.getType()); // 创建批次 jobPrePareHandler.handler(jobTaskPrepare); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java index 0ff6ce0c..c51fc858 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java @@ -108,7 +108,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { .eq(WorkflowNode::getWorkflowId, workflow.getId())); List alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper() - .eq(JobTaskBatch::getWorkflowTaskBatchId, id)); + .eq(JobTaskBatch::getWorkflowTaskBatchId, id).orderByDesc(JobTaskBatch::getId)); Map> jobTaskBatchMap = alJobTaskBatchList.stream() .collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId)); @@ -119,6 +119,11 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService { List jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId()); if (!CollectionUtils.isEmpty(jobTaskBatchList)) { nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList)); + // 取第最新的一条状态 + nodeInfo.setTaskBatchStatus(jobTaskBatchList.get(0).getTaskBatchStatus()); + } else { + // 前端显示待上游调度 + nodeInfo.setTaskBatchStatus(-1); } }) .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java index fa92d970..f33b33b4 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java @@ -86,6 +86,7 @@ public class WorkflowServiceImpl implements WorkflowService { log.info("图构建完成. graph:[{}]", graph); // 保存图信息 + workflow.setVersion(null); workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph))); Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new EasyRetryServerException("保存工作流图失败")); return true; @@ -166,20 +167,21 @@ public class WorkflowServiceImpl implements WorkflowService { // 获取DAG节点配置 NodeConfig nodeConfig = workflowRequestVO.getNodeConfig(); + int version = workflow.getVersion(); // 递归构建图 workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(), - workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, workflow.getVersion() + 1); + workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1); log.info("图构建完成. graph:[{}]", graph); // 保存图信息 workflow = new Workflow(); workflow.setId(workflowRequestVO.getId()); - workflow.setVersion(workflow.getVersion() + 1); + workflow.setVersion(version); workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph))); Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper() .eq(Workflow::getId, workflow.getId()) - .eq(Workflow::getVersion, workflow.getVersion()) + .eq(Workflow::getVersion, version) ) > 0, () -> new EasyRetryServerException("更新失败")); return Boolean.TRUE;