feat: 2.6.0

1. DAG条件节点未完成
This commit is contained in:
byteblogs168 2023-12-24 23:28:01 +08:00
parent 8e65b608ca
commit 8af045bfe4
13 changed files with 101 additions and 55 deletions

View File

@ -101,4 +101,9 @@ public interface SystemConstants {
* 根节点 * 根节点
*/ */
Long ROOT = -1L; Long ROOT = -1L;
/**
* 根节点
*/
Long CONDITION_JOB_ID = -1000L;
} }

View File

@ -26,6 +26,7 @@ public enum JobOperationReasonEnum {
NOT_EXECUTE_TASK(6, "无可执行任务项"), NOT_EXECUTE_TASK(6, "无可执行任务项"),
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"), TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"),
MANNER_STOP(8, "手动停止"), MANNER_STOP(8, "手动停止"),
WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR(8, "手动停止"),
; ;
private final int reason; private final int reason;

View File

@ -0,0 +1,19 @@
package com.aizuda.easy.retry.server.job.task.dto;
import lombok.Data;
/**
* @author xiaowoniu
* @date 2023-12-24 23:00:24
* @since 2.6.0
*/
@Data
public class CompleteJobBatchDTO {
private Long workflowNodeId;
private Long workflowTaskBatchId;
private Long taskBatchId;
private Integer jobOperationReason;
private Object result;
}

View File

@ -26,4 +26,5 @@ public class WorkflowNodeTaskExecuteDTO {
private Long parentId; private Long parentId;
private String result;
} }

View File

@ -95,4 +95,8 @@ public interface JobTaskConverter {
JobTaskBatch toJobTaskBatch(JobTaskBatchGeneratorContext context); JobTaskBatch toJobTaskBatch(JobTaskBatchGeneratorContext context);
CompleteJobBatchDTO toCompleteJobBatchDTO(JobExecutorResultDTO jobExecutorResultDTO);
CompleteJobBatchDTO completeJobBatchDTO(JobTaskPrepareDTO jobTaskPrepareDTO);
} }

View File

@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
@ -31,4 +33,6 @@ public interface WorkflowTaskConverter {
WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO); WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context); WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context);
JobTaskBatchGeneratorContext toJobTaskBatchGeneratorContext(WorkflowExecutorContext context);
} }

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
@ -64,8 +65,11 @@ public class JobExecutorResultActor extends AbstractActor {
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())), new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败")); ()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态 // 更新批次上的状态
boolean complete = jobTaskBatchHandler.complete(result.getWorkflowNodeId(), result.getWorkflowTaskBatchId(), result.getTaskBatchId(), result.getJobOperationReason()); CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO);
if (complete) { if (complete) {
// 尝试停止任务 // 尝试停止任务
// 若是集群任务则客户端会主动关闭 // 若是集群任务则客户端会主动关闭

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow; package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
@ -8,15 +9,12 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired; import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser;
@ -32,6 +30,7 @@ import java.util.Map;
*/ */
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor { public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
private final static ExpressionParser ENGINE = new SpelExpressionParser(); private final static ExpressionParser ENGINE = new SpelExpressionParser();
@ -45,25 +44,38 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
@Override @Override
protected void doExecute(WorkflowExecutorContext context) { protected void doExecute(WorkflowExecutorContext context) {
int taskBatchStatus = JobTaskBatchStatusEnum.SUCCESS.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
try {
// 根据配置的表达式执行 // 根据配置的表达式执行
Boolean result = doEval(context.getExpression(), context.getExpressionContext()); Boolean result = doEval(context.getExpression(), JsonUtil.parseHashMap(context.getResult()));
if (result) { if (result) {
JobTaskBatchGeneratorContext generatorContext = new JobTaskBatchGeneratorContext(); // 若是工作流则开启下一个任务
generatorContext.setGroupName(context.getGroupName()); try {
generatorContext.setNamespaceId(context.getNamespaceId()); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
generatorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
generatorContext.setOperationReason(JobOperationReasonEnum.NONE.getReason()); taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
// 特殊的job taskExecuteDTO.setParentId(context.getWorkflowNodeId());
generatorContext.setJobId(-1L); taskExecuteDTO.setResult(context.getResult());
generatorContext.setWorkflowNodeId(context.getWorkflowNodeId()); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
generatorContext.setParentWorkflowNodeId(context.getParentWorkflowNodeId()); actorRef.tell(taskExecuteDTO, actorRef);
generatorContext.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); } catch (Exception e) {
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext); log.error("工作流执行失败", e);
} else { }
// }
} catch (Exception e) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CONDITION_NODE_EXECUTOR_ERROR.getReason();
} }
JobTaskBatchGeneratorContext generatorContext = WorkflowTaskConverter.INSTANCE.toJobTaskBatchGeneratorContext(context);
generatorContext.setTaskBatchStatus(taskBatchStatus);
generatorContext.setOperationReason(operationReason);
// 特殊的job
generatorContext.setJobId(SystemConstants.CONDITION_JOB_ID);
jobTaskBatchGenerator.generateJobTaskBatch(generatorContext);
} }
protected Boolean doEval(String expression, Map<String, Object> context) { protected Boolean doEval(String expression, Map<String, Object> context) {

View File

@ -49,6 +49,9 @@ public class WorkflowExecutorContext {
private String expression; private String expression;
private Map<String, Object> expressionContext; /**
* 客户端返回的结果
*/
private String result;
} }

View File

@ -6,8 +6,10 @@ import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent; import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
@ -39,33 +41,17 @@ public class JobTaskBatchHandler {
@Autowired @Autowired
private JobTaskBatchMapper jobTaskBatchMapper; private JobTaskBatchMapper jobTaskBatchMapper;
/** public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
* TODO 参数待优化
*
* @param workflowNodeId
* @param workflowTaskBatchId
* @param taskBatchId
* @param jobOperationReason
* @return
*/
public boolean complete(Long workflowNodeId, Long workflowTaskBatchId, Long taskBatchId, Integer jobOperationReason) {
List<JobTask> jobTasks = jobTaskMapper.selectList( List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus) new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getTaskBatchId, taskBatchId)); .select(JobTask::getTaskStatus, JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskBatchId); jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
if (CollectionUtils.isEmpty(jobTasks)) { if (CollectionUtils.isEmpty(jobTasks)) {
// todo 此为异常数据没有生成对应的任务项(task) 直接更新为失败
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_EXECUTE_TASK.getReason());
jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE));
SpringContext.CONTEXT.publishEvent(new JobTaskFailAlarmEvent(taskBatchId));
return false; return false;
} }
@ -81,24 +67,26 @@ public class JobTaskBatchHandler {
if (failCount > 0) { if (failCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
SpringContext.CONTEXT.publishEvent(new JobTaskFailAlarmEvent(taskBatchId)); SpringContext.CONTEXT.publishEvent(new JobTaskFailAlarmEvent(completeJobBatchDTO.getTaskBatchId()));
} else if (stopCount > 0) { } else if (stopCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else { } else {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
} }
if (Objects.nonNull(jobOperationReason)) { if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) {
jobTaskBatch.setOperationReason(jobOperationReason); jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason());
} }
if (Objects.nonNull(workflowNodeId) && Objects.nonNull(workflowTaskBatchId)) { if (Objects.nonNull(completeJobBatchDTO.getWorkflowNodeId()) && Objects.nonNull(completeJobBatchDTO.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务 // 若是工作流则开启下一个任务
try { try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId); taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType());
taskExecuteDTO.setParentId(workflowNodeId); taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
// 这里取第一个的任务执行结果
taskExecuteDTO.setResult(jobTasks.get(0).getResultMessage());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef); actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) { } catch (Exception e) {
@ -108,7 +96,7 @@ public class JobTaskBatchHandler {
return 1 == jobTaskBatchMapper.update(jobTaskBatch, return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>() new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId) .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE) .in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE)
); );

View File

@ -4,6 +4,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
@ -43,7 +44,9 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理 // 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理
int blockStrategy = prepare.getBlockStrategy(); int blockStrategy = prepare.getBlockStrategy();
JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE; JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
if (jobTaskBatchHandler.complete(prepare.getWorkflowNodeId(), prepare.getWorkflowTaskBatchId(), prepare.getTaskBatchId(), jobOperationReasonEnum.getReason())) { CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.completeJobBatchDTO(prepare);
completeJobBatchDTO.setJobOperationReason(jobOperationReasonEnum.getReason());
if (jobTaskBatchHandler.complete(completeJobBatchDTO)) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else { } else {
// 计算超时时间 // 计算超时时间

View File

@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -23,7 +24,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
@Override @Override
public boolean matches(Integer status) { public boolean matches(Integer status) {
return JobTaskBatchStatusEnum.RUNNING.getStatus() == status; return Objects.nonNull(status) && JobTaskBatchStatusEnum.RUNNING.getStatus() == status;
} }
@Override @Override

View File

@ -13,6 +13,7 @@ import com.aizuda.easy.retry.server.job.task.support.timer.WorkflowTimerTask;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -28,7 +29,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
@Override @Override
public boolean matches(Integer status) { public boolean matches(Integer status) {
return JobTaskBatchStatusEnum.WAITING.getStatus() == status; return Objects.nonNull(status) && JobTaskBatchStatusEnum.WAITING.getStatus() == status;
} }
@Override @Override