feat: 2.6.0

1. 优化回调节点
This commit is contained in:
byteblogs168 2024-01-03 17:55:43 +08:00
parent 38fa8e12cf
commit f7ff968bc4
35 changed files with 129 additions and 64 deletions

View File

@ -9,6 +9,8 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* 缓存本地的分布式锁的名称
*
@ -48,6 +50,7 @@ public class CacheLockRecord implements Lifecycle {
CACHE = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.expireAfterWrite(Duration.ofHours(1))
.build();
}

View File

@ -20,7 +20,7 @@ public class CallbackConfig {
/**
* 请求类型
*/
private String contentType;
private Integer contentType;
/**
* 秘钥

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.http.MediaType;
/**
* @author: xiaowoniu
* @date : 2024-01-03
* @since : 2.6.0
*/
@AllArgsConstructor
@Getter
public enum ContentTypeEnum {
JSON(1, MediaType.APPLICATION_JSON),
FORM(2, MediaType.APPLICATION_FORM_URLENCODED)
;
private final Integer type;
private final MediaType mediaType;
public static ContentTypeEnum valueOf(Integer type) {
for (ContentTypeEnum contentTypeEnum : values()) {
if (contentTypeEnum.getType().equals(type)) {
return contentTypeEnum;
}
}
return ContentTypeEnum.JSON;
}
}

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
@ -60,6 +61,7 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
} else {
CacheLockRecord.remove(lockConfig.getLockName());
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
}

View File

@ -53,9 +53,9 @@ public class JobTaskPrepareDTO {
private boolean onlyTimeoutCheck;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
/**
* 工作流任务批次id

View File

@ -19,7 +19,7 @@ public class JobTimerTaskDTO {
private Long workflowNodeId;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
}

View File

@ -18,8 +18,8 @@ public class TaskExecuteDTO {
private Long workflowNodeId;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
}

View File

@ -20,9 +20,9 @@ public class WorkflowNodeTaskExecuteDTO {
*/
private Long workflowTaskBatchId;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
private Long parentId;

View File

@ -15,9 +15,9 @@ public class WorkflowTaskPrepareDTO {
private Long workflowId;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
/**
* 阻塞策略 1丢弃 2覆盖 3并行

View File

@ -15,7 +15,7 @@ public class WorkflowTimerTaskDTO {
private Long workflowId;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
}

View File

@ -28,9 +28,8 @@ public class WorkflowBlockStrategyContext extends BlockStrategyContext {
*/
private String flowInfo;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
}

View File

@ -90,7 +90,7 @@ public class JobExecutorActor extends AbstractActor {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
// 自动地校验任务必须是开启状态手动触发无需校验
if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getExecuteStrategy())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
}
@ -116,7 +116,7 @@ public class JobExecutorActor extends AbstractActor {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
@ -175,8 +175,8 @@ public class JobExecutorActor extends AbstractActor {
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())
|| JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType())
|| JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getExecuteStrategy())
|| JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getExecuteStrategy())
// 是否是常驻任务
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
) {
@ -186,7 +186,7 @@ public class JobExecutorActor extends AbstractActor {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
jobTimerTaskDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());

View File

@ -92,7 +92,7 @@ public class ScanJobTaskActor extends AbstractActor {
for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) {
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
waitExecJob.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
waitExecJob.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
actorRef.tell(waitExecJob, actorRef);
}
}

View File

@ -79,7 +79,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
waitExecTask.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
waitExecTask.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
actorRef.tell(waitExecTask, actorRef);
}
}

View File

@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
@ -96,7 +97,9 @@ public class WorkflowExecutorActor extends AbstractActor {
);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))).orderByAsc(WorkflowNode::getPriorityLevel));
.eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
.in(WorkflowNode::getId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
.orderByAsc(WorkflowNode::getPriorityLevel));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = allJobTaskBatchList.stream().collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
Map<Long, WorkflowNode> workflowNodeMap = workflowNodes.stream().collect(Collectors.toMap(WorkflowNode::getId, i -> i));

View File

@ -11,6 +11,8 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.text.MessageFormat;
/**
* @author xiaowoniu
* @date 2023-12-24 08:15:19
@ -19,6 +21,7 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {
private static final String KEY = "workflow_execute_{0}_{1}";
@Autowired
private DistributedLockHandler distributedLockHandler;
@Autowired
@ -27,7 +30,8 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Override
@Transactional
public void execute(WorkflowExecutorContext context) {
distributedLockHandler.lockAndProcessAfterUnlockDel("workflow_execute_" + context.getWorkflowNodeId(), "PT5S",
distributedLockHandler.lockAndProcessAfterUnlockDel(
MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()), "PT5S",
() -> {
Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()

View File

@ -10,7 +10,9 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestInterceptor;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.enums.ContentTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
@ -26,6 +28,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@ -33,6 +36,7 @@ import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
@ -45,6 +49,7 @@ import java.util.Optional;
@Slf4j
public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
private static final String SECRET = "secret";
private static final String CALLBACK_TIMEOUT = "10";
private final RestTemplate restTemplate;
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@ -64,15 +69,17 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
String message = StrUtil.EMPTY;
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set(HttpHeaders.CONTENT_TYPE, decisionConfig.getContentType());
requestHeaders.set(SECRET, decisionConfig.getSecret());
requestHeaders.setContentType(ContentTypeEnum.valueOf(decisionConfig.getContentType()).getMediaType());
// 设置回调超时时间
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, CALLBACK_TIMEOUT);
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage, JobTask::getClientInfo)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
List<CallbackParamsDTO> callbackParamsList = WorkflowTaskConverter.INSTANCE.toCallbackParamsDTO(jobTasks);
String result = StrUtil.EMPTY;
String result = null;
try {
Map<String, String> uriVariables = new HashMap<>();
uriVariables.put(SECRET, decisionConfig.getSecret());
@ -92,7 +99,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason);
generatorContext.setJobId(SystemConstants.DECISION_JOB_ID);
generatorContext.setJobId(SystemConstants.CALLBACK_JOB_ID);
JobTaskBatch jobTaskBatch = jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
// 生成执行任务实例
@ -105,7 +112,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
jobTask.setArgsType(1);
jobTask.setArgsStr(Optional.ofNullable(context.getResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(jobTaskStatus);
jobTask.setResultMessage(result);
jobTask.setResultMessage(Optional.ofNullable(result).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
// 保存执行的日志

View File

@ -104,7 +104,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();

View File

@ -26,7 +26,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
protected void doExecute(WorkflowExecutorContext context) {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());

View File

@ -64,7 +64,7 @@ public class JobTaskBatchGenerator {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
@ -102,7 +102,7 @@ public class JobTaskBatchGenerator {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
jobTimerTaskDTO.setJobId(context.getJobId());
jobTimerTaskDTO.setTriggerType(context.getTriggerType());
jobTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
JobTimerWheel.register(jobTaskBatch.getId(),

View File

@ -41,9 +41,9 @@ public class JobTaskBatchGeneratorContext {
private Integer taskBatchStatus;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
/**
* 工作流任务批次id

View File

@ -54,7 +54,7 @@ public class WorkflowBatchGenerator {
WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
workflowTimerTaskDTO.setTriggerType(context.getTriggerType());
workflowTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
JobTimerWheel.register(workflowTaskBatch.getId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}

View File

@ -38,9 +38,9 @@ public class WorkflowTaskBatchGeneratorContext {
private Integer taskBatchStatus;
/**
* 触发类似 1auto 2manual
* 执行策略 1auto 2manual 3workflow
*/
private Integer triggerType;
private Integer executeStrategy;
/**
* 流程信息

View File

@ -80,7 +80,7 @@ public class JobTaskBatchHandler {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
// 这里取第一个的任务执行结果
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());

View File

@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
@ -84,6 +85,7 @@ public class WorkflowBatchHandler {
}
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getWorkflowNodeStatus, StatusEnum.YES.getStatus())
.in(WorkflowNode::getId, graph.nodes()));
if (jobTaskBatches.size() < workflowNodes.size()) {
return false;
@ -115,40 +117,31 @@ public class WorkflowBatchHandler {
if (failCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
if (stopCount > 0) {
taskStatus = JobTaskBatchStatusEnum.STOP.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
break;
}
}
} else {
for (final Long predecessor : predecessors) {
List<JobTaskBatch> jobTaskBatcheList = map.getOrDefault(predecessor, Lists.newArrayList());
Map<Integer, Long> statusCountMap = jobTaskBatcheList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getTaskBatchStatus, Collectors.counting()));
long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
if (failCount > 0) {
long cancelCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.CANCEL.getStatus(), 0L);
// 一个节点没有成功则认为失败
if (failCount > 0 || stopCount > 0 || cancelCount > 0) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
if (stopCount > 0) {
taskStatus = JobTaskBatchStatusEnum.STOP.getStatus();
operationReason = JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason();
break;
}
}
}
if (taskStatus != JobTaskBatchStatusEnum.SUCCESS.getStatus()) {
break;
}
@ -253,7 +246,7 @@ public class WorkflowBatchHandler {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
taskExecuteDTO.setWorkflowId(successor);
taskExecuteDTO.setTriggerType(1);
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
taskExecuteDTO.setParentId(parentId);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
@ -264,7 +257,7 @@ public class WorkflowBatchHandler {
// 生成任务批次
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setWorkflowNodeId(successor);
jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId);

View File

@ -41,7 +41,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
workflowTimerTaskDTO.setTriggerType(workflowTaskPrepareDTO.getTriggerType());
workflowTimerTaskDTO.setExecuteStrategy(workflowTaskPrepareDTO.getExecuteStrategy());
JobTimerWheel.register(workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}

View File

@ -31,7 +31,7 @@ public class JobTimerTask implements TimerTask {
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId());
taskExecuteDTO.setTriggerType(jobTimerTaskDTO.getTriggerType());
taskExecuteDTO.setExecuteStrategy(jobTimerTaskDTO.getExecuteStrategy());
taskExecuteDTO.setWorkflowTaskBatchId(jobTimerTaskDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setWorkflowNodeId(jobTimerTaskDTO.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();

View File

@ -30,7 +30,7 @@ public class ResidentJobTimerTask implements TimerTask {
// 清除时间轮的缓存
JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);

View File

@ -47,7 +47,7 @@ public class WorkflowTimerTask implements TimerTask {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType());
taskExecuteDTO.setExecuteStrategy(workflowTimerTaskDTO.getExecuteStrategy());
taskExecuteDTO.setParentId(SystemConstants.ROOT);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);

View File

@ -36,6 +36,11 @@ public class WorkflowDetailResponseVO {
*/
private Integer triggerType;
/**
* 阻塞策略
*/
private Integer blockStrategy;
/**
* 触发间隔
*/
@ -109,6 +114,11 @@ public class WorkflowDetailResponseVO {
*/
private Integer failStrategy;
/**
* 任务批次状态
*/
private Integer taskBatchStatus;
/**
* 判定配置
*/

View File

@ -47,7 +47,7 @@ public class WorkflowResponseVO {
/**
* 任务执行时间
*/
private Long nextTriggerAt;
private LocalDateTime nextTriggerAt;
/**
* 创建时间

View File

@ -50,6 +50,11 @@ public interface WorkflowConverter {
List<WorkflowResponseVO> toWorkflowResponseVO(List<Workflow> workflowList);
@Mappings({
@Mapping(target = "nextTriggerAt", expression = "java(WorkflowConverter.toLocalDateTime(workflow.getNextTriggerAt()))")
})
WorkflowResponseVO toWorkflowResponseVO(Workflow workflow);
List<WorkflowBatchResponseVO> toWorkflowBatchResponseVO(List<WorkflowBatchResponseDO> workflowBatchResponseList);
@Mappings({

View File

@ -230,7 +230,7 @@ public class JobServiceImpl implements JobService {
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
// 设置now表示立即执行
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.MANUAL.getType());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.MANUAL.getType());
// 创建批次
jobPrePareHandler.handler(jobTaskPrepare);

View File

@ -108,7 +108,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, id));
.eq(JobTaskBatch::getWorkflowTaskBatchId, id).orderByDesc(JobTaskBatch::getId));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = alJobTaskBatchList.stream()
.collect(Collectors.groupingBy(JobTaskBatch::getWorkflowNodeId));
@ -119,6 +119,11 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
// 取第最新的一条状态
nodeInfo.setTaskBatchStatus(jobTaskBatchList.get(0).getTaskBatchStatus());
} else {
// 前端显示待上游调度
nodeInfo.setTaskBatchStatus(-1);
}
})
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));

View File

@ -86,6 +86,7 @@ public class WorkflowServiceImpl implements WorkflowService {
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
workflow.setVersion(null);
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new EasyRetryServerException("保存工作流图失败"));
return true;
@ -166,20 +167,21 @@ public class WorkflowServiceImpl implements WorkflowService {
// 获取DAG节点配置
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
int version = workflow.getVersion();
// 递归构建图
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, workflow.getVersion() + 1);
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
workflow = new Workflow();
workflow.setId(workflowRequestVO.getId());
workflow.setVersion(workflow.getVersion() + 1);
workflow.setVersion(version);
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, workflow.getVersion())
.eq(Workflow::getVersion, version)
) > 0, () -> new EasyRetryServerException("更新失败"));
return Boolean.TRUE;