fix: 2.6.0

1. 优化工作流一个任务节点多个批次显示问题
This commit is contained in:
byteblogs168 2024-01-27 17:31:01 +08:00
parent 532ee481d9
commit 67895f3d7f
73 changed files with 158 additions and 170 deletions

View File

@ -368,7 +368,7 @@ CREATE TABLE `job_task_batch`
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功', `task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因', `operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间', `execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`task_type` tinyint(4) NOT NULL DEFAULT '3' COMMENT '任务类型 3、JOB任务 4、WORKFLOW任务', `system_task_type` tinyint(4) NOT NULL DEFAULT '3' COMMENT '任务类型 3、JOB任务 4、WORKFLOW任务',
`parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点', `parent_id` varchar(64) NOT NULL DEFAULT '' COMMENT '父节点',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',

View File

@ -81,13 +81,13 @@ public class JobTaskBatch implements Serializable {
/** /**
* 任务类型 3JOB任务 4WORKFLOW任务 * 任务类型 3JOB任务 4WORKFLOW任务
*/ */
private Integer taskType; private Integer systemTaskType;
/** /**
* 操作原因 * 操作原因
*/ */
private Integer operationReason; private Integer operationReason;
/** /**
* 修改时间 * 修改时间
*/ */

View File

@ -20,7 +20,7 @@
FROM job_task_batch a join job b on a.job_id = b.id FROM job_task_batch a join job b on a.job_id = b.id
<where> <where>
a.namespace_id = #{queryDO.namespaceId} a.namespace_id = #{queryDO.namespaceId}
and a.task_type = 3 and a.system_task_type = 3
<if test="queryDO.jobId != null"> <if test="queryDO.jobId != null">
and a.job_id = #{queryDO.jobId} and a.job_id = #{queryDO.jobId}
</if> </if>

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.common.cache;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import lombok.Data; import lombok.Data;
@ -29,7 +29,7 @@ public class CacheGroupScanActor implements Lifecycle {
* *
* @return 缓存对象 * @return 缓存对象
*/ */
public static ActorRef get(String groupName, TaskTypeEnum typeEnum) { public static ActorRef get(String groupName, SyetemTaskTypeEnum typeEnum) {
return CACHE.getIfPresent(groupName.concat(typeEnum.name())); return CACHE.getIfPresent(groupName.concat(typeEnum.name()));
} }
@ -38,7 +38,7 @@ public class CacheGroupScanActor implements Lifecycle {
* *
* @return 缓存对象 * @return 缓存对象
*/ */
public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) { public static void put(String groupName, SyetemTaskTypeEnum typeEnum, ActorRef actorRef) {
CACHE.put(groupName.concat(typeEnum.name()), actorRef); CACHE.put(groupName.concat(typeEnum.name()), actorRef);
} }

View File

@ -14,14 +14,14 @@ import lombok.Getter;
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum JobTaskExecutorSceneEnum { public enum JobTaskExecutorSceneEnum {
AUTO_JOB(1, TaskTypeEnum.JOB), AUTO_JOB(1, SyetemTaskTypeEnum.JOB),
MANUAL_JOB(2, TaskTypeEnum.JOB), MANUAL_JOB(2, SyetemTaskTypeEnum.JOB),
AUTO_WORKFLOW(3, TaskTypeEnum.WORKFLOW), AUTO_WORKFLOW(3, SyetemTaskTypeEnum.WORKFLOW),
MANUAL_WORKFLOW(4, TaskTypeEnum.WORKFLOW), MANUAL_WORKFLOW(4, SyetemTaskTypeEnum.WORKFLOW),
; ;
private final Integer type; private final Integer type;
private final TaskTypeEnum taskType; private final SyetemTaskTypeEnum systemTaskType;
/** /**
* 根据给定的类型获取对应的触发器类型枚举 * 根据给定的类型获取对应的触发器类型枚举

View File

@ -14,7 +14,7 @@ import java.util.function.Supplier;
*/ */
@AllArgsConstructor @AllArgsConstructor
@Getter @Getter
public enum TaskTypeEnum { public enum SyetemTaskTypeEnum {
RETRY(1, ActorGenerator::scanGroupActor), RETRY(1, ActorGenerator::scanGroupActor),
CALLBACK(2, ActorGenerator::scanCallbackGroupActor), CALLBACK(2, ActorGenerator::scanCallbackGroupActor),
JOB(3, ActorGenerator::scanJobActor), JOB(3, ActorGenerator::scanJobActor),

View File

@ -13,7 +13,7 @@ import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -153,7 +153,7 @@ public class JobExecutorActor extends AbstractActor {
@Override @Override
public void afterCompletion(int status) { public void afterCompletion(int status) {
// 清除时间轮的缓存 // 清除时间轮的缓存
JobTimerWheel.clearCache(TaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId()); JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), taskExecute.getTaskBatchId());
//方法内容 //方法内容
doHandlerResidentTask(job, taskExecute); doHandlerResidentTask(job, taskExecute);
} }
@ -208,7 +208,7 @@ public class JobExecutorActor extends AbstractActor {
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000); log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
job.setNextTriggerAt(nextTriggerAt); job.setNextTriggerAt(nextTriggerAt);
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt); ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
} }
} }

View File

@ -4,7 +4,7 @@ import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler; import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.prepare.job.TerminalJobPrepareHandler; import com.aizuda.easy.retry.server.job.task.support.prepare.job.TerminalJobPrepareHandler;
@ -59,12 +59,12 @@ public class JobTaskPrepareActor extends AbstractActor {
JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get( JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
prepare.getTaskExecutorScene()); prepare.getTaskExecutorScene());
if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) { if (SyetemTaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getSystemTaskType().getType())) {
queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId()); queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId());
queryWrapper.eq(JobTaskBatch::getWorkflowTaskBatchId, prepare.getWorkflowTaskBatchId()); queryWrapper.eq(JobTaskBatch::getWorkflowTaskBatchId, prepare.getWorkflowTaskBatchId());
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.WORKFLOW.getType()); queryWrapper.eq(JobTaskBatch::getSystemTaskType, SyetemTaskTypeEnum.WORKFLOW.getType());
} else { } else {
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.JOB.getType()); queryWrapper.eq(JobTaskBatch::getSystemTaskType, SyetemTaskTypeEnum.JOB.getType());
} }
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper

View File

@ -77,7 +77,6 @@ public class ScanWorkflowTaskActor extends AbstractActor {
long now = DateUtils.toNowMilli(); long now = DateUtils.toNowMilli();
for (PartitionTask partitionTask : partitionTasks) { for (PartitionTask partitionTask : partitionTasks) {
WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask; WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask;
log.warn("监控时间. workflowId:[{}] now:[{}], dbnextTriggerAt:[{}]", workflowPartitionTaskDTO.getId(), now, workflowPartitionTaskDTO.getNextTriggerAt());
processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now); processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
} }
@ -85,8 +84,6 @@ public class ScanWorkflowTaskActor extends AbstractActor {
workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs); workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) { for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
log.warn("监控时间. workflowId:[{}] now:[{}], nextTriggerAt:[{}]", waitExecTask.getWorkflowId(), now, waitExecTask.getNextTriggerAt());
// 执行预处理阶段 // 执行预处理阶段
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor(); ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
waitExecTask.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); waitExecTask.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());

View File

@ -1,12 +1,10 @@
package com.aizuda.easy.retry.server.job.task.support.generator.batch; package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -21,12 +19,9 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -54,7 +49,7 @@ public class JobTaskBatchGenerator {
JobTaskBatch jobTaskBatch = JobTaskConverter.INSTANCE.toJobTaskBatch(context); JobTaskBatch jobTaskBatch = JobTaskConverter.INSTANCE.toJobTaskBatch(context);
JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get( JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
context.getTaskExecutorScene()); context.getTaskExecutorScene());
jobTaskBatch.setTaskType(jobTaskExecutorSceneEnum.getTaskType().getType()); jobTaskBatch.setSystemTaskType(jobTaskExecutorSceneEnum.getSystemTaskType().getType());
jobTaskBatch.setCreateDt(LocalDateTime.now()); jobTaskBatch.setCreateDt(LocalDateTime.now());
// 无执行的节点 // 无执行的节点
@ -100,7 +95,7 @@ public class JobTaskBatchGenerator {
jobTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene()); jobTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene());
jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId()); jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId());
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobTaskBatch.getId(), JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
return jobTaskBatch; return jobTaskBatch;

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
@ -51,7 +51,7 @@ public class WorkflowBatchGenerator {
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId()); workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId()); workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
workflowTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene()); workflowTimerTaskDTO.setTaskExecutorScene(context.getTaskExecutorScene());
JobTimerWheel.register(TaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(), JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskBatch.getId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
} }
} }

View File

@ -5,6 +5,7 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
@ -102,13 +103,19 @@ public class WorkflowBatchHandler {
} }
} }
// 判定叶子节点的状态 boolean isMatchSuccess = jobTaskBatchList.stream()
for (JobTaskBatch jobTaskBatch : jobTaskBatchList) { .anyMatch(jobTaskBatch -> JobTaskStatusEnum.SUCCESS.getStatus() == jobTaskBatch.getTaskBatchStatus());
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) { if (!isMatchSuccess) {
// 只要叶子节点不是无需处理的都是失败 // 判定叶子节点的状态
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason() for (JobTaskBatch jobTaskBatch : jobTaskBatchList) {
&& JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) { if (jobTaskBatch.getTaskBatchStatus() == JobTaskBatchStatusEnum.SUCCESS.getStatus()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); break;
} else if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(jobTaskBatch.getTaskBatchStatus())) {
// 只要叶子节点不是无需处理的都是失败
if (JobOperationReasonEnum.WORKFLOW_NODE_NO_REQUIRED.getReason() != jobTaskBatch.getOperationReason()
&& JobOperationReasonEnum.WORKFLOW_NODE_CLOSED_SKIP_EXECUTION.getReason() != jobTaskBatch.getOperationReason()) {
taskStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
}
} }
} }
} }

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.job; package com.aizuda.easy.retry.server.job.task.support.prepare.job;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -33,7 +33,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 若时间轮中数据不存在则重新加入 // 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(TaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) { if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) {
log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId()); log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 进入时间轮 // 进入时间轮
@ -41,7 +41,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId()); jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId());
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId()); jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());
JobTimerWheel.register(TaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(), JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
} }
} }

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow; package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
@ -34,7 +34,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
log.info("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId()); log.info("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
// 若时间轮中数据不存在则重新加入 // 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(TaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId())) { if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId())) {
log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId()); log.info("存在待处理任务且时间轮中不存在 workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
// 进入时间轮 // 进入时间轮
@ -43,7 +43,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId()); workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskPrepareDTO.getWorkflowTaskBatchId());
workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId()); workflowTimerTaskDTO.setWorkflowId(workflowTaskPrepareDTO.getWorkflowId());
workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene()); workflowTimerTaskDTO.setTaskExecutorScene(workflowTaskPrepareDTO.getTaskExecutorScene());
JobTimerWheel.register(TaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(), JobTimerWheel.register(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId(),
new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS); new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
} }
} }

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -29,7 +29,7 @@ public class ResidentJobTimerTask implements TimerTask {
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
try { try {
// 清除时间轮的缓存 // 清除时间轮的缓存
JobTimerWheel.clearCache(TaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId()); JobTimerWheel.clearCache(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType()); jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
// 执行预处理阶段 // 执行预处理阶段

View File

@ -8,7 +8,7 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum; import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.generator.id.IdGenerator; import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -26,7 +26,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -76,7 +75,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
.eq(RetryTask::getGroupName, taskContext.getGroupName()) .eq(RetryTask::getGroupName, taskContext.getGroupName())
.eq(RetryTask::getSceneName, taskContext.getSceneName()) .eq(RetryTask::getSceneName, taskContext.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType()) .eq(RetryTask::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.in(RetryTask::getIdempotentId, idempotentIdSet)); .in(RetryTask::getIdempotentId, idempotentIdSet));
Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap = retryTasks.stream() Map<String/*幂等ID*/, List<RetryTask>> retryTaskMap = retryTasks.stream()
@ -131,7 +130,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(taskInfo); RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(taskInfo);
retryTask.setNamespaceId(taskContext.getNamespaceId()); retryTask.setNamespaceId(taskContext.getNamespaceId());
retryTask.setUniqueId(getIdGenerator(taskContext.getGroupName(), taskContext.getNamespaceId())); retryTask.setUniqueId(getIdGenerator(taskContext.getGroupName(), taskContext.getNamespaceId()));
retryTask.setTaskType(TaskTypeEnum.RETRY.getType()); retryTask.setTaskType(SyetemTaskTypeEnum.RETRY.getType());
retryTask.setGroupName(taskContext.getGroupName()); retryTask.setGroupName(taskContext.getGroupName());
retryTask.setSceneName(taskContext.getSceneName()); retryTask.setSceneName(taskContext.getSceneName());
retryTask.setRetryStatus(initStatus(taskContext)); retryTask.setRetryStatus(initStatus(taskContext));
@ -153,7 +152,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
// 初始化日志 // 初始化日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask); RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setTaskType(TaskTypeEnum.RETRY.getType()); retryTaskLog.setTaskType(SyetemTaskTypeEnum.RETRY.getType());
retryTaskLog.setCreateDt(now); retryTaskLog.setCreateDt(now);
waitInsertTaskLogs.add(retryTaskLog); waitInsertTaskLogs.add(retryTaskLog);

View File

@ -4,7 +4,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter; import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.easy.retry.server.retry.task.service.RetryService; import com.aizuda.easy.retry.server.retry.task.service.RetryService;
@ -53,7 +53,7 @@ public class RetryServiceImpl implements RetryService {
.eq(RetryTask::getNamespaceId, namespaceId) .eq(RetryTask::getNamespaceId, namespaceId)
.in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(), .in(RetryTask::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus(),
RetryStatusEnum.FINISH.getStatus()) RetryStatusEnum.FINISH.getStatus())
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType()) .eq(RetryTask::getTaskType, SyetemTaskTypeEnum.CALLBACK.getType())
.eq(RetryTask::getGroupName, groupName)).getRecords(); .eq(RetryTask::getGroupName, groupName)).getRecords();
if (CollectionUtils.isEmpty(callbackRetryTasks)) { if (CollectionUtils.isEmpty(callbackRetryTasks)) {
@ -68,7 +68,7 @@ public class RetryServiceImpl implements RetryService {
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess() List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
.list(groupName, namespaceId, new LambdaQueryWrapper<RetryTask>() .list(groupName, namespaceId, new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId) .eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType()) .eq(RetryTask::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.in(RetryTask::getUniqueId, uniqueIdSet) .in(RetryTask::getUniqueId, uniqueIdSet)
); );

View File

@ -9,7 +9,7 @@ import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
@ -73,7 +73,7 @@ public class FailureActor extends AbstractActor {
protected void doInTransactionWithoutResult(TransactionStatus status) { protected void doInTransactionWithoutResult(TransactionStatus status) {
Integer maxRetryCount; Integer maxRetryCount;
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { if (SyetemTaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
maxRetryCount = systemProperties.getCallback().getMaxCount(); maxRetryCount = systemProperties.getCallback().getMaxCount();
} else { } else {
maxRetryCount = sceneConfig.getMaxRetryCount(); maxRetryCount = sceneConfig.getMaxRetryCount();

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
@ -12,13 +12,13 @@ import lombok.Getter;
@AllArgsConstructor @AllArgsConstructor
@Getter @Getter
public enum TaskExecutorSceneEnum { public enum TaskExecutorSceneEnum {
AUTO_RETRY(1, TaskTypeEnum.RETRY), AUTO_RETRY(1, SyetemTaskTypeEnum.RETRY),
MANUAL_RETRY(2, TaskTypeEnum.RETRY), MANUAL_RETRY(2, SyetemTaskTypeEnum.RETRY),
AUTO_CALLBACK(3, TaskTypeEnum.CALLBACK), AUTO_CALLBACK(3, SyetemTaskTypeEnum.CALLBACK),
MANUAL_CALLBACK(4, TaskTypeEnum.CALLBACK); MANUAL_CALLBACK(4, SyetemTaskTypeEnum.CALLBACK);
private final int scene; private final int scene;
private final TaskTypeEnum taskType; private final SyetemTaskTypeEnum taskType;
} }

View File

@ -6,14 +6,13 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
@ -26,9 +25,7 @@ import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/** /**
@ -58,12 +55,12 @@ public class CallbackRetryTaskHandler {
*/ */
@Transactional @Transactional
public void create(RetryTask retryTask) { public void create(RetryTask retryTask) {
if (!TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) { if (!SyetemTaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) {
return; return;
} }
RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask); RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask);
callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType()); callbackRetryTask.setTaskType(SyetemTaskTypeEnum.CALLBACK.getType());
callbackRetryTask.setId(null); callbackRetryTask.setId(null);
callbackRetryTask.setUniqueId(generatorCallbackUniqueId(retryTask.getUniqueId())); callbackRetryTask.setUniqueId(generatorCallbackUniqueId(retryTask.getUniqueId()));
callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
@ -91,7 +88,7 @@ public class CallbackRetryTaskHandler {
// 初始化回调日志 // 初始化回调日志
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask); RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(callbackRetryTask);
// 记录重试日志 // 记录重试日志
retryTaskLog.setTaskType(TaskTypeEnum.CALLBACK.getType()); retryTaskLog.setTaskType(SyetemTaskTypeEnum.CALLBACK.getType());
retryTaskLog.setCreateDt(LocalDateTime.now()); retryTaskLog.setCreateDt(LocalDateTime.now());
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog), Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败")); () -> new EasyRetryServerException("新增重试日志失败"));

View File

@ -10,7 +10,7 @@ import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.SystemModeEnum; import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter; import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
@ -70,11 +70,11 @@ public class ConsumerBucketActor extends AbstractActor {
scanTask.setBuckets(consumerBucket.getBuckets()); scanTask.setBuckets(consumerBucket.getBuckets());
// 扫描定时任务数据 // 扫描定时任务数据
ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, TaskTypeEnum.JOB); ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, SyetemTaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef); scanJobActorRef.tell(scanTask, scanJobActorRef);
// 扫描DAG工作流任务数据 // 扫描DAG工作流任务数据
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW); ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, SyetemTaskTypeEnum.WORKFLOW);
scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef); scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef);
} }
@ -120,11 +120,11 @@ public class ConsumerBucketActor extends AbstractActor {
cacheRateLimiter(groupName); cacheRateLimiter(groupName);
// 扫描重试数据 // 扫描重试数据
ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY); ActorRef scanRetryActorRef = cacheActorRef(groupName, SyetemTaskTypeEnum.RETRY);
scanRetryActorRef.tell(scanTask, scanRetryActorRef); scanRetryActorRef.tell(scanTask, scanRetryActorRef);
// 扫描回调数据 // 扫描回调数据
ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK); ActorRef scanCallbackActorRef = cacheActorRef(groupName, SyetemTaskTypeEnum.CALLBACK);
scanCallbackActorRef.tell(scanTask, scanCallbackActorRef); scanCallbackActorRef.tell(scanTask, scanCallbackActorRef);
} }
@ -148,7 +148,7 @@ public class ConsumerBucketActor extends AbstractActor {
/** /**
* 缓存Actor对象 * 缓存Actor对象
*/ */
private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) { private ActorRef cacheActorRef(String groupName, SyetemTaskTypeEnum typeEnum) {
ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum); ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum);
if (Objects.isNull(scanActorRef)) { if (Objects.isNull(scanActorRef)) {
scanActorRef = typeEnum.getActorRef().get(); scanActorRef = typeEnum.getActorRef().get();

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title> <title>Easy Retry</title>
<script type="module" crossorigin src="./assets/nqxcaVpR.js"></script> <script type="module" crossorigin src="./assets/fjGXFx1T.js"></script>
<link rel="stylesheet" crossorigin href="./assets/eHB570AZ.css"> <link rel="stylesheet" crossorigin href="./assets/RihNDpOw.css">
</head> </head>
<body> <body>

View File

@ -339,14 +339,14 @@ public class GroupConfigServiceImpl implements GroupConfigService {
String schema = connection.getSchema(); String schema = connection.getSchema();
// https://gitee.com/aizuda/easy-retry/issues/I8DAMH // https://gitee.com/aizuda/easy-retry/issues/I8DAMH
String sql = MessageFormatter.arrayFormat("SELECT table_name\n" String sql = MessageFormatter.arrayFormat("SELECT table_name\n"
+ "FROM information_schema.tables\n" + "FROM information_schema.tables\n"
+ "WHERE table_name LIKE 'retry_task_%' AND table_schema = '{}' and table_catalog = '{}'", new Object[]{schema, catalog}).getMessage(); + "WHERE table_name LIKE 'retry_task_%' AND (table_schema = '{}' OR table_schema = '{}' OR table_catalog = '{}' OR table_catalog = '{}')", new Object[]{schema, catalog, schema, catalog}).getMessage();
List<String> tableList = jdbcTemplate.queryForList(sql, String.class); List<String> tableList = jdbcTemplate.queryForList(sql, String.class);
return tableList.stream().map(ReUtil::getFirstNumber).filter(i -> return tableList.stream().map(ReUtil::getFirstNumber).filter(i ->
!Objects.isNull(i) && i <= systemProperties.getTotalPartition()).distinct() !Objects.isNull(i) && i <= systemProperties.getTotalPartition()).distinct()
.collect(Collectors.toList()); .collect(Collectors.toList());
} catch (SQLException ignored) { } catch (SQLException ignored) {
} finally { } finally {
if (Objects.nonNull(connection)) { if (Objects.nonNull(connection)) {

View File

@ -4,15 +4,11 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig; import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig; import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
@ -32,12 +28,9 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNode
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -105,7 +98,7 @@ public class JobBatchServiceImpl implements JobBatchService {
Job job = jobMapper.selectById(jobTaskBatch.getJobId()); Job job = jobMapper.selectById(jobTaskBatch.getJobId());
JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch, job); JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch, job);
if (jobTaskBatch.getTaskType().equals(TaskTypeEnum.WORKFLOW.getType())) { if (jobTaskBatch.getSystemTaskType().equals(SyetemTaskTypeEnum.WORKFLOW.getType())) {
WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId()); WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId());
jobBatchResponseVO.setNodeName(workflowNode.getNodeName()); jobBatchResponseVO.setNodeName(workflowNode.getNodeName());

View File

@ -5,9 +5,8 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -37,12 +36,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -138,7 +134,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter); RetryTask retryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryDeadLetter);
retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus()); retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
retryTask.setTaskType(TaskTypeEnum.RETRY.getType()); retryTask.setTaskType(SyetemTaskTypeEnum.RETRY.getType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); waitStrategyContext.setNextTriggerAt(LocalDateTime.now());

View File

@ -12,7 +12,7 @@ import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum; import com.aizuda.easy.retry.server.common.enums.TaskGeneratorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
@ -368,7 +368,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
requestVO.getGroupName(), namespaceId, requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>() new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId) .eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType()) .eq(RetryTask::getTaskType, SyetemTaskTypeEnum.RETRY.getType())
.in(RetryTask::getUniqueId, uniqueIds)); .in(RetryTask::getUniqueId, uniqueIds));
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务")); Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
@ -399,7 +399,7 @@ public class RetryTaskServiceImpl implements RetryTaskService {
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), namespaceId, List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>() new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId) .eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType()) .eq(RetryTask::getTaskType, SyetemTaskTypeEnum.CALLBACK.getType())
.in(RetryTask::getUniqueId, uniqueIds)); .in(RetryTask::getUniqueId, uniqueIds));
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务")); Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));

View File

@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig; import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
@ -105,9 +106,9 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
public WorkflowDetailResponseVO getWorkflowBatchDetail(Long id) { public WorkflowDetailResponseVO getWorkflowBatchDetail(Long id) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>() new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, id) .eq(WorkflowTaskBatch::getId, id)
.eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())); .eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()));
if (Objects.isNull(workflowTaskBatch)) { if (Objects.isNull(workflowTaskBatch)) {
return null; return null;
} }
@ -121,7 +122,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
List<Long> jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toList()); List<Long> jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toList());
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>() List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
.in(Job::getId, new HashSet<>(jobIds))); .in(Job::getId, new HashSet<>(jobIds)));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, job -> job)); Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, job -> job));
@ -140,13 +141,15 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
.peek(nodeInfo -> { .peek(nodeInfo -> {
JobTaskConfig jobTask = nodeInfo.getJobTask(); JobTaskConfig jobTask = nodeInfo.getJobTask();
if(Objects.nonNull(jobTask)) { if (Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName()); jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
} }
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId()); List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
if (!CollectionUtils.isEmpty(jobTaskBatchList)) { if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
jobTaskBatchList = jobTaskBatchList.stream().sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus)).collect(Collectors.toList());
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList)); nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
// 取第最新的一条状态 // 取第最新的一条状态
JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0); JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0);
if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason() == jobTaskBatch.getOperationReason()) { if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason() == jobTaskBatch.getOperationReason()) {
@ -166,6 +169,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
// 删除被误添加的节点 // 删除被误添加的节点
allNoOperationNode.remove(nodeInfo.getId()); allNoOperationNode.remove(nodeInfo.getId());
} }
} else { } else {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(workflowTaskBatch.getTaskBatchStatus())) { if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(workflowTaskBatch.getTaskBatchStatus())) {
allNoOperationNode.add(nodeInfo.getId()); allNoOperationNode.add(nodeInfo.getId());
@ -210,7 +214,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private static boolean isNoOperation(JobTaskBatch i) { private static boolean isNoOperation(JobTaskBatch i) {
return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(i.getOperationReason()) return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(i.getOperationReason())
|| i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus(); || i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus();
} }
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title> <title>Easy Retry</title>
<script type="module" crossorigin src="./assets/yb0S4WKn.js"></script> <script type="module" crossorigin src="./assets/fjGXFx1T.js"></script>
<link rel="stylesheet" crossorigin href="./assets/EYpaHNYA.css"> <link rel="stylesheet" crossorigin href="./assets/RihNDpOw.css">
</head> </head>
<body> <body>