feat: 2.6.0

1. 后端流程
This commit is contained in:
byteblogs168 2023-12-23 11:17:35 +08:00
parent 309d13e2cf
commit 012bfd0634
36 changed files with 745 additions and 119 deletions

View File

@ -360,6 +360,8 @@ CREATE TABLE `job_task_batch`
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`workflow_node_id` bigint(20) NOT NULL COMMENT '工作流节点id',
`workflow_task_batch_id` bigint(20) NOT NULL COMMENT '工作流任务批次id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
@ -497,6 +499,7 @@ CREATE TABLE `workflow_task_batch`
`workflow_id` bigint(20) NOT NULL COMMENT '工作流任务id',
`task_batch_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '任务批次状态 0、失败 1、成功',
`operation_reason` tinyint(4) NOT NULL DEFAULT '0' COMMENT '操作原因',
`flow_info` text DEFAULT NULL COMMENT '流程信息',
`execution_at` bigint(13) NOT NULL DEFAULT '0' COMMENT '任务执行时间',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',

View File

@ -13,6 +13,10 @@ public class JobContext {
private Long taskBatchId;
private Long workflowBatchId;
private Long workflowNodeId;
private Long taskId;
private String groupName;

View File

@ -30,7 +30,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
.client(JobNettyClient.class)
.callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build();
private JobContext jobContext;
private final JobContext jobContext;
public JobExecutorFutureCallback(final JobContext jobContext) {
this.jobContext = jobContext;
@ -97,6 +97,9 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
dispatchJobRequest.setGroupName(jobContext.getGroupName());
dispatchJobRequest.setJobId(jobContext.getJobId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setWorkflowBatchId(jobContext.getWorkflowBatchId());
dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(executeResult);
dispatchJobRequest.setTaskStatus(status);

View File

@ -36,13 +36,18 @@ public class DispatchJobRequest {
@NotBlank(message = "executorInfo 不能为空")
private String executorInfo;
@NotBlank(message = "executorTimeout 不能为空")
private Integer executorTimeout;
private String argsStr;
private Integer shardingTotal;
private Integer shardingIndex;
@NotBlank(message = "executorTimeout 不能为空")
private Integer executorTimeout;
private Long workflowBatchId;
private Long workflowNodeId;
}

View File

@ -14,6 +14,10 @@ public class DispatchJobResultRequest {
private Long taskBatchId;
private Long workflowBatchId;
private Long workflowNodeId;
private Long taskId;
/**

View File

@ -96,4 +96,9 @@ public interface SystemConstants {
* AT 所有人
*/
String AT_ALL = "all";
/**
* 根节点
*/
Long ROOT = -1L;
}

View File

@ -48,6 +48,16 @@ public class JobTaskBatch implements Serializable {
*/
private Long jobId;
/**
* 工作流批次id
*/
private Long workflowTaskBatchId;
/**
* 工作流节点id
*/
private Long workflowNodeId;
/**
* 任务批次状态
*/

View File

@ -47,18 +47,23 @@ public class WorkflowTaskBatch implements Serializable {
/**
* 任务批次状态 0失败 1成功
*/
private Byte taskBatchStatus;
private Integer taskBatchStatus;
/**
* 操作原因
*/
private Byte operationReason;
private Integer operationReason;
/**
* 任务执行时间
*/
private Long executionAt;
/**
* 流程信息
*/
private String flowInfo;
/**
* 创建时间
*/
@ -72,7 +77,7 @@ public class WorkflowTaskBatch implements Serializable {
/**
* 逻辑删除 1删除
*/
private Byte deleted;
private Integer deleted;
/**
* 扩展字段

View File

@ -44,6 +44,7 @@ public class ActorGenerator {
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
public static final String WORKFLOW_EXECUTOR_ACTOR = "WorkflowExecutorActor";
public static final String JOB_EXECUTOR_RESULT_ACTOR = "JobExecutorResultActor";
public static final String JOB_LOG_ACTOR = "JobLogActor";
public static final String REAL_JOB_EXECUTOR_ACTOR = "RealJobExecutorActor";
@ -195,6 +196,16 @@ public class ActorGenerator {
.withDispatcher(JOB_TASK_DISPATCHER));
}
/**
* Job调度准备阶段actor
*
* @return actor 引用
*/
public static ActorRef workflowTaskPrepareActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(WORKFLOW_TASK_PREPARE_ACTOR)
.withDispatcher(JOB_TASK_DISPATCHER));
}
/**
* Job任务执行阶段actor
*
@ -208,6 +219,19 @@ public class ActorGenerator {
);
}
/**
* Job任务执行阶段actor
*
* @return actor 引用
*/
public static ActorRef workflowTaskExecutorActor() {
return getJobActorSystem()
.actorOf(getSpringExtension()
.props(WORKFLOW_EXECUTOR_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER)
);
}
/**
* Job任务执行结果actor
*

View File

@ -15,7 +15,9 @@ import lombok.Getter;
@AllArgsConstructor
public enum JobTriggerTypeEnum {
AUTO(1, "自动触发"),
MANUAL(2, "手动触发");
MANUAL(2, "手动触发"),
WORKFLOW(2, "DAG触发"),
;
private final Integer type;
private final String desc;

View File

@ -0,0 +1,41 @@
package com.aizuda.easy.retry.server.common.util;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import java.io.IOException;
import java.util.Map;
/**
* @author: xiaowoniu
* @date : 2023-12-22
* @since : 2.6.0
*/
/**
 * Helpers for (de)serializing Guava graphs used to store workflow DAGs.
 */
public class GraphUtils {

    // Jackson mappers are thread-safe and expensive to build; share one instance
    // instead of constructing a new ObjectMapper on every call.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private GraphUtils() {
        // static utility class — no instances
    }

    /**
     * Rebuilds a directed Guava graph from its JSON adjacency-list form
     * (a map from node to its successor nodes).
     *
     * <p>NOTE(review): the {@code TypeReference} is erased at runtime, so JSON
     * object keys deserialize as String/Integer rather than {@code T}; callers
     * expecting {@code MutableGraph<Long>} should confirm key conversion.
     *
     * @param jsonGraph JSON adjacency list, e.g. {@code {"-1":[1,2],"1":[3]}}
     * @param <T>       node type
     * @return directed mutable graph containing every node and edge of the list
     * @throws IOException if the JSON cannot be parsed
     */
    public static <T> MutableGraph<T> deserializeJsonToGraph(String jsonGraph) throws IOException {
        Map<T, Iterable<T>> adjacencyList = OBJECT_MAPPER.readValue(
            jsonGraph, new TypeReference<Map<T, Iterable<T>>>() {});

        MutableGraph<T> graph = GraphBuilder.directed().build();
        for (Map.Entry<T, Iterable<T>> entry : adjacencyList.entrySet()) {
            T node = entry.getKey();
            // addNode first so isolated nodes (no successors) are preserved
            graph.addNode(node);
            for (T successor : entry.getValue()) {
                graph.putEdge(node, successor);
            }
        }
        return graph;
    }
}

View File

@ -1,6 +1,5 @@
package com.aizuda.easy.retry.server.job.task.dto;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import lombok.Data;
/**
@ -15,6 +14,13 @@ public class JobExecutorResultDTO {
private Long taskBatchId;
/**
* 工作流任务批次id
*/
private Long workflowTaskBatchId;
private Long workflowNodeId;
private Long taskId;
/**

View File

@ -57,4 +57,15 @@ public class JobTaskPrepareDTO {
*/
private Integer triggerType;
/**
* 工作流任务批次id
*/
private Long workflowTaskBatchId;
/**
* 工作流节点id
*/
private Long workflowNodeId;
}

View File

@ -0,0 +1,29 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author: xiaowoniu
* @date : 2023-12-22
* @since : 2.6.0
*/
@Data
public class WorkflowNodeTaskExecuteDTO {

    /**
     * Workflow id.
     */
    private Long workflowId;

    /**
     * Workflow task batch id.
     */
    private Long workflowTaskBatchId;

    /**
     * Trigger type: 1 = auto, 2 = manual.
     */
    private Integer triggerType;

    // Id of the workflow node whose completion triggered this message;
    // SystemConstants.ROOT (-1) for the virtual start node.
    private Long parentId;
}

View File

@ -17,4 +17,39 @@ public class WorkflowTaskPrepareDTO {
*/
private Integer triggerType;
/**
* 工作流名称
*/
private String workflowName;
/**
* 命名空间id
*/
private String namespaceId;
/**
* 组名称
*/
private String groupName;
/**
* 触发间隔
*/
private String triggerInterval;
/**
* 执行超时时间
*/
private Integer executorTimeout;
/**
* 工作流状态 0关闭1开启
*/
private Integer workflowStatus;
/**
* 流程信息
*/
private String flowInfo;
}

View File

@ -0,0 +1,21 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-12-22
* @since 2.6.0
*/
@Data
public class WorkflowTimerTaskDTO {

    // Workflow task batch scheduled on the timer wheel.
    private Long workflowTaskBatchId;

    // Owning workflow id.
    private Long workflowId;

    /**
     * Trigger type: 1 = auto, 2 = manual.
     */
    private Integer triggerType;
}

View File

@ -0,0 +1,15 @@
package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
/**
* @author www.byteblogs.com
* @date 2023-10-22 09:34:00
* @since 2.6.0
*/
public interface WorkflowPrePareHandler {

    /**
     * Whether this handler accepts a workflow task batch in the given status.
     *
     * @param status current workflow task batch status (null when no batch exists yet)
     * @return true if this handler should process the workflow
     */
    boolean matches(Integer status);

    /**
     * Runs the prepare phase for the given workflow task.
     *
     * @param workflowTaskPrepareDTO prepare-phase parameters of the workflow
     */
    void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
}

View File

@ -1,10 +1,13 @@
package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Mappings;
import org.mapstruct.factory.Mappers;
import java.util.List;
@ -20,5 +23,12 @@ public interface WorkflowTaskConverter {
List<WorkflowPartitionTaskDTO> toWorkflowPartitionTaskList(List<Workflow> workflowList);
@Mappings(
@Mapping(source = "id", target = "workflowId")
)
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context);
}

View File

@ -20,6 +20,8 @@ public class ClientCallbackContext {
private Long taskBatchId;
private Long workflowBatchId;
private Long taskId;
private String groupName;

View File

@ -87,7 +87,7 @@ public class JobExecutorActor extends AbstractActor {
private void doExecute(final TaskExecuteDTO taskExecute) {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>();
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
// 自动的校验任务必须是开启状态手动触发无需校验
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
@ -150,13 +150,15 @@ public class JobExecutorActor extends AbstractActor {
}
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job) || JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())) {
if (Objects.isNull(job)
|| JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())
|| JobTriggerTypeEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType())
// 是否是常驻任务
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
) {
return;
}
// 是否是常驻任务
if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
@ -183,5 +185,4 @@ public class JobExecutorActor extends AbstractActor {
JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
}
}
}

View File

@ -65,7 +65,7 @@ public class JobExecutorResultActor extends AbstractActor {
()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态
boolean complete = jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReason());
boolean complete = jobTaskBatchHandler.complete(result.getWorkflowNodeId(), result.getWorkflowTaskBatchId(), result.getTaskBatchId(), result.getJobOperationReason());
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭

View File

@ -66,6 +66,8 @@ public class JobTaskPrepareActor extends AbstractActor {
for (JobTaskBatch jobTaskBatch : notCompleteJobTaskBatchList) {
prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
prepare.setTaskBatchId(jobTaskBatch.getId());
prepare.setWorkflowTaskBatchId(jobTaskBatch.getWorkflowTaskBatchId());
prepare.setWorkflowNodeId(jobTaskBatch.getWorkflowNodeId());
prepare.setOnlyTimeoutCheck(onlyTimeoutCheck);
for (JobPrePareHandler prePareHandler : prePareHandlers) {
if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) {

View File

@ -84,7 +84,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
// 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
actorRef.tell(waitExecTask, actorRef);
}

View File

@ -0,0 +1,110 @@
package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author: xiaowoniu
* @date : 2023-12-22 10:34
* @since : 2.6.0
*/
@Component(ActorGenerator.WORKFLOW_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class WorkflowExecutorActor extends AbstractActor {

    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
    private final WorkflowNodeMapper workflowNodeMapper;
    private final JobMapper jobMapper;
    // NOTE(review): currently unused; kept to avoid changing the generated constructor.
    private final JobTaskBatchMapper jobTaskBatchMapper;

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
            try {
                doExecutor(taskExecute);
            } catch (Exception e) {
                LogUtils.error(log, "workflow executor exception. [{}]", taskExecute, e);
                // Mark the whole workflow batch failed on any dispatch error.
                handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason());
                // TODO send alarm notification
            } finally {
                getContext().stop(getSelf());
            }
        }).build();
    }

    /**
     * Dispatches the next layer of the workflow DAG: loads the batch's stored
     * graph, finds the successors of the just-finished node (parentId, or the
     * virtual ROOT on first trigger) and hands each mapped job to the job
     * prepare actor.
     *
     * @param taskExecute which batch/node finished and how it was triggered
     * @throws IOException if the stored flow info cannot be deserialized
     */
    private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) throws IOException {
        WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
        Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));

        // Rebuild the DAG from the flow info persisted on the batch
        MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(workflowTaskBatch.getFlowInfo());

        // FIX: the nodes to run next are the SUCCESSORS of the finished node.
        // predecessors() walks the graph backwards and is always empty for the
        // virtual ROOT, so the workflow would never start.
        Set<Long> successors = graph.successors(taskExecute.getParentId());
        if (CollectionUtils.isEmpty(successors)) {
            // Leaf node reached — nothing further to schedule.
            return;
        }

        List<WorkflowNode> workflowNodes = workflowNodeMapper.selectBatchIds(successors);
        Set<Long> jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toSet());
        Map<Long, Job> jobMap = jobMapper.selectBatchIds(jobIds).stream()
            .collect(Collectors.toMap(Job::getId, job -> job));

        // Iterate nodes (not jobs) so each node keeps its own identity even when
        // several nodes reference the same job.
        for (WorkflowNode workflowNode : workflowNodes) {
            Job job = jobMap.get(workflowNode.getJobId());
            if (job == null) {
                continue;
            }
            // Build the per-node job batch and route it through the normal
            // job prepare pipeline with the WORKFLOW trigger type.
            JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
            jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType());
            jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
            // FIX: propagate workflow identifiers; without them
            // JobTaskBatchHandler.complete(...) can never chain to the next node.
            jobTaskPrepare.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
            jobTaskPrepare.setWorkflowNodeId(workflowNode.getId());
            ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
            actorRef.tell(jobTaskPrepare, actorRef);
        }
    }

    /**
     * Persists a status/reason transition on the workflow task batch.
     *
     * @throws EasyRetryServerException if the row was not updated
     */
    private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
        WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
        jobTaskBatch.setId(taskExecute.getWorkflowTaskBatchId());
        jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
        jobTaskBatch.setTaskBatchStatus(taskStatus);
        jobTaskBatch.setOperationReason(operationReason);
        Assert.isTrue(1 == workflowTaskBatchMapper.updateById(jobTaskBatch),
            () -> new EasyRetryServerException("更新任务失败"));
    }
}

View File

@ -1,22 +1,23 @@
package com.aizuda.easy.retry.server.job.task.support.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* job任务失败事件
*
* @author: zuoJunLin
* @date : 2023-12-02 21:40
* @since 2.5.0
*/
@Getter
public class JobTaskFailAlarmEvent extends ApplicationEvent {

    // Id of the failed job task batch; accessor is generated by Lombok @Getter,
    // so the previous hand-written getJobTaskBatchId() duplicate was removed.
    private final Long jobTaskBatchId;

    public JobTaskFailAlarmEvent(Long jobTaskBatchId) {
        // The batch id doubles as the Spring event source.
        super(jobTaskBatchId);
        this.jobTaskBatchId = jobTaskBatchId;
    }
}

View File

@ -42,6 +42,7 @@ public class JobTaskBatchGenerator {
jobTaskBatch.setGroupName(context.getGroupName());
jobTaskBatch.setCreateDt(LocalDateTime.now());
jobTaskBatch.setNamespaceId(context.getNamespaceId());
jobTaskBatch.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
// 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()))) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());

View File

@ -45,5 +45,10 @@ public class JobTaskBatchGeneratorContext {
*/
private Integer triggerType;
/**
* 工作流任务批次id
*/
private Long workflowTaskBatchId;
}

View File

@ -0,0 +1,48 @@
package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.TimeUnit;
/**
* @author: xiaowoniu
* @date : 2023-12-22 09:04
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class WorkflowBatchGenerator {

    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;

    /**
     * Creates a new workflow task batch in WAITING status and registers it on
     * the timer wheel so the workflow starts at the context's next trigger time.
     *
     * NOTE(review): the timer task is registered inside the transaction; if the
     * wheel fires before the insert commits the batch row may not be visible
     * yet — confirm JobTimerWheel/consumers tolerate that window.
     */
    @Transactional
    public void generateJobTaskBatch(WorkflowTaskBatchGeneratorContext context) {
        // Persist the batch record first so it has a generated id
        WorkflowTaskBatch workflowTaskBatch = WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatch(context);
        workflowTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
        workflowTaskBatchMapper.insert(workflowTaskBatch);

        // Schedule workflow start on the timer wheel.
        // delay may be <= 0 when the trigger time is already due — presumably
        // the wheel fires immediately in that case; TODO confirm.
        long delay = context.getNextTriggerAt() - DateUtils.toNowMilli();
        WorkflowTimerTaskDTO workflowTimerTaskDTO = new WorkflowTimerTaskDTO();
        workflowTimerTaskDTO.setWorkflowTaskBatchId(workflowTaskBatch.getId());
        workflowTimerTaskDTO.setWorkflowId(context.getWorkflowId());
        workflowTimerTaskDTO.setTriggerType(context.getTriggerType());
        JobTimerWheel.register(workflowTaskBatch.getId(),
            new WorkflowTimerTask(workflowTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
    }
}

View File

@ -0,0 +1,51 @@
package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-02 13:12:48
* @since 2.4.0
*/
@Data
public class WorkflowTaskBatchGeneratorContext {

    // Namespace id.
    private String namespaceId;

    /**
     * Group name.
     */
    private String groupName;

    /**
     * Workflow id.
     */
    private Long workflowId;

    /**
     * Next trigger time (epoch millis).
     */
    private Long nextTriggerAt;

    /**
     * Operation reason.
     */
    private Integer operationReason;

    /**
     * Task batch status.
     */
    private Integer taskBatchStatus;

    /**
     * Trigger type: 1 = auto, 2 = manual.
     */
    private Integer triggerType;

    /**
     * Flow info (serialized DAG of the workflow).
     */
    private String flowInfo;
}

View File

@ -1,9 +1,14 @@
package com.aizuda.easy.retry.server.job.task.support.handler;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
@ -11,6 +16,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@ -25,6 +31,7 @@ import java.util.stream.Collectors;
* @date : 2023-10-10 16:50
*/
@Component
@Slf4j
public class JobTaskBatchHandler {
@Autowired
@ -32,7 +39,16 @@ public class JobTaskBatchHandler {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
public boolean complete(Long taskBatchId, Integer jobOperationReason) {
/**
* TODO 参数待优化
*
* @param workflowNodeId
* @param workflowTaskBatchId
* @param taskBatchId
* @param jobOperationReason
* @return
*/
public boolean complete(Long workflowNodeId, Long workflowTaskBatchId, Long taskBatchId, Integer jobOperationReason) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
@ -76,6 +92,20 @@ public class JobTaskBatchHandler {
jobTaskBatch.setOperationReason(jobOperationReason);
}
if (Objects.nonNull(workflowNodeId) && Objects.nonNull(workflowTaskBatchId)) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(workflowNodeId);
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId)

View File

@ -43,7 +43,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理
int blockStrategy = prepare.getBlockStrategy();
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
if (jobTaskBatchHandler.complete(prepare.getWorkflowNodeId(), prepare.getWorkflowTaskBatchId(), prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间

View File

@ -0,0 +1,21 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler;
/**
* @author: xiaowoniu
* @date : 2023-12-22 08:57
* @since : 2.6.0
*/
public abstract class AbstractWorkflowPrePareHandler implements WorkflowPrePareHandler {

    /**
     * Template entry point: delegates straight to {@link #doHandler}.
     */
    @Override
    public void handler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
        doHandler(workflowTaskPrepareDTO);
    }

    /**
     * Concrete prepare-phase processing implemented by subclasses.
     *
     * @param jobPrepareDTO prepare-phase parameters of the workflow
     */
    protected abstract void doHandler(WorkflowTaskPrepareDTO jobPrepareDTO);
}

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.server.job.task.support.prepare.workflow;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @author: xiaowoniu
* @date : 2023-12-22 08:59
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class TerminalWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {

    private final WorkflowBatchGenerator workflowBatchGenerator;

    /**
     * Accepts only workflows with no in-flight task batch, i.e. the looked-up
     * batch status is absent.
     */
    @Override
    public boolean matches(final Integer status) {
        return status == null;
    }

    /**
     * Nothing is currently processing for this workflow, so create a brand-new
     * workflow task batch from the prepare parameters.
     */
    @Override
    protected void doHandler(final WorkflowTaskPrepareDTO jobPrepareDTO) {
        log.info("无处理中的数据. workflowId:[{}]", jobPrepareDTO.getWorkflowId());
        workflowBatchGenerator
            .generateJobTaskBatch(WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(jobPrepareDTO));
    }
}

View File

@ -0,0 +1,72 @@
package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTimerTaskDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
/**
* @author: xiaowoniu
* @date : 2023-09-25
* @since 2.6.0
*/
@AllArgsConstructor
@Slf4j
public class WorkflowTimerTask implements TimerTask {

    private WorkflowTimerTaskDTO workflowTimerTaskDTO;

    /**
     * Fires when the timer wheel reaches this batch's trigger time: marks the
     * workflow task batch RUNNING, then kicks off DAG execution from the
     * virtual root node. Failures are logged (best-effort) so the wheel thread
     * is never killed by an exception.
     */
    @Override
    public void run(final Timeout timeout) throws Exception {
        // 执行任务调度
        log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId());

        try {
            int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
            int operationReason = JobOperationReasonEnum.NONE.getReason();
            handlerTaskBatch(workflowTimerTaskDTO.getWorkflowTaskBatchId(), taskStatus, operationReason);

            WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
            taskExecuteDTO.setWorkflowTaskBatchId(workflowTimerTaskDTO.getWorkflowTaskBatchId());
            taskExecuteDTO.setWorkflowId(workflowTimerTaskDTO.getWorkflowId());
            taskExecuteDTO.setTriggerType(workflowTimerTaskDTO.getTriggerType());
            // Start traversal from the virtual root of the DAG
            taskExecuteDTO.setParentId(SystemConstants.ROOT);

            // FIX: WorkflowNodeTaskExecuteDTO is handled by WorkflowExecutorActor;
            // jobTaskExecutorActor() only matches TaskExecuteDTO, so the message
            // would be dropped as unhandled.
            ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
            actorRef.tell(taskExecuteDTO, actorRef);
        } catch (Exception e) {
            log.error("任务调度执行失败", e);
        }
    }

    /**
     * Persists a status/reason transition on the workflow task batch.
     *
     * @throws EasyRetryServerException if the row was not updated
     */
    private void handlerTaskBatch(Long workflowTaskBatchId, int taskStatus, int operationReason) {
        WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
        jobTaskBatch.setId(workflowTaskBatchId);
        jobTaskBatch.setExecutionAt(DateUtils.toNowMilli());
        jobTaskBatch.setTaskBatchStatus(taskStatus);
        jobTaskBatch.setOperationReason(operationReason);
        Assert.isTrue(1 == SpringContext.getBeanByType(WorkflowTaskBatchMapper.class).updateById(jobTaskBatch),
            () -> new EasyRetryServerException("更新任务失败"));
    }
}

View File

@ -78,7 +78,7 @@ public class ConsumerBucketActor extends AbstractActor {
// 扫描DAG工作流任务数据
ActorRef scanWorkflowActorRef = cacheActorRef(DEFAULT_WORKFLOW_KEY, TaskTypeEnum.WORKFLOW);
scanJobActorRef.tell(scanTask, scanWorkflowActorRef);
scanWorkflowActorRef.tell(scanTask, scanWorkflowActorRef);
}
if (SystemModeEnum.isRetry(systemProperties.getMode())) {

View File

@ -1,11 +1,18 @@
package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobRequestVO;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
@ -30,6 +37,7 @@ import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@ -49,7 +57,7 @@ import java.util.stream.Collectors;
public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowMapper workflowMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final static long root = -1;
private final SystemProperties systemProperties;
@Override
@Transactional
@ -57,20 +65,22 @@ public class WorkflowServiceImpl implements WorkflowService {
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
graph.addNode(root);
graph.addNode(SystemConstants.ROOT);
// 组装工作流信息
Workflow workflow = WorkflowConverter.INSTANCE.toWorkflow(workflowRequestVO);
// TODO 临时设置值
workflow.setNextTriggerAt(1L);
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(StrUtil.EMPTY);
workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
% systemProperties.getBucketTotal());
workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
// 获取DAG节点配置
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflow.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
@ -80,6 +90,13 @@ public class WorkflowServiceImpl implements WorkflowService {
return true;
}
private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(workflowRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(workflowRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(time);
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
@Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) throws IOException {
@ -101,7 +118,7 @@ public class WorkflowServiceImpl implements WorkflowService {
try {
MutableGraph<Long> graph = deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, root, new HashMap<>(), workflowNodeMap);
WorkflowDetailResponseVO.NodeConfig config = buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
@ -138,13 +155,13 @@ public class WorkflowServiceImpl implements WorkflowService {
MutableGraph<Long> graph = GraphBuilder.directed().allowsSelfLoops(false).build();
// 添加虚拟头节点
graph.addNode(root);
graph.addNode(SystemConstants.ROOT);
// 获取DAG节点配置
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
// 递归构建图
buildGraph(Lists.newArrayList(root), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph);
log.info("图构建完成. graph:[{}]", graph);
@ -215,7 +232,7 @@ public class WorkflowServiceImpl implements WorkflowService {
buildNodeConfig(graph, successor, nodeConfigMap, workflowNodeMap);
}
if (parentId != root && mount) {
if (parentId != SystemConstants.ROOT && mount) {
previousNodeInfo.setChildNode(currentConfig);
}