diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java index 50f47ae2..9dc66287 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorActor.java @@ -9,36 +9,28 @@ import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.WaitStrategy; import com.aizuda.snailjob.server.common.akka.ActorGenerator; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; -import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.common.strategy.WaitStrategies; import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobExecutor; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; -import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext; import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; +import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; -import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask; -import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; @@ -74,8 +66,8 @@ public class JobExecutorActor extends AbstractActor { private final JobMapper jobMapper; private final JobTaskBatchMapper jobTaskBatchMapper; private final TransactionTemplate transactionTemplate; - private final GroupConfigMapper groupConfigMapper; private final WorkflowBatchHandler workflowBatchHandler; + private final JobTaskBatchHandler jobTaskBatchHandler; @Override public Receive createReceive() { @@ -164,8 +156,8 @@ public class JobExecutorActor extends AbstractActor { Duration.ofMillis(DateUtils.toEpochMilli(job.getExecutorTimeout()) + 500)); } - //方法内容 - doHandleResidentTask(job, taskExecute); + // 开启下一个常驻任务 + jobTaskBatchHandler.openResidentTask(job, taskExecute); } }); } @@ -199,52 +191,4 @@ public class JobExecutorActor extends AbstractActor { } - private void doHandleResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { - if (Objects.isNull(job) - || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) - // 是否是常驻任务 - || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) - // 防止任务已经分配到其他节点导致的任务重复执行 - || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) - ) { - return; - } - - long count = groupConfigMapper.selectCount(new LambdaQueryWrapper() - .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) - .eq(GroupConfig::getGroupName, job.getGroupName()) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); - if (count == 0) { - return; - } - - JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); - jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); - jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); - jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType()); - WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); - - Long preTriggerAt = ResidentTaskCache.get(job.getId()); - if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) { - preTriggerAt = job.getNextTriggerAt(); - } - - WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); - waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(preTriggerAt); - Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext); - - // 获取时间差的毫秒数 - long milliseconds = nextTriggerAt - preTriggerAt; - - Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000); - - log.info("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000); - job.setNextTriggerAt(nextTriggerAt); - JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); -// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS); - ResidentTaskCache.refresh(job.getId(), nextTriggerAt); - } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java index 9c1d1d4c..0bbaa78e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/batch/JobTaskBatchGenerator.java @@ -9,12 +9,16 @@ import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; +import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; +import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerTask; import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; @@ -40,6 +44,8 @@ public class JobTaskBatchGenerator { private final JobTaskBatchMapper jobTaskBatchMapper; private final WorkflowBatchHandler workflowBatchHandler; + private final JobTaskBatchHandler jobTaskBatchHandler; + private final JobMapper jobMapper; @Transactional public JobTaskBatch generateJobTaskBatch(JobTaskBatchGeneratorContext context) { @@ -76,13 +82,12 @@ public class JobTaskBatchGenerator { // 非待处理状态无需进入时间轮中 if (JobTaskBatchStatusEnum.WAITING.getStatus() != jobTaskBatch.getTaskBatchStatus()) { - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); - taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); - taskExecuteDTO.setParentId(context.getWorkflowNodeId()); - taskExecuteDTO.setTaskBatchId(jobTaskBatch.getId()); - workflowBatchHandler.openNextNode(taskExecuteDTO); + // 开启下一个工作流 + openNextWorkflow(context, jobTaskBatch); + + // 若是常驻任务则需要再次进入时间轮 + openNextResidentTask(context, jobTaskBatch); return jobTaskBatch; } @@ -95,10 +100,35 @@ public class JobTaskBatchGenerator { jobTimerTaskDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); jobTimerTaskDTO.setWorkflowNodeId(context.getWorkflowNodeId()); JobTimerWheel.registerWithJob(() -> new JobTimerTask(jobTimerTaskDTO), Duration.ofMillis(delay)); -// JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTaskBatch.getId(), -// new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS); - return jobTaskBatch; } + private void openNextResidentTask(JobTaskBatchGeneratorContext context, JobTaskBatch jobTaskBatch) { + + // 手动触发的定时任务、工作流场景下不存在常驻任务,无需触发 + if (JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(context.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(context.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(context.getTaskExecutorScene())) { + return; + } + + TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); + taskExecuteDTO.setTaskBatchId(jobTaskBatch.getId()); + taskExecuteDTO.setJobId(context.getJobId()); + taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setWorkflowNodeId(context.getWorkflowNodeId()); + Job job = jobMapper.selectById(context.getJobId()); + jobTaskBatchHandler.openResidentTask(job, taskExecuteDTO); + } + + private void openNextWorkflow(JobTaskBatchGeneratorContext context, JobTaskBatch jobTaskBatch) { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); + taskExecuteDTO.setTaskExecutorScene(context.getTaskExecutorScene()); + taskExecuteDTO.setParentId(context.getWorkflowNodeId()); + taskExecuteDTO.setTaskBatchId(jobTaskBatch.getId()); + workflowBatchHandler.openNextNode(taskExecuteDTO); + } + } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index 011a9afb..9fe13054 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -4,12 +4,25 @@ import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.common.core.context.SpringContext; import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.server.common.WaitStrategy; +import com.aizuda.snailjob.server.common.dto.DistributeInstance; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.snailjob.server.common.strategy.WaitStrategies; +import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; +import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO; +import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO; import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent; +import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache; +import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask; +import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -19,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -36,6 +50,7 @@ public class JobTaskBatchHandler { private final JobTaskMapper jobTaskMapper; private final JobTaskBatchMapper jobTaskBatchMapper; private final WorkflowBatchHandler workflowBatchHandler; + private final GroupConfigMapper groupConfigMapper; @Transactional @@ -91,4 +106,58 @@ public class JobTaskBatchHandler { ); } + + /** + * 开启常驻任务 + * + * @param job 定时任务配置信息 + * @param taskExecuteDTO 任务执行新 + */ + public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { + if (Objects.isNull(job) + || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) + // 是否是常驻任务 + || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) + // 防止任务已经分配到其他节点导致的任务重复执行 + || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex()) + ) { + return; + } + + long count = groupConfigMapper.selectCount(new LambdaQueryWrapper() + .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) + .eq(GroupConfig::getGroupName, job.getGroupName()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())); + if (count == 0) { + return; + } + + JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); + jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); + jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); + jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType()); + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); + + Long preTriggerAt = ResidentTaskCache.get(job.getId()); + if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) { + preTriggerAt = job.getNextTriggerAt(); + } + + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(preTriggerAt); + Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext); + + // 获取时间差的毫秒数 + long milliseconds = nextTriggerAt - preTriggerAt; + + Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000); + + log.info("常驻任务监控. [{}] 任务时间差:[{}] 取余:[{}]", duration, milliseconds, DateUtils.toNowMilli() % 1000); + job.setNextTriggerAt(nextTriggerAt); + JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration); + ResidentTaskCache.refresh(job.getId(), nextTriggerAt); + } }