diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 5bc0dc75..5db48063 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; @@ -158,6 +159,8 @@ public class JobExecutorActor extends AbstractActor { preTriggerAt = job.getNextTriggerAt(); } + System.out.println("时间监控 " + ResidentTaskCache.get(job.getId()) + "-" + job.getNextTriggerAt()); + WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); waitStrategyContext.setTriggerType(job.getTriggerType()); waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index 68ea22ff..2edc1487 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -97,7 +97,13 @@ public class ScanJobTaskActor extends AbstractActor { // 更新下次触发时间 nextTriggerAt = calculateNextTriggerTime(partitionTask); } else { - triggerTask = false; + // 若常驻任务的缓存时间为空则触发一次任务调度,说明常驻任务长时间未更新或者是系统刚刚启动 + triggerTask = Objects.isNull(nextTriggerAt); + // 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now + LocalDateTime now = LocalDateTime.now(); + if (Objects.isNull(nextTriggerAt) || nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) { + nextTriggerAt = now; + } } job.setNextTriggerAt(nextTriggerAt); @@ -118,8 +124,7 @@ public class ScanJobTaskActor extends AbstractActor { * 3、常驻任务中的触发时间不是最新的 */ private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) { - return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()) - || Objects.isNull(nextTriggerAt) || partitionTask.getNextTriggerAt().isAfter(nextTriggerAt); + return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()); } private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) { diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 8b086d58..a8745965 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.util.CronExpression; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.web.model.base.PageResult; @@ -22,6 +23,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -35,6 +37,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.Optional; /** * @author www.byteblogs.com @@ -42,6 +45,7 @@ import java.util.Objects; * @since 2.4.0 */ @Service +@Slf4j public class JobServiceImpl implements JobService { private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @@ -128,28 +132,56 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); - calculateNextTriggerAt(jobRequestVO, job); + calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); return 1 == jobMapper.insert(job); } @Override public boolean updateJob(JobRequestVO jobRequestVO) { Assert.notNull(jobRequestVO.getId(), () -> new EasyRetryServerException("id 不能为空")); - Assert.isTrue(1 == jobMapper.selectCount(new LambdaQueryWrapper().eq(Job::getId, jobRequestVO.getId()))); + Job job = jobMapper.selectById(jobRequestVO.getId()); + Assert.notNull(job, () -> new EasyRetryServerException("更新失败")); + + LocalDateTime oldTime = job.getNextTriggerAt(); // 判断常驻任务 - Job job = updateJobResident(jobRequestVO); - calculateNextTriggerAt(jobRequestVO, job); - return 1 == jobMapper.updateById(job); + Job updateJob = updateJobResident(jobRequestVO); + +// LocalDateTime waitUpdateTime = null; + // 新老都是常驻任务 + if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) { +// LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); + log.info("新老都是常驻任务 newTime:[{}] oldTime:[{}]", null , oldTime); +// job.setNextTriggerAt(oldTime); + // 老的是常驻任务 新的是非常驻任务 需要使用内存里面的触发时间计算触发时间 + } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.NO.getStatus())) { + // 常驻任务的触发时间 + LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())).orElse(LocalDateTime.now()); + LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, time); + log.info("old是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]", newTime , oldTime); + updateJob.setNextTriggerAt(newTime); + // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间 + } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) { + LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); + log.info("old不是常驻任务 new是常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime); + updateJob.setNextTriggerAt(LocalDateTime.now()); + } else { + + LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); + log.info("old不是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime); + updateJob.setNextTriggerAt(newTime); + } + + return 1 == jobMapper.updateById(updateJob); } - private static void calculateNextTriggerAt(final JobRequestVO jobRequestVO, final Job job) { + private static LocalDateTime calculateNextTriggerAt(final JobRequestVO jobRequestVO, LocalDateTime time) { WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); - job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + waitStrategyContext.setNextTriggerAt(time); + return waitStrategy.computeRetryTime(waitStrategyContext); } @Override