From bd24ffae0c167201db0d82993c6b6a5b7b8ba3d4 Mon Sep 17 00:00:00 2001
From: byteblogs168 <598092184@qq.com>
Date: Thu, 21 Dec 2023 22:58:17 +0800
Subject: [PATCH] =?UTF-8?q?feat:=202.6.0=201.=20=E5=AE=8C=E6=88=90DAG?=
=?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=89=AB=E6=8F=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
doc/sql/easy_retry_mysql.sql | 1 +
.../persistence/mapper/WorkflowMapper.java | 5 +
.../datasource/persistence/po/Workflow.java | 5 +
.../resources/mysql/mapper/WorkflowMapper.xml | 16 +-
.../server/common/akka/ActorGenerator.java | 13 ++
.../server/common/enums/TaskTypeEnum.java | 1 +
...tionTask.java => JobPartitionTaskDTO.java} | 4 +-
.../task/dto/WorkflowPartitionTaskDTO.java | 51 ++++++
.../job/task/dto/WorkflowTaskPrepareDTO.java | 20 +++
.../job/task/support/JobTaskConverter.java | 11 +-
.../task/support/WorkflowTaskConverter.java | 24 +++
.../support/dispatch/ScanJobTaskActor.java | 12 +-
.../dispatch/ScanWorkflowTaskActor.java | 145 ++++++++++++++++++
.../dispatch/WorkflowTaskPrepareActor.java | 59 +++++++
.../support/schedule/JobClearLogSchedule.java | 4 +-
.../starter/dispatch/ConsumerBucketActor.java | 24 +--
16 files changed, 367 insertions(+), 28 deletions(-)
rename easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/{JobPartitionTask.java => JobPartitionTaskDTO.java} (91%)
create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java
create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java
create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java
create mode 100644 easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java
diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql
index 0459ae4e7..940ef7d85 100644
--- a/doc/sql/easy_retry_mysql.sql
+++ b/doc/sql/easy_retry_mysql.sql
@@ -453,6 +453,7 @@ CREATE TABLE `workflow`
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`flow_info` text DEFAULT NULL COMMENT '流程信息',
+ `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java
index 4d727ad0c..2ad1b37f1 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/WorkflowMapper.java
@@ -1,8 +1,12 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
/**
*
@@ -16,4 +20,5 @@ import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface WorkflowMapper extends BaseMapper {
+ int updateBatchNextTriggerAtById(@Param("list") List list);
}
diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java
index c33c7a9a6..7871f1538 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Workflow.java
@@ -75,6 +75,11 @@ public class Workflow implements Serializable {
*/
private String flowInfo;
+ /**
+ * bucket
+ */
+ private Integer bucketIndex;
+
/**
* 描述
*/
diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml
index 54ca92249..d957fb80a 100644
--- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml
+++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/WorkflowMapper.xml
@@ -8,12 +8,26 @@
-
+
+
+ update workflow rt,
+ (
+
+ select
+ #{item.nextTriggerAt} as next_trigger_at,
+ #{item.id} as id
+
+ ) tt
+ set
+ rt.next_trigger_at = tt.next_trigger_at
+ where rt.id = tt.id
+
+
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java
index 92b854649..a8269e320 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/akka/ActorGenerator.java
@@ -40,7 +40,9 @@ public class ActorGenerator {
/*----------------------------------------分布式任务调度 START----------------------------------------*/
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
+ public static final String SCAN_WORKFLOW_ACTOR = "ScanWorkflowTaskActor";
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
+ public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
public static final String JOB_LOG_ACTOR = "JobLogActor";
@@ -139,6 +141,17 @@ public class ActorGenerator {
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
}
+ /**
+ * 生成扫描工作流任务的actor
+ *
+ * @return actor 引用
+ */
+ public static ActorRef scanWorkflowActor() {
+ return getCommonActorSystemSystem().actorOf(getSpringExtension()
+ .props(SCAN_WORKFLOW_ACTOR)
+ .withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
+ }
+
/**
* 生成扫描重试数据的actor
*
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java
index bf4ed483d..4fe8d2fbe 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/TaskTypeEnum.java
@@ -18,6 +18,7 @@ public enum TaskTypeEnum {
RETRY(1, ActorGenerator::scanGroupActor),
CALLBACK(2, ActorGenerator::scanCallbackGroupActor),
JOB(3, ActorGenerator::scanJobActor),
+ WORKFLOW(4, ActorGenerator::scanWorkflowActor),
;
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTaskDTO.java
similarity index 91%
rename from easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java
rename to easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTaskDTO.java
index 9e4150bd5..d0d05dc14 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTask.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/JobPartitionTaskDTO.java
@@ -4,15 +4,13 @@ import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import java.time.LocalDateTime;
-
/**
* @author: www.byteblogs.com
* @date : 2023-10-10 17:52
*/
@EqualsAndHashCode(callSuper = true)
@Data
-public class JobPartitionTask extends PartitionTask {
+public class JobPartitionTaskDTO extends PartitionTask {
private String namespaceId;
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java
new file mode 100644
index 000000000..db3e20470
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowPartitionTaskDTO.java
@@ -0,0 +1,51 @@
+package com.aizuda.easy.retry.server.job.task.dto;
+
+import com.aizuda.easy.retry.server.common.dto.PartitionTask;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-21 21:38:52
+ * @since 2.6.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class WorkflowPartitionTaskDTO extends PartitionTask {
+
+ /**
+ * 命名空间id
+ */
+ private String namespaceId;
+
+ /**
+ * 组名称
+ */
+ private String groupName;
+
+ /**
+ * 触发类型
+ */
+ private Integer triggerType;
+
+ /**
+ * 触发间隔
+ */
+ private String triggerInterval;
+
+ /**
+ * 执行超时时间
+ */
+ private Integer executorTimeout;
+
+ /**
+ * 任务执行时间
+ */
+ private Long nextTriggerAt;
+
+ /**
+ * 流程信息
+ */
+ private String flowInfo;
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
new file mode 100644
index 000000000..50652c974
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/dto/WorkflowTaskPrepareDTO.java
@@ -0,0 +1,20 @@
+package com.aizuda.easy.retry.server.job.task.dto;
+
+import lombok.Data;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-21 22:25:11
+ * @since 2.6.0
+ */
+@Data
+public class WorkflowTaskPrepareDTO {
+
+ private Long workflowId;
+
+ /**
+ * 触发类似 1、auto 2、manual
+ */
+ private Integer triggerType;
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java
index f702089e9..11a9f950d 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java
@@ -23,6 +23,7 @@ import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2021-11-26 15:22
+ * @since : 2.5.0
*/
@Mapper
public interface JobTaskConverter {
@@ -32,7 +33,7 @@ public interface JobTaskConverter {
@Mappings(
@Mapping(source = "id", target = "jobId")
)
- JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTask job);
+ JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTaskDTO job);
@Mappings(
@Mapping(source = "id", target = "jobId")
@@ -55,14 +56,10 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
- JobLogDTO toJobLogDTO(JobExecutorContext context);
-
JobLogDTO toJobLogDTO(JobExecutorResultDTO resultDTO);
JobLogDTO toJobLogDTO(BaseDTO baseDTO);
- JobLogDTO toJobLogDTO(DispatchJobResultRequest request);
-
ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request);
ClientCallbackContext toClientCallbackContext(RealJobExecutorDTO request);
@@ -92,8 +89,8 @@ public interface JobTaskConverter {
RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context);
- List toJobPartitionTasks(List jobs);
+ List toJobPartitionTasks(List jobs);
- List toJobTaskBatchPartitionTasks(List jobTaskBatches);
+ List toJobTaskBatchPartitionTasks(List jobTaskBatches);
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java
new file mode 100644
index 000000000..1ee0836fc
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/WorkflowTaskConverter.java
@@ -0,0 +1,24 @@
+package com.aizuda.easy.retry.server.job.task.support;
+
+import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+
+import java.util.List;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-21 22:04:19
+ * @since 2.6.0
+ */
+@Mapper
+public interface WorkflowTaskConverter {
+ WorkflowTaskConverter INSTANCE = Mappers.getMapper(WorkflowTaskConverter.class);
+
+ List toWorkflowPartitionTaskList(List workflowList);
+
+ WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
index 0514830e8..dceefa181 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
@@ -16,7 +16,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
-import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
+import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
@@ -81,7 +81,7 @@ public class ScanJobTaskActor extends AbstractActor {
List waitExecJobs = new ArrayList<>();
long now = DateUtils.toNowMilli();
for (PartitionTask partitionTask : partitionTasks) {
- processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs, now);
+ processJob((JobPartitionTaskDTO) partitionTask, waitUpdateJobs, waitExecJobs, now);
}
// 批量更新
@@ -95,7 +95,7 @@ public class ScanJobTaskActor extends AbstractActor {
}
}
- private void processJob(JobPartitionTask partitionTask, final List waitUpdateJobs,
+ private void processJob(JobPartitionTaskDTO partitionTask, final List waitUpdateJobs,
final List waitExecJobs, long now) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
@@ -130,11 +130,11 @@ public class ScanJobTaskActor extends AbstractActor {
/**
* 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的
*/
- private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) {
+ private static boolean needCalculateNextTriggerTime(JobPartitionTaskDTO partitionTask) {
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
}
- private Long calculateNextTriggerTime(JobPartitionTask partitionTask, long now) {
+ private Long calculateNextTriggerTime(JobPartitionTaskDTO partitionTask, long now) {
long nextTriggerAt = partitionTask.getNextTriggerAt();
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
@@ -151,7 +151,7 @@ public class ScanJobTaskActor extends AbstractActor {
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
- private List listAvailableJobs(Long startId, ScanTask scanTask) {
+ private List listAvailableJobs(Long startId, ScanTask scanTask) {
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
return Collections.emptyList();
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java
new file mode 100644
index 000000000..9ac2c0407
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java
@@ -0,0 +1,145 @@
+package com.aizuda.easy.retry.server.job.task.support.dispatch;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
+import com.aizuda.easy.retry.common.core.constant.SystemConstants;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.common.core.log.LogUtils;
+import com.aizuda.easy.retry.server.common.WaitStrategy;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
+import com.aizuda.easy.retry.server.common.config.SystemProperties;
+import com.aizuda.easy.retry.server.common.dto.PartitionTask;
+import com.aizuda.easy.retry.server.common.dto.ScanTask;
+import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
+import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
+import com.aizuda.easy.retry.server.common.util.DateUtils;
+import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
+import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
+import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-21 21:15:29
+ * @since 2.6.0
+ */
+@Component(ActorGenerator.SCAN_WORKFLOW_ACTOR)
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Slf4j
+@RequiredArgsConstructor
+public class ScanWorkflowTaskActor extends AbstractActor {
+ private final WorkflowMapper workflowMapper;
+ private final SystemProperties systemProperties;
+
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder().match(ScanTask.class, config -> {
+
+ try {
+ doScan(config);
+ } catch (Exception e) {
+ LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
+ }
+
+ }).build();
+ }
+
+ private void doScan(ScanTask scanTask) {
+ long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
+ this::processPartitionTasks, 0);
+ }
+
+ private void processPartitionTasks(List extends PartitionTask> partitionTasks) {
+ List waitUpdateJobs = new ArrayList<>();
+ List waitExecWorkflows = new ArrayList<>();
+ long now = DateUtils.toNowMilli();
+ for (PartitionTask partitionTask : partitionTasks) {
+ WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask;
+ processJob(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
+ }
+
+ // 批量更新
+ workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
+
+ for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
+ // 执行预处理阶段
+ ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+ waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
+ actorRef.tell(waitExecTask, actorRef);
+ }
+ }
+
+ private void processJob(WorkflowPartitionTaskDTO partitionTask, List waitUpdateWorkflows,
+ List waitExecJobs, long now) {
+ CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
+
+ Workflow workflow = new Workflow();
+ workflow.setId(partitionTask.getId());
+
+ // 更新下次触发时间
+ Long nextTriggerAt = calculateNextTriggerTime(partitionTask, now);
+ workflow.setNextTriggerAt(nextTriggerAt);
+ waitUpdateWorkflows.add(workflow);
+ waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask));
+
+ }
+
+ private Long calculateNextTriggerTime(WorkflowPartitionTaskDTO partitionTask, long now) {
+
+ long nextTriggerAt = partitionTask.getNextTriggerAt();
+ if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
+ nextTriggerAt = now;
+ partitionTask.setNextTriggerAt(nextTriggerAt);
+ }
+
+ // 更新下次触发时间
+ WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
+ WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
+ waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
+ waitStrategyContext.setNextTriggerAt(nextTriggerAt);
+
+ return waitStrategy.computeTriggerTime(waitStrategyContext);
+ }
+
+ private List listAvailableJobs(Long startId, ScanTask scanTask) {
+ if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
+ return Collections.emptyList();
+ }
+
+ List workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
+ new LambdaQueryWrapper()
+ .select(Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType,
+ Workflow::getTriggerInterval, Workflow::getExecutorTimeout,
+ Workflow::getId, Workflow::getNamespaceId)
+ .eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus())
+ .eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
+ .in(Workflow::getBucketIndex, scanTask.getBuckets())
+ .le(Workflow::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
+ .ge(Workflow::getId, startId)
+ .orderByAsc(Workflow::getId)
+ ).getRecords();
+
+ return WorkflowTaskConverter.INSTANCE.toWorkflowPartitionTaskList(workflows);
+ }
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java
new file mode 100644
index 000000000..67159c8c9
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/WorkflowTaskPrepareActor.java
@@ -0,0 +1,59 @@
+package com.aizuda.easy.retry.server.job.task.support.dispatch;
+
+import akka.actor.AbstractActor;
+import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
+import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
+import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-21 22:41:29
+ * @since 2.6.0
+ */
+@Component(ActorGenerator.WORKFLOW_TASK_PREPARE_ACTOR)
+@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Slf4j
+@RequiredArgsConstructor
+public class WorkflowTaskPrepareActor extends AbstractActor {
+ private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder().match(WorkflowTaskPrepareDTO.class, workflowTaskPrepareDTO -> {
+ try {
+ doPrepare(workflowTaskPrepareDTO);
+ } catch (Exception e) {
+ log.error("预处理节点异常", e);
+ } finally {
+ getContext().stop(getSelf());
+ }
+ }).build();
+ }
+
+ private void doPrepare(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
+ List workflowTaskBatches = workflowTaskBatchMapper.selectList(new LambdaQueryWrapper()
+ .eq(WorkflowTaskBatch::getWorkflowId, workflowTaskPrepareDTO.getWorkflowId())
+ .eq(WorkflowTaskBatch::getTaskBatchStatus, 0));
+
+ // 则直接创建一个任务批次
+ if (CollectionUtils.isEmpty(workflowTaskBatches)) {
+
+ } else {
+ // 判断任务是否执行超时
+ // 任务是否为发起调用
+ }
+ }
+
+}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java
index 9cd6a56b2..585ebfe02 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobClearLogSchedule.java
@@ -6,7 +6,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
-import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
+import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
@@ -97,7 +97,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
* @param endTime
* @return
*/
- private List jobTaskBatchList(Long startId, LocalDateTime endTime) {
+ private List jobTaskBatchList(Long startId, LocalDateTime endTime) {
List jobTaskBatchList = jobTaskBatchMapper.selectPage(
new Page<>(0, 1000),
diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
index 967a638b3..2af47c610 100644
--- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
+++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
@@ -19,6 +19,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -40,14 +41,13 @@ import java.util.Objects;
@Component(ActorGenerator.SCAN_BUCKET_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
+@RequiredArgsConstructor
public class ConsumerBucketActor extends AbstractActor {
-
- @Autowired
- protected AccessTemplate accessTemplate;
- @Autowired
- protected ServerNodeMapper serverNodeMapper;
- @Autowired
- protected SystemProperties systemProperties;
+ private final AccessTemplate accessTemplate;
+ private final ServerNodeMapper serverNodeMapper;
+ private final SystemProperties systemProperties;
+ private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
+ private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
@Override
public Receive createReceive() {
@@ -68,11 +68,17 @@ public class ConsumerBucketActor extends AbstractActor {
}
if (SystemModeEnum.isJob(systemProperties.getMode())) {
- // 扫描回调数据
+
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
- ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
+
+ // 扫描定时任务数据
+ ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
+
+ // 扫描DAG工作流任务数据
+ ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW);
+ scanJobActorRef.tell(scanTask, scanWorkflowActorRef);
}
if (SystemModeEnum.isRetry(systemProperties.getMode())) {