feat: 2.6.0
1. 完成DAG任务扫描
This commit is contained in:
parent
0134c31105
commit
bd24ffae0c
@ -453,6 +453,7 @@ CREATE TABLE `workflow`
|
|||||||
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
|
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
|
||||||
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
|
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
|
||||||
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
`flow_info` text DEFAULT NULL COMMENT '流程信息',
|
||||||
|
`bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
|
||||||
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
|
||||||
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
|
||||||
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
|
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
|
||||||
|
@ -1,8 +1,12 @@
|
|||||||
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
|
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
||||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
@ -16,4 +20,5 @@ import org.apache.ibatis.annotations.Mapper;
|
|||||||
@Mapper
|
@Mapper
|
||||||
public interface WorkflowMapper extends BaseMapper<Workflow> {
|
public interface WorkflowMapper extends BaseMapper<Workflow> {
|
||||||
|
|
||||||
|
int updateBatchNextTriggerAtById(@Param("list") List<Workflow> list);
|
||||||
}
|
}
|
||||||
|
@ -75,6 +75,11 @@ public class Workflow implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private String flowInfo;
|
private String flowInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* bucket
|
||||||
|
*/
|
||||||
|
private Integer bucketIndex;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 描述
|
* 描述
|
||||||
*/
|
*/
|
||||||
|
@ -8,12 +8,26 @@
|
|||||||
<result column="namespace_id" property="namespaceId" />
|
<result column="namespace_id" property="namespaceId" />
|
||||||
<result column="group_name" property="groupName" />
|
<result column="group_name" property="groupName" />
|
||||||
<result column="workflow_status" property="workflowStatus" />
|
<result column="workflow_status" property="workflowStatus" />
|
||||||
<result column="execution_at" property="executionAt" />
|
<result column="next_trigger_at" property="nextTriggerAt" />
|
||||||
<result column="flow_info" property="flowInfo" />
|
<result column="flow_info" property="flowInfo" />
|
||||||
<result column="create_dt" property="createDt" />
|
<result column="create_dt" property="createDt" />
|
||||||
<result column="update_dt" property="updateDt" />
|
<result column="update_dt" property="updateDt" />
|
||||||
<result column="deleted" property="deleted" />
|
<result column="deleted" property="deleted" />
|
||||||
<result column="ext_attrs" property="extAttrs" />
|
<result column="ext_attrs" property="extAttrs" />
|
||||||
</resultMap>
|
</resultMap>
|
||||||
|
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
|
||||||
|
update workflow rt,
|
||||||
|
(
|
||||||
|
<foreach collection="list" item="item" index="index" separator=" union all ">
|
||||||
|
select
|
||||||
|
#{item.nextTriggerAt} as next_trigger_at,
|
||||||
|
#{item.id} as id
|
||||||
|
</foreach>
|
||||||
|
) tt
|
||||||
|
set
|
||||||
|
rt.next_trigger_at = tt.next_trigger_at
|
||||||
|
where rt.id = tt.id
|
||||||
|
</update>
|
||||||
|
|
||||||
|
|
||||||
</mapper>
|
</mapper>
|
||||||
|
@ -40,7 +40,9 @@ public class ActorGenerator {
|
|||||||
|
|
||||||
/*----------------------------------------分布式任务调度 START----------------------------------------*/
|
/*----------------------------------------分布式任务调度 START----------------------------------------*/
|
||||||
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
|
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
|
||||||
|
public static final String SCAN_WORKFLOW_ACTOR = "ScanWorkflowTaskActor";
|
||||||
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
|
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
|
||||||
|
public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
|
||||||
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
|
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
|
||||||
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
|
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
|
||||||
public static final String JOB_LOG_ACTOR = "JobLogActor";
|
public static final String JOB_LOG_ACTOR = "JobLogActor";
|
||||||
@ -139,6 +141,17 @@ public class ActorGenerator {
|
|||||||
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 生成扫描工作流任务的actor
|
||||||
|
*
|
||||||
|
* @return actor 引用
|
||||||
|
*/
|
||||||
|
public static ActorRef scanWorkflowActor() {
|
||||||
|
return getCommonActorSystemSystem().actorOf(getSpringExtension()
|
||||||
|
.props(SCAN_WORKFLOW_ACTOR)
|
||||||
|
.withDispatcher(COMMON_SCAN_TASK_DISPATCHER));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 生成扫描重试数据的actor
|
* 生成扫描重试数据的actor
|
||||||
*
|
*
|
||||||
|
@ -18,6 +18,7 @@ public enum TaskTypeEnum {
|
|||||||
RETRY(1, ActorGenerator::scanGroupActor),
|
RETRY(1, ActorGenerator::scanGroupActor),
|
||||||
CALLBACK(2, ActorGenerator::scanCallbackGroupActor),
|
CALLBACK(2, ActorGenerator::scanCallbackGroupActor),
|
||||||
JOB(3, ActorGenerator::scanJobActor),
|
JOB(3, ActorGenerator::scanJobActor),
|
||||||
|
WORKFLOW(4, ActorGenerator::scanWorkflowActor),
|
||||||
;
|
;
|
||||||
|
|
||||||
|
|
||||||
|
@ -4,15 +4,13 @@ import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2023-10-10 17:52
|
* @date : 2023-10-10 17:52
|
||||||
*/
|
*/
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Data
|
@Data
|
||||||
public class JobPartitionTask extends PartitionTask {
|
public class JobPartitionTaskDTO extends PartitionTask {
|
||||||
|
|
||||||
private String namespaceId;
|
private String namespaceId;
|
||||||
|
|
@ -0,0 +1,51 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.dto;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaowoniu
|
||||||
|
* @date 2023-12-21 21:38:52
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@EqualsAndHashCode(callSuper = true)
|
||||||
|
@Data
|
||||||
|
public class WorkflowPartitionTaskDTO extends PartitionTask {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 命名空间id
|
||||||
|
*/
|
||||||
|
private String namespaceId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 组名称
|
||||||
|
*/
|
||||||
|
private String groupName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发类型
|
||||||
|
*/
|
||||||
|
private Integer triggerType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发间隔
|
||||||
|
*/
|
||||||
|
private String triggerInterval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行超时时间
|
||||||
|
*/
|
||||||
|
private Integer executorTimeout;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 任务执行时间
|
||||||
|
*/
|
||||||
|
private Long nextTriggerAt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流程信息
|
||||||
|
*/
|
||||||
|
private String flowInfo;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.dto;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaowoniu
|
||||||
|
* @date 2023-12-21 22:25:11
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class WorkflowTaskPrepareDTO {
|
||||||
|
|
||||||
|
private Long workflowId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发类似 1、auto 2、manual
|
||||||
|
*/
|
||||||
|
private Integer triggerType;
|
||||||
|
|
||||||
|
}
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2021-11-26 15:22
|
* @date : 2021-11-26 15:22
|
||||||
|
* @since : 2.5.0
|
||||||
*/
|
*/
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface JobTaskConverter {
|
public interface JobTaskConverter {
|
||||||
@ -32,7 +33,7 @@ public interface JobTaskConverter {
|
|||||||
@Mappings(
|
@Mappings(
|
||||||
@Mapping(source = "id", target = "jobId")
|
@Mapping(source = "id", target = "jobId")
|
||||||
)
|
)
|
||||||
JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTask job);
|
JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTaskDTO job);
|
||||||
|
|
||||||
@Mappings(
|
@Mappings(
|
||||||
@Mapping(source = "id", target = "jobId")
|
@Mapping(source = "id", target = "jobId")
|
||||||
@ -55,14 +56,10 @@ public interface JobTaskConverter {
|
|||||||
|
|
||||||
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
|
JobLogMessage toJobLogMessage(JobLogDTO jobLogDTO);
|
||||||
|
|
||||||
JobLogDTO toJobLogDTO(JobExecutorContext context);
|
|
||||||
|
|
||||||
JobLogDTO toJobLogDTO(JobExecutorResultDTO resultDTO);
|
JobLogDTO toJobLogDTO(JobExecutorResultDTO resultDTO);
|
||||||
|
|
||||||
JobLogDTO toJobLogDTO(BaseDTO baseDTO);
|
JobLogDTO toJobLogDTO(BaseDTO baseDTO);
|
||||||
|
|
||||||
JobLogDTO toJobLogDTO(DispatchJobResultRequest request);
|
|
||||||
|
|
||||||
ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request);
|
ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request);
|
||||||
|
|
||||||
ClientCallbackContext toClientCallbackContext(RealJobExecutorDTO request);
|
ClientCallbackContext toClientCallbackContext(RealJobExecutorDTO request);
|
||||||
@ -92,8 +89,8 @@ public interface JobTaskConverter {
|
|||||||
|
|
||||||
RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context);
|
RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context);
|
||||||
|
|
||||||
List<JobPartitionTask> toJobPartitionTasks(List<Job> jobs);
|
List<JobPartitionTaskDTO> toJobPartitionTasks(List<Job> jobs);
|
||||||
|
|
||||||
List<JobPartitionTask> toJobTaskBatchPartitionTasks(List<JobTaskBatch> jobTaskBatches);
|
List<JobPartitionTaskDTO> toJobTaskBatchPartitionTasks(List<JobTaskBatch> jobTaskBatches);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,24 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
||||||
|
import org.mapstruct.Mapper;
|
||||||
|
import org.mapstruct.factory.Mappers;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaowoniu
|
||||||
|
* @date 2023-12-21 22:04:19
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@Mapper
|
||||||
|
public interface WorkflowTaskConverter {
|
||||||
|
WorkflowTaskConverter INSTANCE = Mappers.getMapper(WorkflowTaskConverter.class);
|
||||||
|
|
||||||
|
List<WorkflowPartitionTaskDTO> toWorkflowPartitionTaskList(List<Workflow> workflowList);
|
||||||
|
|
||||||
|
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
|
||||||
|
}
|
@ -16,7 +16,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
|
|||||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||||
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
||||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
|
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
||||||
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
|
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||||
@ -81,7 +81,7 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
List<JobTaskPrepareDTO> waitExecJobs = new ArrayList<>();
|
List<JobTaskPrepareDTO> waitExecJobs = new ArrayList<>();
|
||||||
long now = DateUtils.toNowMilli();
|
long now = DateUtils.toNowMilli();
|
||||||
for (PartitionTask partitionTask : partitionTasks) {
|
for (PartitionTask partitionTask : partitionTasks) {
|
||||||
processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs, now);
|
processJob((JobPartitionTaskDTO) partitionTask, waitUpdateJobs, waitExecJobs, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 批量更新
|
// 批量更新
|
||||||
@ -95,7 +95,7 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processJob(JobPartitionTask partitionTask, final List<Job> waitUpdateJobs,
|
private void processJob(JobPartitionTaskDTO partitionTask, final List<Job> waitUpdateJobs,
|
||||||
final List<JobTaskPrepareDTO> waitExecJobs, long now) {
|
final List<JobTaskPrepareDTO> waitExecJobs, long now) {
|
||||||
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
|
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
|
||||||
|
|
||||||
@ -130,11 +130,11 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
/**
|
/**
|
||||||
* 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的
|
* 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的
|
||||||
*/
|
*/
|
||||||
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) {
|
private static boolean needCalculateNextTriggerTime(JobPartitionTaskDTO partitionTask) {
|
||||||
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
|
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Long calculateNextTriggerTime(JobPartitionTask partitionTask, long now) {
|
private Long calculateNextTriggerTime(JobPartitionTaskDTO partitionTask, long now) {
|
||||||
|
|
||||||
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
||||||
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||||
@ -151,7 +151,7 @@ public class ScanJobTaskActor extends AbstractActor {
|
|||||||
return waitStrategy.computeTriggerTime(waitStrategyContext);
|
return waitStrategy.computeTriggerTime(waitStrategyContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<JobPartitionTask> listAvailableJobs(Long startId, ScanTask scanTask) {
|
private List<JobPartitionTaskDTO> listAvailableJobs(Long startId, ScanTask scanTask) {
|
||||||
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
|
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,145 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.dispatch;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
|
||||||
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaowoniu
|
||||||
|
* @date 2023-12-21 21:15:29
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@Component(ActorGenerator.SCAN_WORKFLOW_ACTOR)
|
||||||
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ScanWorkflowTaskActor extends AbstractActor {
|
||||||
|
private final WorkflowMapper workflowMapper;
|
||||||
|
private final SystemProperties systemProperties;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder().match(ScanTask.class, config -> {
|
||||||
|
|
||||||
|
try {
|
||||||
|
doScan(config);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doScan(ScanTask scanTask) {
|
||||||
|
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
|
||||||
|
this::processPartitionTasks, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processPartitionTasks(List<? extends PartitionTask> partitionTasks) {
|
||||||
|
List<Workflow> waitUpdateJobs = new ArrayList<>();
|
||||||
|
List<WorkflowTaskPrepareDTO> waitExecWorkflows = new ArrayList<>();
|
||||||
|
long now = DateUtils.toNowMilli();
|
||||||
|
for (PartitionTask partitionTask : partitionTasks) {
|
||||||
|
WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask;
|
||||||
|
processJob(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量更新
|
||||||
|
workflowMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
|
||||||
|
|
||||||
|
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
|
||||||
|
// 执行预处理阶段
|
||||||
|
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
||||||
|
waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
|
||||||
|
actorRef.tell(waitExecTask, actorRef);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processJob(WorkflowPartitionTaskDTO partitionTask, List<Workflow> waitUpdateWorkflows,
|
||||||
|
List<WorkflowTaskPrepareDTO> waitExecJobs, long now) {
|
||||||
|
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
|
||||||
|
|
||||||
|
Workflow workflow = new Workflow();
|
||||||
|
workflow.setId(partitionTask.getId());
|
||||||
|
|
||||||
|
// 更新下次触发时间
|
||||||
|
Long nextTriggerAt = calculateNextTriggerTime(partitionTask, now);
|
||||||
|
workflow.setNextTriggerAt(nextTriggerAt);
|
||||||
|
waitUpdateWorkflows.add(workflow);
|
||||||
|
waitExecJobs.add(WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(partitionTask));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long calculateNextTriggerTime(WorkflowPartitionTaskDTO partitionTask, long now) {
|
||||||
|
|
||||||
|
long nextTriggerAt = partitionTask.getNextTriggerAt();
|
||||||
|
if ((nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||||
|
nextTriggerAt = now;
|
||||||
|
partitionTask.setNextTriggerAt(nextTriggerAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新下次触发时间
|
||||||
|
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask.getTriggerType());
|
||||||
|
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
|
||||||
|
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
|
||||||
|
waitStrategyContext.setNextTriggerAt(nextTriggerAt);
|
||||||
|
|
||||||
|
return waitStrategy.computeTriggerTime(waitStrategyContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<WorkflowPartitionTaskDTO> listAvailableJobs(Long startId, ScanTask scanTask) {
|
||||||
|
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Workflow> workflows = workflowMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
|
||||||
|
new LambdaQueryWrapper<Workflow>()
|
||||||
|
.select(Workflow::getGroupName, Workflow::getNextTriggerAt, Workflow::getTriggerType,
|
||||||
|
Workflow::getTriggerInterval, Workflow::getExecutorTimeout,
|
||||||
|
Workflow::getId, Workflow::getNamespaceId)
|
||||||
|
.eq(Workflow::getWorkflowStatus, StatusEnum.YES.getStatus())
|
||||||
|
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
|
||||||
|
.in(Workflow::getBucketIndex, scanTask.getBuckets())
|
||||||
|
.le(Workflow::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
|
||||||
|
.ge(Workflow::getId, startId)
|
||||||
|
.orderByAsc(Workflow::getId)
|
||||||
|
).getRecords();
|
||||||
|
|
||||||
|
return WorkflowTaskConverter.INSTANCE.toWorkflowPartitionTaskList(workflows);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,59 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.support.dispatch;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xiaowoniu
|
||||||
|
* @date 2023-12-21 22:41:29
|
||||||
|
* @since 2.6.0
|
||||||
|
*/
|
||||||
|
@Component(ActorGenerator.WORKFLOW_TASK_PREPARE_ACTOR)
|
||||||
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class WorkflowTaskPrepareActor extends AbstractActor {
|
||||||
|
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder().match(WorkflowTaskPrepareDTO.class, workflowTaskPrepareDTO -> {
|
||||||
|
try {
|
||||||
|
doPrepare(workflowTaskPrepareDTO);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("预处理节点异常", e);
|
||||||
|
} finally {
|
||||||
|
getContext().stop(getSelf());
|
||||||
|
}
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doPrepare(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
|
||||||
|
List<WorkflowTaskBatch> workflowTaskBatches = workflowTaskBatchMapper.selectList(new LambdaQueryWrapper<WorkflowTaskBatch>()
|
||||||
|
.eq(WorkflowTaskBatch::getWorkflowId, workflowTaskPrepareDTO.getWorkflowId())
|
||||||
|
.eq(WorkflowTaskBatch::getTaskBatchStatus, 0));
|
||||||
|
|
||||||
|
// 则直接创建一个任务批次
|
||||||
|
if (CollectionUtils.isEmpty(workflowTaskBatches)) {
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// 判断任务是否执行超时
|
||||||
|
// 任务是否为发起调用
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -6,7 +6,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
|||||||
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
||||||
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
|
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
|
||||||
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
||||||
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
|
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
|
||||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||||
@ -97,7 +97,7 @@ public class JobClearLogSchedule extends AbstractSchedule implements Lifecycle {
|
|||||||
* @param endTime
|
* @param endTime
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private List<JobPartitionTask> jobTaskBatchList(Long startId, LocalDateTime endTime) {
|
private List<JobPartitionTaskDTO> jobTaskBatchList(Long startId, LocalDateTime endTime) {
|
||||||
|
|
||||||
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectPage(
|
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMapper.selectPage(
|
||||||
new Page<>(0, 1000),
|
new Page<>(0, 1000),
|
||||||
|
@ -19,6 +19,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.google.common.cache.Cache;
|
import com.google.common.cache.Cache;
|
||||||
import com.google.common.util.concurrent.RateLimiter;
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
@ -40,14 +41,13 @@ import java.util.Objects;
|
|||||||
@Component(ActorGenerator.SCAN_BUCKET_ACTOR)
|
@Component(ActorGenerator.SCAN_BUCKET_ACTOR)
|
||||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
public class ConsumerBucketActor extends AbstractActor {
|
public class ConsumerBucketActor extends AbstractActor {
|
||||||
|
private final AccessTemplate accessTemplate;
|
||||||
@Autowired
|
private final ServerNodeMapper serverNodeMapper;
|
||||||
protected AccessTemplate accessTemplate;
|
private final SystemProperties systemProperties;
|
||||||
@Autowired
|
private static final String DEFAULT_JOB_KEY = "DEFAULT_JOB_KEY";
|
||||||
protected ServerNodeMapper serverNodeMapper;
|
private static final String DEFAULT_WORKFLOW_KEY = "DEFAULT_JOB_KEY";
|
||||||
@Autowired
|
|
||||||
protected SystemProperties systemProperties;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
@ -68,11 +68,17 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (SystemModeEnum.isJob(systemProperties.getMode())) {
|
if (SystemModeEnum.isJob(systemProperties.getMode())) {
|
||||||
// 扫描回调数据
|
|
||||||
ScanTask scanTask = new ScanTask();
|
ScanTask scanTask = new ScanTask();
|
||||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||||
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
|
|
||||||
|
// 扫描定时任务数据
|
||||||
|
ActorRef scanJobActorRef = cacheActorRef(DEFAULT_JOB_KEY, TaskTypeEnum.JOB);
|
||||||
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
||||||
|
|
||||||
|
// 扫描DAG工作流任务数据
|
||||||
|
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW);
|
||||||
|
scanJobActorRef.tell(scanTask, scanWorkflowActorRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
if (SystemModeEnum.isRetry(systemProperties.getMode())) {
|
||||||
|
Loading…
Reference in New Issue
Block a user