diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 0459ae4e7..940ef7d85 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -453,6 +453,7 @@ CREATE TABLE `workflow` `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', `flow_info` text DEFAULT NULL COMMENT '流程信息', + `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java index 4d727ad0c..2ad1b37f1 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java @@ -1,8 +1,12 @@ package com.aizuda.easy.retry.template.datasource.persistence.mapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; /** *

@@ -16,4 +20,5 @@ import org.apache.ibatis.annotations.Mapper; @Mapper public interface WorkflowMapper extends BaseMapper { + int updateBatchNextTriggerAtById(@Param("list") List list); } diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java index c33c7a9a6..7871f1538 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java @@ -75,6 +75,11 @@ public class Workflow implements Serializable { */ private String flowInfo; + /** + * bucket + */ + private Integer bucketIndex; + /** * 描述 */ diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml index 54ca92249..d957fb80a 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml @@ -8,12 +8,26 @@ - + + + update workflow rt, + ( + + select + #{item.nextTriggerAt} as next_trigger_at, + #{item.id} as id + + ) tt + set + rt.next_trigger_at = tt.next_trigger_at + where rt.id = tt.id + + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java index 92b854649..a8269e320 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java @@ -40,7 +40,9 @@ public class ActorGenerator { /*----------------------------------------分布式任务调度 START----------------------------------------*/ public static final String SCAN_JOB_ACTOR = "ScanJobActor"; + public static final String SCAN_WORKFLOW_ACTOR = "ScanWorkflowTaskActor"; public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor"; + public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor"; public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor"; public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor"; public static final String JOB_LOG_ACTOR = "JobLogActor"; @@ -139,6 +141,17 @@ public class ActorGenerator { .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); } + /** + * 生成扫描工作流任务的actor + * + * @return actor 引用 + */ + public static ActorRef scanWorkflowActor() { + return getCommonActorSystemSystem().actorOf(getSpringExtension() + .props(SCAN_WORKFLOW_ACTOR) + .withDispatcher(COMMON_SCAN_TASK_DISPATCHER)); + } + /** * 生成扫描重试数据的actor * diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java index bf4ed483d..4fe8d2fbe 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java @@ -18,6 +18,7 @@ public enum TaskTypeEnum { RETRY(1, ActorGenerator::scanGroupActor), CALLBACK(2, ActorGenerator::scanCallbackGroupActor), JOB(3, ActorGenerator::scanJobActor), + WORKFLOW(4, ActorGenerator::scanWorkflowActor), ; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTaskDTO.java similarity index 91% rename from easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java rename to easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTaskDTO.java index 9e4150bd5..d0d05dc14 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTaskDTO.java @@ -4,15 +4,13 @@ import com.aizuda.easy.retry.server.common.dto.PartitionTask; import lombok.Data; import lombok.EqualsAndHashCode; -import java.time.LocalDateTime; - /** * @author: www.byteblogs.com * @date : 2023-10-10 17:52 */ @EqualsAndHashCode(callSuper = true) @Data -public class JobPartitionTask extends PartitionTask { +public class JobPartitionTaskDTO extends PartitionTask { private String namespaceId; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java new file mode 100644 index 000000000..db3e20470 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java @@ -0,0 +1,51 @@ +package com.aizuda.easy.retry.server.job.task.dto; + +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author xiaowoniu + * @date 2023-12-21 21:38:52 + * @since 2.6.0 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class WorkflowPartitionTaskDTO extends PartitionTask { + + /** + * 命名空间id + */ + private String namespaceId; + + /** + * 组名称 + */ + private String groupName; + + /** + * 触发类型 + */ + private Integer triggerType; + + /** + * 触发间隔 + */ + private String triggerInterval; + + /** + * 执行超时时间 + */ + private Integer executorTimeout; + + /** + * 任务执行时间 + */ + private Long nextTriggerAt; + + /** + * 流程信息 + */ + private String flowInfo; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java new file mode 100644 index 000000000..50652c974 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java @@ -0,0 +1,20 @@ +package com.aizuda.easy.retry.server.job.task.dto; + +import lombok.Data; + +/** + * @author xiaowoniu + * @date 2023-12-21 22:25:11 + * @since 2.6.0 + */ +@Data +public class WorkflowTaskPrepareDTO { + + private Long workflowId; + + /** + * 触发类似 1、auto 2、manual + */ + private Integer triggerType; + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java index f702089e9..11a9f950d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java @@ -23,6 +23,7 @@ import java.util.List; /** * @author: www.byteblogs.com * @date : 2021-11-26 15:22 + * @since : 2.5.0 */ @Mapper public interface JobTaskConverter { @@ -32,7 +33,7 @@ public interface JobTaskConverter { @Mappings( @Mapping(source = "id", target = "jobId") ) - JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTask job); + JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTaskDTO job); @Mappings( @Mapping(source = "id", target = "jobId") @@ -55,14 +56,10 @@ public interface JobTaskConverter { JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO); - JobLogDTO toJobLogDTO(JobExecutorContext context); - JobLogDTO toJobLogDTO(JobExecutorResultDTO resultDTO); JobLogDTO toJobLogDTO(BaseDTO baseDTO); - JobLogDTO toJobLogDTO(DispatchJobResultRequest request); - ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request); ClientCallbackContext toClientCallbackContext(RealJobExecutorDTO request); @@ -92,8 +89,8 @@ public interface JobTaskConverter { RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context); - List toJobPartitionTasks(List jobs); + List toJobPartitionTasks(List jobs); - List toJobTaskBatchPartitionTasks(List jobTaskBatches); + List toJobTaskBatchPartitionTasks(List jobTaskBatches); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java new file mode 100644 index 000000000..1ee0836fc --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java @@ -0,0 +1,24 @@ +package com.aizuda.easy.retry.server.job.task.support; + +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +/** + * @author xiaowoniu + * @date 2023-12-21 22:04:19 + * @since 2.6.0 + */ +@Mapper +public interface WorkflowTaskConverter { + WorkflowTaskConverter INSTANCE = Mappers.getMapper(WorkflowTaskConverter.class); + + List toWorkflowPartitionTaskList(List workflowList); + + WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO); +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index 0514830e8..dceefa181 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -16,7 +16,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; -import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; @@ -81,7 +81,7 @@ public class ScanJobTaskActor extends AbstractActor { List waitExecJobs = new ArrayList<>(); long now = DateUtils.toNowMilli(); for (PartitionTask partitionTask : partitionTasks) { - processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs, now); + processJob((JobPartitionTaskDTO) partitionTask, waitUpdateJobs, waitExecJobs, now); } // 批量更新 @@ -95,7 +95,7 @@ public class ScanJobTaskActor extends AbstractActor { } } - private void processJob(JobPartitionTask partitionTask, final List waitUpdateJobs, + private void processJob(JobPartitionTaskDTO partitionTask, final List waitUpdateJobs, final List waitExecJobs, long now) { CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId()); @@ -130,11 +130,11 @@ public class ScanJobTaskActor extends AbstractActor { /** * 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的 */ - private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) { + private static boolean needCalculateNextTriggerTime(JobPartitionTaskDTO partitionTask) { return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()); } - private Long calculateNextTriggerTime(JobPartitionTask partitionTask, long now) { + private Long calculateNextTriggerTime(JobPartitionTaskDTO partitionTask, long now) { long nextTriggerAt = partitionTask.getNextTriggerAt(); if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { @@ -151,7 +151,7 @@ public class ScanJobTaskActor extends AbstractActor { return waitStrategy.computeTriggerTime(waitStrategyContext); } - private List listAvailableJobs(Long startId, ScanTask scanTask) { + private List listAvailableJobs(Long startId, ScanTask scanTask) { if (CollectionUtils.isEmpty(scanTask.getBuckets())) { return Collections.emptyList(); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java new file mode 100644 index 000000000..9ac2c0407 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java @@ -0,0 +1,145 @@ +package com.aizuda.easy.retry.server.job.task.support.dispatch; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.WaitStrategy; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import com.aizuda.easy.retry.server.common.dto.ScanTask; +import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; +import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * @author xiaowoniu + * @date 2023-12-21 21:15:29 + * @since 2.6.0 + */ +@Component(ActorGenerator.SCAN_WORKFLOW_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +@RequiredArgsConstructor +public class ScanWorkflowTaskActor extends AbstractActor { + private final WorkflowMapper workflowMapper; + private final SystemProperties systemProperties; + + @Override + public Receive createReceive() { + return receiveBuilder().match(ScanTask.class, config -> { + + try { + doScan(config); + } catch (Exception e) { + LogUtils.error(log, "Data scanner processing exception. [{}]", config, e); + } + + }).build(); + } + + private void doScan(ScanTask scanTask) { + long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), + this::processPartitionTasks, 0); + } + + private void processPartitionTasks(List partitionTasks) { + List waitUpdateJobs = new ArrayList<>(); + List waitExecWorkflows = new ArrayList<>(); + long now = DateUtils.toNowMilli(); + for (PartitionTask partitionTask : partitionTasks) { + WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask; + processJob(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now); + } + + // 批量更新 + workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs); + + for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) { + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); + actorRef.tell(waitExecTask, actorRef); + } + } + + private void processJob(WorkflowPartitionTaskDTO partitionTask, List waitUpdateWorkflows, + List waitExecJobs, long now) { + CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId()); + + Workflow workflow = new Workflow(); + workflow.setId(partitionTask.getId()); + + // 更新下次触发时间 + Long nextTriggerAt = calculateNextTriggerTime(partitionTask, now); + workflow.setNextTriggerAt(nextTriggerAt); + waitUpdateWorkflows.add(workflow); + waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask)); + + } + + private Long calculateNextTriggerTime(WorkflowPartitionTaskDTO partitionTask, long now) { + + long nextTriggerAt = partitionTask.getNextTriggerAt(); + if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { + nextTriggerAt = now; + partitionTask.setNextTriggerAt(nextTriggerAt); + } + + // 更新下次触发时间 + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(nextTriggerAt); + + return waitStrategy.computeTriggerTime(waitStrategyContext); + } + + private List listAvailableJobs(Long startId, ScanTask scanTask) { + if (CollectionUtils.isEmpty(scanTask.getBuckets())) { + return Collections.emptyList(); + } + + List workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()), + new LambdaQueryWrapper() + .select(Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType, + Workflow::getTriggerInterval, Workflow::getExecutorTimeout, + Workflow::getId, Workflow::getNamespaceId) + .eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus()) + .eq(Workflow::getDeleted, StatusEnum.NO.getStatus()) + .in(Workflow::getBucketIndex, scanTask.getBuckets()) + .le(Workflow::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) + .ge(Workflow::getId, startId) + .orderByAsc(Workflow::getId) + ).getRecords(); + + return WorkflowTaskConverter.INSTANCE.toWorkflowPartitionTaskList(workflows); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java new file mode 100644 index 000000000..67159c8c9 --- /dev/null +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java @@ -0,0 +1,59 @@ +package com.aizuda.easy.retry.server.job.task.support.dispatch; + +import akka.actor.AbstractActor; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.List; + +/** + * @author xiaowoniu + * @date 2023-12-21 22:41:29 + * @since 2.6.0 + */ +@Component(ActorGenerator.WORKFLOW_TASK_PREPARE_ACTOR) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +@RequiredArgsConstructor +public class WorkflowTaskPrepareActor extends AbstractActor { + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; + @Override + public Receive createReceive() { + return receiveBuilder().match(WorkflowTaskPrepareDTO.class, workflowTaskPrepareDTO -> { + try { + doPrepare(workflowTaskPrepareDTO); + } catch (Exception e) { + log.error("预处理节点异常", e); + } finally { + getContext().stop(getSelf()); + } + }).build(); + } + + private void doPrepare(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) { + List workflowTaskBatches = workflowTaskBatchMapper.selectList(new LambdaQueryWrapper() + .eq(WorkflowTaskBatch::getWorkflowId, workflowTaskPrepareDTO.getWorkflowId()) + .eq(WorkflowTaskBatch::getTaskBatchStatus, 0)); + + // 则直接创建一个任务批次 + if (CollectionUtils.isEmpty(workflowTaskBatches)) { + + } else { + // 判断任务是否执行超时 + // 任务是否为发起调用 + } + } + +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java index 9cd6a56b2..585ebfe02 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java @@ -6,7 +6,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; -import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; +import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; @@ -97,7 +97,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle { * @param endTime * @return */ - private List jobTaskBatchList(Long startId, LocalDateTime endTime) { + private List jobTaskBatchList(Long startId, LocalDateTime endTime) { List jobTaskBatchList = jobTaskBatchMapper.selectPage( new Page<>(0, 1000), diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java index 967a638b3..2af47c610 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java @@ -19,6 +19,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.cache.Cache; import com.google.common.util.concurrent.RateLimiter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -40,14 +41,13 @@ import java.util.Objects; @Component(ActorGenerator.SCAN_BUCKET_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j +@RequiredArgsConstructor public class ConsumerBucketActor extends AbstractActor { - - @Autowired - protected AccessTemplate accessTemplate; - @Autowired - protected ServerNodeMapper serverNodeMapper; - @Autowired - protected SystemProperties systemProperties; + private final AccessTemplate accessTemplate; + private final ServerNodeMapper serverNodeMapper; + private final SystemProperties systemProperties; + private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY"; + private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY"; @Override public Receive createReceive() { @@ -68,11 +68,17 @@ public class ConsumerBucketActor extends AbstractActor { } if (SystemModeEnum.isJob(systemProperties.getMode())) { - // 扫描回调数据 + ScanTask scanTask = new ScanTask(); scanTask.setBuckets(consumerBucket.getBuckets()); - ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); + + // 扫描定时任务数据 + ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, TaskTypeEnum.JOB); scanJobActorRef.tell(scanTask, scanJobActorRef); + + // 扫描DAG工作流任务数据 + ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW); + scanJobActorRef.tell(scanTask, scanWorkflowActorRef); } if (SystemModeEnum.isRetry(systemProperties.getMode())) {