feat: 2.6.0

1. 变更executeStrategy 为 taskExecutorScene
2. 任务批次新增taskType字段
This commit is contained in:
byteblogs168 2024-01-09 15:15:27 +08:00
parent 9acba06cc8
commit 73d0c7de07
34 changed files with 175 additions and 109 deletions

View File

@ -58,7 +58,7 @@ Easy-RETRY 是一个针对业务系统重试流量的治理平台,其自身具
- [HelloWorld](https://www.easyretry.com/pages/da9ecc/)
## 应用实例
- [easy-retry-demo](https://gitee.com/zhangyutongxue/easy-retry-demo)
- [easy-retry-demo](https://gitee.com/byteblogs168/easy-retry-demo.git)
## 期望
欢迎提出更好的意见,帮助完善 Easy-Retry

View File

@ -0,0 +1,20 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author: xiaowoniu
* @date : 2024-01-09
* @since : 2.6.0
*/
@AllArgsConstructor
@Getter
public enum JobArgsTypeEnum {
TEXT(1, "文本"),
JSON(2, "JSON");
private final Integer argsType;
private final String desc;
}

View File

@ -78,6 +78,11 @@ public class JobTaskBatch implements Serializable {
*/
private Long executionAt;
/**
* 任务类型 3JOB任务 4WORKFLOW任务
*/
private Integer taskType;
/**
* 操作原因
*/

View File

@ -20,6 +20,7 @@
FROM job_task_batch a join job b on a.job_id = b.id
<where>
a.namespace_id = #{queryDO.namespaceId}
and a.task_type = 3
<if test="queryDO.jobId != null">
and a.job_id = #{queryDO.jobId}
</if>

View File

@ -13,14 +13,15 @@ import lombok.Getter;
*/
@Getter
@AllArgsConstructor
public enum JobExecuteStrategyEnum {
AUTO(1, "自动执行"),
MANUAL(2, "手动执行"),
WORKFLOW(3, "DAG执行"),
public enum JobTaskExecutorSceneEnum {
AUTO_JOB(1, TaskTypeEnum.JOB),
MANUAL_JOB(2, TaskTypeEnum.JOB),
AUTO_WORKFLOW(3, TaskTypeEnum.WORKFLOW),
MANUAL_WORKFLOW(4, TaskTypeEnum.WORKFLOW),
;
private final Integer type;
private final String desc;
private final TaskTypeEnum taskType;
/**
* 根据给定的类型获取对应的触发器类型枚举
@ -29,10 +30,10 @@ public enum JobExecuteStrategyEnum {
* @return 对应的触发器类型枚举
* @throws EasyRetryServerException 当给定的类型不是有效的枚举类型时抛出异常
*/
public static JobExecuteStrategyEnum get(Integer type) {
for (JobExecuteStrategyEnum jobExecuteStrategyEnum : JobExecuteStrategyEnum.values()) {
if(jobExecuteStrategyEnum.getType().equals(type)) {
return jobExecuteStrategyEnum;
public static JobTaskExecutorSceneEnum get(Integer type) {
for (JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum : JobTaskExecutorSceneEnum.values()) {
if(jobTaskExecutorSceneEnum.getType().equals(type)) {
return jobTaskExecutorSceneEnum;
}
}

View File

@ -2,8 +2,6 @@ package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author www.byteblogs.com
* @date 2023-09-25 22:42:21
@ -55,7 +53,7 @@ public class JobTaskPrepareDTO {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
/**
* 工作流任务批次id

View File

@ -21,5 +21,5 @@ public class JobTimerTaskDTO {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
}

View File

@ -20,6 +20,6 @@ public class TaskExecuteDTO {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
}

View File

@ -22,7 +22,7 @@ public class WorkflowNodeTaskExecuteDTO {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
private Long parentId;

View File

@ -17,7 +17,7 @@ public class WorkflowTaskPrepareDTO {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
/**
* 阻塞策略 1丢弃 2覆盖 3并行

View File

@ -17,5 +17,5 @@ public class WorkflowTimerTaskDTO {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
}

View File

@ -1,16 +1,12 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

View File

@ -31,5 +31,5 @@ public class WorkflowBlockStrategyContext extends BlockStrategyContext {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
}

View File

@ -12,7 +12,7 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
@ -91,7 +91,7 @@ public class JobExecutorActor extends AbstractActor {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
// 自动地校验任务必须是开启状态手动触发无需校验
if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getExecuteStrategy())) {
if (JobTaskExecutorSceneEnum.AUTO_JOB.getType().equals(taskExecute.getTaskExecutorScene())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
}
@ -117,7 +117,7 @@ public class JobExecutorActor extends AbstractActor {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
@ -176,8 +176,8 @@ public class JobExecutorActor extends AbstractActor {
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job)
|| JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getExecuteStrategy())
|| JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getExecuteStrategy())
|| JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene())
|| JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene())
// 是否是常驻任务
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
) {
@ -187,7 +187,7 @@ public class JobExecutorActor extends AbstractActor {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());

View File

@ -11,7 +11,7 @@ import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TriggerTypeEnum;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -24,7 +24,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -92,7 +91,7 @@ public class ScanJobTaskActor extends AbstractActor {
for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) {
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
waitExecJob.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
waitExecJob.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
actorRef.tell(waitExecJob, actorRef);
}
}

View File

@ -11,7 +11,7 @@ import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
@ -60,7 +60,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
}
private void doScan(ScanTask scanTask) {
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
this::processPartitionTasks, 0);
}
@ -79,7 +79,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
waitExecTask.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
waitExecTask.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
actorRef.tell(waitExecTask, actorRef);
}
}

View File

@ -152,6 +152,7 @@ public class WorkflowExecutorActor extends AbstractActor {
context.setParentWorkflowNodeId(taskExecute.getParentId());
context.setEvaluationResult(evaluationResult);
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
workflowExecutor.execute(context);

View File

@ -3,21 +3,21 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobArgsTypeEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -26,7 +26,12 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.io.IOException;
import java.text.MessageFormat;
@ -52,6 +57,8 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
private WorkflowBatchHandler workflowBatchHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private TransactionTemplate transactionTemplate;
@Override
@Transactional
@ -69,16 +76,20 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
return;
}
if (!preValidate(context)) {
return;
}
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
beforeExecute(context);
if (!preValidate(context)) {
return;
}
beforeExecute(context);
doExecute(context);
afterExecute(context);
doExecute(context);
afterExecute(context);
}
});
});
}
@ -91,6 +102,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
generatorContext.setOperationReason(JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason());
generatorContext.setJobId(context.getJobId());
generatorContext.setTaskExecutorScene(context.getTaskExecutorScene());
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
try {
workflowBatchHandler.complete(context.getWorkflowTaskBatchId());
@ -118,17 +130,22 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
}
protected void workflowTaskExecutor(WorkflowExecutorContext context) {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("工作流执行失败", e);
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("工作流执行失败", e);
}
}
});
}
protected JobTask generateJobTask(WorkflowExecutorContext context, JobTaskBatch jobTaskBatch) {
@ -139,7 +156,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
jobTask.setJobId(context.getJobId());
jobTask.setClientInfo(StrUtil.EMPTY);
jobTask.setTaskBatchId(jobTaskBatch.getId());
jobTask.setArgsType(1);
jobTask.setArgsType(JobArgsTypeEnum.TEXT.getArgsType());
jobTask.setArgsStr(Optional.ofNullable(context.getTaskResult()).orElse(StrUtil.EMPTY));
jobTask.setTaskStatus(context.getJobTaskStatus());
jobTask.setResultMessage(String.valueOf(context.getEvaluationResult()));

View File

@ -1,26 +1,15 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
/**
* @author xiaowoniu
* @date 2023-12-24 08:09:14
@ -54,7 +43,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
protected void doExecute(WorkflowExecutorContext context) {
// 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());

View File

@ -92,4 +92,10 @@ public class WorkflowExecutorContext {
* 日志信息
*/
private String logMessage;
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer taskExecutorScene;
}

View File

@ -5,7 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -49,6 +49,9 @@ public class JobTaskBatchGenerator {
// 生成一个新的任务
JobTaskBatch jobTaskBatch = JobTaskConverter.INSTANCE.toJobTaskBatch(context);
JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
context.getTaskExecutorScene());
jobTaskBatch.setTaskType(jobTaskExecutorSceneEnum.getTaskType().getType());
jobTaskBatch.setCreateDt(LocalDateTime.now());
// 无执行的节点
@ -60,13 +63,12 @@ public class JobTaskBatchGenerator {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
if (Objects.nonNull(context.getWorkflowNodeId()) && Objects.nonNull(context.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
@ -104,7 +106,7 @@ public class JobTaskBatchGenerator {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
jobTimerTaskDTO.setJobId(context.getJobId());
jobTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
jobTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene());
jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobTaskBatch.getId(),

View File

@ -2,8 +2,6 @@ package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:12:48
@ -43,7 +41,7 @@ public class JobTaskBatchGeneratorContext {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
/**
* 工作流任务批次id

View File

@ -1,16 +1,12 @@
package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -19,7 +15,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -55,7 +50,7 @@ public class WorkflowBatchGenerator {
WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
workflowTimerTaskDTO.setExecuteStrategy(context.getExecuteStrategy());
workflowTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene());
JobTimerWheel.register(TaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}

View File

@ -40,7 +40,7 @@ public class WorkflowTaskBatchGeneratorContext {
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer executeStrategy;
private Integer taskExecutorScene;
/**
* 流程信息

View File

@ -1,9 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;

View File

@ -5,7 +5,7 @@ import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
@ -80,7 +80,7 @@ public class JobTaskBatchHandler {
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
// 这里取第一个的任务执行结果
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());

View File

@ -7,7 +7,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
@ -250,7 +250,7 @@ public class WorkflowBatchHandler {
// 重新尝试执行, 重新生成任务批次
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
taskExecuteDTO.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
taskExecuteDTO.setParentId(parentId);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
@ -261,7 +261,7 @@ public class WorkflowBatchHandler {
// 生成任务批次
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 1000);
jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId);
jobTaskPrepare.setParentWorkflowNodeId(parentId);

View File

@ -42,7 +42,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
workflowTimerTaskDTO.setExecuteStrategy(workflowTaskPrepareDTO.getExecuteStrategy());
workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene());
JobTimerWheel.register(TaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}

View File

@ -31,7 +31,7 @@ public class JobTimerTask implements TimerTask {
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId());
taskExecuteDTO.setExecuteStrategy(jobTimerTaskDTO.getExecuteStrategy());
taskExecuteDTO.setTaskExecutorScene(jobTimerTaskDTO.getTaskExecutorScene());
taskExecuteDTO.setWorkflowTaskBatchId(jobTimerTaskDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setWorkflowNodeId(jobTimerTaskDTO.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -31,7 +31,7 @@ public class ResidentJobTimerTask implements TimerTask {
// 清除时间轮的缓存
JobTimerWheel.clearCache(TaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.AUTO.getType());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);

View File

@ -9,8 +9,6 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
@ -47,7 +45,7 @@ public class WorkflowTimerTask implements TimerTask {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
taskExecuteDTO.setExecuteStrategy(workflowTimerTaskDTO.getExecuteStrategy());
taskExecuteDTO.setTaskExecutorScene(workflowTimerTaskDTO.getTaskExecutorScene());
taskExecuteDTO.setParentId(SystemConstants.ROOT);
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);

View File

@ -1,5 +1,8 @@
package com.aizuda.easy.retry.server.web.model.response;
import com.aizuda.easy.retry.server.common.config.SystemProperties.Callback;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import lombok.Data;
import java.time.LocalDateTime;
@ -58,4 +61,14 @@ public class JobBatchResponseVO {
* 执行器名称
*/
private String executorInfo;
/**
* 工作流的回调节点信息
*/
private CallbackConfig callback;
/**
* 工作流的决策节点信息
*/
private DecisionConfig decision;
}

View File

@ -2,8 +2,13 @@ package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
@ -23,11 +28,14 @@ import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatch
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
@ -42,12 +50,12 @@ import java.util.Objects;
* @since 2.4.0
*/
@Service
@RequiredArgsConstructor
public class JobBatchServiceImpl implements JobBatchService {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobMapper jobMapper;
private final WorkflowNodeMapper workflowNodeMapper;
@Override
public PageResult<List<JobBatchResponseVO>> getJobBatchPage(final JobBatchQueryVO queryVO) {
@ -78,10 +86,11 @@ public class JobBatchServiceImpl implements JobBatchService {
jobBatchQueryDO.setTaskBatchStatus(queryVO.getTaskBatchStatus());
jobBatchQueryDO.setGroupNames(groupNames);
jobBatchQueryDO.setNamespaceId(userSessionVO.getNamespaceId());
List<JobBatchResponseDO> batchResponseDOList = jobTaskBatchMapper.selectJobBatchPageList(pageDTO, jobBatchQueryDO);
List<JobBatchResponseDO> batchResponseDOList = jobTaskBatchMapper.selectJobBatchPageList(pageDTO,
jobBatchQueryDO);
List<JobBatchResponseVO> batchResponseVOList = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVOs(
batchResponseDOList);
batchResponseDOList);
return new PageResult<>(pageDTO, batchResponseVOList);
}
@ -93,8 +102,28 @@ public class JobBatchServiceImpl implements JobBatchService {
return null;
}
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
return JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch, job);
if (jobTaskBatch.getTaskType().equals(TaskTypeEnum.JOB.getType())) {
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
return JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch, job);
}
JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch);
// 回调节点
if (SystemConstants.CALLBACK_JOB_ID.equals(jobTaskBatch.getJobId())) {
WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId());
jobBatchResponseVO.setJobName(workflowNode.getNodeName());
jobBatchResponseVO.setCallback(JsonUtil.parseObject(workflowNode.getNodeInfo(), CallbackConfig.class));
}
// 条件节点
if (SystemConstants.DECISION_JOB_ID.equals(jobTaskBatch.getJobId())) {
WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId());
jobBatchResponseVO.setJobName(workflowNode.getNodeName());
jobBatchResponseVO.setDecision(JsonUtil.parseObject(workflowNode.getNodeInfo(), DecisionConfig.class));
}
return jobBatchResponseVO;
}
@Override

View File

@ -6,7 +6,7 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
@ -230,7 +230,7 @@ public class JobServiceImpl implements JobService {
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
// 设置now表示立即执行
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setExecuteStrategy(JobExecuteStrategyEnum.MANUAL.getType());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
// 创建批次
jobPrePareHandler.handler(jobTaskPrepare);