From b15f977cf4c2f73ecf31a043363ff8044dda7796 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 27 Oct 2023 18:33:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.4.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E9=98=B6=E6=AE=B5=EF=BC=8C?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E5=9C=A8=E6=89=A7=E8=A1=8C=E9=98=B6=E6=AE=B5?= =?UTF-8?q?=E9=99=A4=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/dispatch/JobExecutorActor.java | 119 ++++++++++++++++-- .../job/task/support/timer/JobTimerTask.java | 111 ++-------------- 2 files changed, 124 insertions(+), 106 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 093ab034..01f936f0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -1,20 +1,42 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.support.JobExecutor; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.JobExecutorFactory; +import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; +import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * @author: www.byteblogs.com @@ -27,12 +49,22 @@ import org.springframework.stereotype.Component; public class JobExecutorActor extends AbstractActor { @Autowired private JobMapper jobMapper; + @Autowired + private JobTaskBatchMapper jobTaskBatchMapper; + @Autowired + private TransactionTemplate transactionTemplate; @Override public Receive createReceive() { return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> { try { - doExecute(taskExecute); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(final TransactionStatus status) { + doExecute(taskExecute); + } + }); + } catch (Exception e) { LogUtils.error(log, "job executor exception. [{}]", taskExecute, e); } finally { @@ -42,13 +74,86 @@ public class JobExecutorActor extends AbstractActor { } private void doExecute(final TaskExecuteDTO taskExecute) { - Job job = jobMapper.selectById(taskExecute.getJobId()); - JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); - JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); - context.setTaskBatchId(taskExecute.getTaskBatchId()); - context.setJobId(job.getId()); - jobExecutor.execute(context); + Job job = jobMapper.selectOne(new LambdaQueryWrapper() + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .eq(Job::getId, taskExecute.getJobId()) + ); + + try { + // 更新批次的状态 + updateBatchStatus(taskExecute, job); + + // 如果任务已经关闭则不需要执行 + if (Objects.isNull(job)) { + return; + } + + // 执行任务 + JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); + JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); + context.setTaskBatchId(taskExecute.getTaskBatchId()); + context.setJobId(job.getId()); + jobExecutor.execute(context); + } finally { + doHandlerResidentTask(job, taskExecute); + } } + + private void updateBatchStatus(final TaskExecuteDTO taskExecute, final Job job) { + int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); + int operationReason = JobOperationReasonEnum.NONE.getReason(); + if (Objects.isNull(job)) { + log.warn("任务已经关闭不允许执行. jobId:[{}]", taskExecute.getJobId()); + taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); + operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason(); + } + + JobTaskBatch jobTaskBatch = new JobTaskBatch(); + jobTaskBatch.setId(taskExecute.getTaskBatchId()); + jobTaskBatch.setExecutionAt(LocalDateTime.now()); + jobTaskBatch.setTaskBatchStatus(taskStatus); + jobTaskBatch.setOperationReason(operationReason); + Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), + () -> new EasyRetryServerException("更新任务失败")); + } + + private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { + if (Objects.isNull(job)) { + return; + } + + // 是否是常驻任务 + if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) { + + JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO(); + jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId()); + jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId()); + jobTimerTaskDTO.setGroupName(taskExecuteDTO.getGroupName()); + ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); + WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); + + LocalDateTime preTriggerAt = ResidentTaskCache.get(job.getId()); + if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) { + preTriggerAt = job.getNextTriggerAt(); + } + + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); + waitStrategyContext.setTriggerType(job.getTriggerType()); + waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(preTriggerAt); + LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext); + + // 获取时间差的毫秒数 + Duration duration = Duration.between(preTriggerAt, nextTriggerAt); + long milliseconds = duration.toMillis(); + + log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000); + job.setNextTriggerAt(nextTriggerAt); + + JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS); + ResidentTaskCache.refresh(job.getId(), nextTriggerAt); + } + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java index 3306d19c..b7df725c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/timer/JobTimerTask.java @@ -1,34 +1,15 @@ package com.aizuda.easy.retry.server.job.task.support.timer; import akka.actor.ActorRef; -import cn.hutool.core.lang.Assert; -import com.aizuda.easy.retry.common.core.context.SpringContext; -import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; -import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; -import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; -import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; -import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; -import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; -import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; -import com.aizuda.easy.retry.template.datasource.persistence.po.Job; -import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.time.Duration; import java.time.LocalDateTime; -import java.util.Objects; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * @author: www.byteblogs.com @@ -41,93 +22,25 @@ public class JobTimerTask implements TimerTask { private JobTimerTaskDTO jobTimerTaskDTO; - private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); - @Override public void run(final Timeout timeout) throws Exception { // 执行任务调度 log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId()); - executor.execute(() -> { - Job job = null; - try { - // 清除时间轮的缓存 - JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); + try { + // 清除时间轮的缓存 + JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId()); - JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class); - job = jobMapper.selectOne(new LambdaQueryWrapper() - .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) - .eq(Job::getId, jobTimerTaskDTO.getJobId()) - ); + TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); + taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); + taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName()); + taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId()); + ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); - int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); - int operationReason = JobOperationReasonEnum.NONE.getReason(); - if (Objects.isNull(job)) { - log.warn("任务已经关闭不允许执行. jobId:[{}]", jobTimerTaskDTO.getJobId()); - taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); - operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason(); - } - - JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBeanByType(JobTaskBatchMapper.class); - JobTaskBatch jobTaskBatch = new JobTaskBatch(); - jobTaskBatch.setId(jobTimerTaskDTO.getTaskBatchId()); - jobTaskBatch.setExecutionAt(LocalDateTime.now()); - jobTaskBatch.setTaskBatchStatus(taskStatus); - jobTaskBatch.setOperationReason(operationReason); - Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), - () -> new EasyRetryServerException("更新任务失败")); - - // 如果任务已经关闭则不需要执行 - if (Objects.isNull(job)) { - return; - } - - TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO(); - taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId()); - taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName()); - taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId()); - ActorRef actorRef = ActorGenerator.jobTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - - } catch (Exception e) { - log.error("任务调度执行失败", e); - } finally { - // 处理常驻任务 - doHandlerResidentTask(job); - - } - }); - } - - private void doHandlerResidentTask(Job job) { - if (Objects.nonNull(job)) { - // 是否是常驻任务 - if (Objects.equals(StatusEnum.YES.getStatus(), job.getResident())) { - ResidentJobTimerTask timerTask = new ResidentJobTimerTask(jobTimerTaskDTO, job); - WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(job.getTriggerType()); - - LocalDateTime preTriggerAt = ResidentTaskCache.get(jobTimerTaskDTO.getJobId()); - if (Objects.isNull(preTriggerAt) || preTriggerAt.isBefore(job.getNextTriggerAt())) { - preTriggerAt = job.getNextTriggerAt(); - } - - WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); - waitStrategyContext.setTriggerType(job.getTriggerType()); - waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(preTriggerAt); - LocalDateTime nextTriggerAt = waitStrategy.computeRetryTime(waitStrategyContext); - - // 获取时间差的毫秒数 - Duration duration = Duration.between(preTriggerAt, nextTriggerAt); - long milliseconds = duration.toMillis(); - - log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, System.currentTimeMillis() % 1000); - job.setNextTriggerAt(nextTriggerAt); - - JobTimerWheel.register(jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - System.currentTimeMillis() % 1000, TimeUnit.MILLISECONDS); - ResidentTaskCache.refresh(jobTimerTaskDTO.getJobId(), nextTriggerAt); - } + } catch (Exception e) { + log.error("任务调度执行失败", e); } } + }