feat: 2.6.0

1. 添加触发类型添加工作流类型
This commit is contained in:
byteblogs168 2024-01-03 22:36:47 +08:00
parent e1c9a68b50
commit a382fd4fdd
14 changed files with 99 additions and 74 deletions

View File

@ -13,10 +13,10 @@ import lombok.Getter;
*/ */
@Getter @Getter
@AllArgsConstructor @AllArgsConstructor
public enum JobTriggerTypeEnum { public enum JobExecuteStrategyEnum {
AUTO(1, "自动触发"), AUTO(1, "自动执行"),
MANUAL(2, "手动触发"), MANUAL(2, "手动执行"),
WORKFLOW(2, "DAG触发"), WORKFLOW(2, "DAG执行"),
; ;
private final Integer type; private final Integer type;
@ -29,10 +29,10 @@ public enum JobTriggerTypeEnum {
* @return 对应的触发器类型枚举 * @return 对应的触发器类型枚举
* @throws EasyRetryServerException 当给定的类型不是有效的枚举类型时抛出异常 * @throws EasyRetryServerException 当给定的类型不是有效的枚举类型时抛出异常
*/ */
public static JobTriggerTypeEnum get(Integer type) { public static JobExecuteStrategyEnum get(Integer type) {
for (JobTriggerTypeEnum jobTriggerTypeEnum : JobTriggerTypeEnum.values()) { for (JobExecuteStrategyEnum jobExecuteStrategyEnum : JobExecuteStrategyEnum.values()) {
if(jobTriggerTypeEnum.getType().equals(type)) { if(jobExecuteStrategyEnum.getType().equals(type)) {
return jobTriggerTypeEnum; return jobExecuteStrategyEnum;
} }
} }

View File

@ -0,0 +1,30 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 1 CRON表达式 2 固定时间 3 工作流
* @author xiaowoniu
* @date 2024-01-03 22:10:01
* @since 2.6.0
*/
@AllArgsConstructor
@Getter
public enum TriggerTypeEnum {
CRON(1, "CRON表达式"),
FIXED_TIME(2, "固定时间"),
WORKFLOW(3, "工作流");
private final Integer type;
private final String desc;
public static TriggerTypeEnum get(Integer type) {
for (TriggerTypeEnum triggerTypeEnum : TriggerTypeEnum.values()) {
if (triggerTypeEnum.type.equals(type)) {
return triggerTypeEnum;
}
}
return null;
}
}

View File

@ -3,7 +3,6 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
@ -13,7 +12,7 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -91,7 +90,7 @@ public class JobExecutorActor extends AbstractActor {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<>();
// 自动地校验任务必须是开启状态手动触发无需校验 // 自动地校验任务必须是开启状态手动触发无需校验
if (JobTriggerTypeEnum.AUTO.getType().equals(taskExecute.getTriggerType())) { if (JobExecuteStrategyEnum.AUTO.getType().equals(taskExecute.getTriggerType())) {
queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus()); queryWrapper.eq(Job::getJobStatus, StatusEnum.YES.getStatus());
} }
@ -117,7 +116,7 @@ public class JobExecutorActor extends AbstractActor {
try { try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId()); taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId()); taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
@ -176,8 +175,8 @@ public class JobExecutorActor extends AbstractActor {
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
if (Objects.isNull(job) if (Objects.isNull(job)
|| JobTriggerTypeEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType()) || JobExecuteStrategyEnum.MANUAL.getType().equals(taskExecuteDTO.getTriggerType())
|| JobTriggerTypeEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType()) || JobExecuteStrategyEnum.WORKFLOW.getType().equals(taskExecuteDTO.getTriggerType())
// 是否是常驻任务 // 是否是常驻任务
|| Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) || Objects.equals(StatusEnum.NO.getStatus(), job.getResident())
) { ) {
@ -187,7 +186,7 @@ public class JobExecutorActor extends AbstractActor {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
jobTimerTaskDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); jobTimerTaskDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType());

View File

@ -11,7 +11,8 @@ import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.TriggerTypeEnum;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
@ -23,6 +24,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -90,7 +92,7 @@ public class ScanJobTaskActor extends AbstractActor {
for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) { for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) {
// 执行预处理阶段 // 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
waitExecJob.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); waitExecJob.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
actorRef.tell(waitExecJob, actorRef); actorRef.tell(waitExecJob, actorRef);
} }
} }
@ -163,6 +165,7 @@ public class ScanJobTaskActor extends AbstractActor {
Job::getId, Job::getNamespaceId) Job::getId, Job::getNamespaceId)
.eq(Job::getJobStatus, StatusEnum.YES.getStatus()) .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getDeleted, StatusEnum.NO.getStatus()) .eq(Job::getDeleted, StatusEnum.NO.getStatus())
.ne(Job::getTriggerType,TriggerTypeEnum.WORKFLOW.getType())
.in(Job::getBucketIndex, scanTask.getBuckets()) .in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) .le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
.ge(Job::getId, startId) .ge(Job::getId, startId)

View File

@ -11,19 +11,14 @@ import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -37,7 +32,6 @@ import org.springframework.util.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -85,7 +79,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) { for (final WorkflowTaskPrepareDTO waitExecTask : waitExecWorkflows) {
// 执行预处理阶段 // 执行预处理阶段
ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor(); ActorRef actorRef = ActorGenerator.workflowTaskPrepareActor();
waitExecTask.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); waitExecTask.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
actorRef.tell(waitExecTask, actorRef); actorRef.tell(waitExecTask, actorRef);
} }
} }

View File

@ -14,7 +14,7 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig; import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum; import com.aizuda.easy.retry.server.common.enums.ExpressionTypeEnum;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.LogicalConditionEnum; import com.aizuda.easy.retry.server.common.enums.LogicalConditionEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
@ -29,10 +29,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.*;
@ -108,7 +104,7 @@ public class ConditionWorkflowExecutor extends AbstractWorkflowExecutor {
try { try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId()); taskExecuteDTO.setParentId(context.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId()); taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -26,7 +26,7 @@ public class JobTaskWorkflowExecutor extends AbstractWorkflowExecutor {
protected void doExecute(WorkflowExecutorContext context) { protected void doExecute(WorkflowExecutorContext context) {
// 生成任务批次 // 生成任务批次
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(context.getJob());
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli() + DateUtils.toNowMilli() % 5000);
jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId()); jobTaskPrepare.setWorkflowNodeId(context.getWorkflowNodeId());
jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); jobTaskPrepare.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());

View File

@ -2,11 +2,10 @@ package com.aizuda.easy.retry.server.job.task.support.generator.batch;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
@ -65,7 +64,7 @@ public class JobTaskBatchGenerator {
try { try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(context.getWorkflowNodeId()); taskExecuteDTO.setParentId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef); actorRef.tell(taskExecuteDTO, actorRef);

View File

@ -1,14 +1,11 @@
package com.aizuda.easy.retry.server.job.task.support.handler; package com.aizuda.easy.retry.server.job.task.support.handler;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.easy.retry.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent; import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
@ -83,7 +80,7 @@ public class JobTaskBatchHandler {
try { try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId()); taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
taskExecuteDTO.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); taskExecuteDTO.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId()); taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
// 这里取第一个的任务执行结果 // 这里取第一个的任务执行结果
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId()); taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());

View File

@ -7,10 +7,9 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -265,7 +264,7 @@ public class WorkflowBatchHandler {
// 生成任务批次 // 生成任务批次
Job job = jobMapper.selectById(jobTaskBatch.getJobId()); Job job = jobMapper.selectById(jobTaskBatch.getJobId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.WORKFLOW.getType()); jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.WORKFLOW.getType());
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setWorkflowNodeId(successor); jobTaskPrepare.setWorkflowNodeId(successor);
jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId); jobTaskPrepare.setWorkflowTaskBatchId(workflowTaskBatchId);

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.timer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -30,7 +30,7 @@ public class ResidentJobTimerTask implements TimerTask {
// 清除时间轮的缓存 // 清除时间轮的缓存
JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.AUTO.getType()); jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.AUTO.getType());
// 执行预处理阶段 // 执行预处理阶段
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef); actorRef.tell(jobTaskPrepare, actorRef);

View File

@ -6,7 +6,8 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum; import com.aizuda.easy.retry.server.common.enums.JobExecuteStrategyEnum;
import com.aizuda.easy.retry.server.common.enums.TriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.CronUtils; import com.aizuda.easy.retry.server.common.util.CronUtils;
@ -144,8 +145,11 @@ public class JobServiceImpl implements JobService {
Job updateJob = updateJobResident(jobRequestVO); Job updateJob = updateJobResident(jobRequestVO);
updateJob.setNamespaceId(job.getNamespaceId()); updateJob.setNamespaceId(job.getNamespaceId());
// 非常驻任务 > 非常驻任务 // 工作流任务
if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), if (Objects.equals(jobRequestVO.getTriggerType(), TriggerTypeEnum.WORKFLOW.getType())) {
job.setNextTriggerAt(0L);
// 非常驻任务 > 非常驻任务
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(),
StatusEnum.NO.getStatus())) { StatusEnum.NO.getStatus())) {
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals( } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
@ -164,6 +168,10 @@ public class JobServiceImpl implements JobService {
} }
private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) { private static Long calculateNextTriggerAt(final JobRequestVO jobRequestVO, Long time) {
if (Objects.equals(jobRequestVO.getTriggerType(), TriggerTypeEnum.WORKFLOW.getType())) {
return 0L;
}
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
@ -175,6 +183,10 @@ public class JobServiceImpl implements JobService {
public Job updateJobResident(JobRequestVO jobRequestVO) { public Job updateJobResident(JobRequestVO jobRequestVO) {
Job job = JobConverter.INSTANCE.toJob(jobRequestVO); Job job = JobConverter.INSTANCE.toJob(jobRequestVO);
job.setResident(StatusEnum.NO.getStatus()); job.setResident(StatusEnum.NO.getStatus());
if (Objects.equals(jobRequestVO.getTriggerType(), TriggerTypeEnum.WORKFLOW.getType())) {
return job;
}
if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) { if (jobRequestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.FIXED.getType()) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) { if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
job.setResident(StatusEnum.YES.getStatus()); job.setResident(StatusEnum.YES.getStatus());
@ -218,7 +230,7 @@ public class JobServiceImpl implements JobService {
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
// 设置now表示立即执行 // 设置now表示立即执行
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTriggerType(JobTriggerTypeEnum.MANUAL.getType()); jobTaskPrepare.setTriggerType(JobExecuteStrategyEnum.MANUAL.getType());
// 创建批次 // 创建批次
jobPrePareHandler.handler(jobTaskPrepare); jobPrePareHandler.handler(jobTaskPrepare);

View File

@ -31,6 +31,10 @@ const enums = {
'2': { '2': {
'name': '固定时间', 'name': '固定时间',
'color': '#f5a22d' 'color': '#f5a22d'
},
'3': {
'name': '工作流',
'color': '#76f52d'
} }
}, },
blockStrategy: { blockStrategy: {

View File

@ -120,6 +120,14 @@
'triggerInterval', 'triggerInterval',
{rules: [{ required: true, message: '请输入间隔时长', whitespace: true}]} {rules: [{ required: true, message: '请输入间隔时长', whitespace: true}]}
]" /> ]" />
<a-input
v-if="triggerTypeValue === '3'"
disabled
placeholder=""
v-decorator="[
'triggerInterval'
]" />
</a-form-item> </a-form-item>
</a-col> </a-col>
</a-row> </a-row>
@ -422,29 +430,6 @@ export default {
const taskType = this.form.getFieldValue('taskType') const taskType = this.form.getFieldValue('taskType')
if (taskType === '3') { if (taskType === '3') {
this.visible = !this.visible this.visible = !this.visible
// const { form } = this
// if (this.formType === 'create') {
// return
// }
//
// form.setFieldsValue({
// argsStr: ''
// })
//
// console.log(this.argsStrValue)
// if (this.argsStrValue.length === 0) {
// return
// }
//
// //
// const keys = this.argsStrValue.map((item, index) => {
// this.count++
// this.dynamicForm.getFieldDecorator(`sharding[${index}]`, { initialValue: item, preserve: true })
// return index
// })
//
// console.log(keys)
// this.dynamicForm.getFieldDecorator('keys', { initialValue: keys, preserve: true })
} }
}, },
getCron (cron) { getCron (cron) {
@ -456,7 +441,6 @@ export default {
const { form } = this const { form } = this
this.$refs['dynamicValidateForm'].validate(valid => { this.$refs['dynamicValidateForm'].validate(valid => {
if (valid) { if (valid) {
console.log(this.dynamicValidateForm.domains)
this.argsStrValue = this.dynamicValidateForm.domains.map((item, index) => item.value) this.argsStrValue = this.dynamicValidateForm.domains.map((item, index) => item.value)
form.setFieldsValue({ form.setFieldsValue({
argsStr: this.dynamicValidateForm.domains.map((item, index) => `分区:${index}=>${item.value}`).join('; ') argsStr: this.dynamicValidateForm.domains.map((item, index) => `分区:${index}=>${item.value}`).join('; ')
@ -493,6 +477,10 @@ export default {
values['argsStr'] = JSON.stringify(this.argsStrValue) values['argsStr'] = JSON.stringify(this.argsStrValue)
} }
if (this.triggerTypeValue === '3') {
values['triggerInterval'] = '0'
}
if (this.formType === 'create') { if (this.formType === 'create') {
saveJob(values).then(res => { saveJob(values).then(res => {
this.$message.success('任务新增完成') this.$message.success('任务新增完成')
@ -527,6 +515,10 @@ export default {
this.triggerTypeValue = formData.triggerType this.triggerTypeValue = formData.triggerType
this.taskTypeValue = formData.taskType this.taskTypeValue = formData.taskType
if (this.triggerTypeValue === '3') {
formData.triggerInterval = null
}
if (this.taskTypeValue === '3') { if (this.taskTypeValue === '3') {
this.argsStrValue = JSON.parse(formData.argsStr) this.argsStrValue = JSON.parse(formData.argsStr)
formData.argsStr = this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join(';') formData.argsStr = this.argsStrValue.map((item, index) => `分区:${index}=>${item}`).join(';')