diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
index 5bc0dc75..5db48063 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java
@@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
+import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
@@ -158,6 +159,8 @@ public class JobExecutorActor extends AbstractActor {
preTriggerAt = job.getNextTriggerAt();
}
+ System.out.println("时间监控 " + ResidentTaskCache.get(job.getId()) + "-" + job.getNextTriggerAt());
+
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerType(job.getTriggerType());
waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
index 68ea22ff..2edc1487 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
@@ -97,7 +97,13 @@ public class ScanJobTaskActor extends AbstractActor {
// 更新下次触发时间
nextTriggerAt = calculateNextTriggerTime(partitionTask);
} else {
- triggerTask = false;
+ // 若常驻任务的缓存时间为空则触发一次任务调度,说明常驻任务长时间未更新或者是系统刚刚启动
+ triggerTask = Objects.isNull(nextTriggerAt);
+ // 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
+ LocalDateTime now = LocalDateTime.now();
+ if (Objects.isNull(nextTriggerAt) || nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) {
+ nextTriggerAt = now;
+ }
}
job.setNextTriggerAt(nextTriggerAt);
@@ -118,8 +124,7 @@ public class ScanJobTaskActor extends AbstractActor {
* 3、常驻任务中的触发时间不是最新的
*/
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) {
- return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident())
- || Objects.isNull(nextTriggerAt) || partitionTask.getNextTriggerAt().isAfter(nextTriggerAt);
+ return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
}
private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) {
diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java
index 8b086d58..a8745965 100644
--- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java
+++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java
@@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
+import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
@@ -22,6 +23,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -35,6 +37,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
/**
* @author www.byteblogs.com
@@ -42,6 +45,7 @@ import java.util.Objects;
* @since 2.4.0
*/
@Service
+@Slf4j
public class JobServiceImpl implements JobService {
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@@ -128,28 +132,56 @@ public class JobServiceImpl implements JobService {
// 判断常驻任务
Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal());
- calculateNextTriggerAt(jobRequestVO, job);
+ calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
return 1 == jobMapper.insert(job);
}
@Override
public boolean updateJob(JobRequestVO jobRequestVO) {
Assert.notNull(jobRequestVO.getId(), () -> new EasyRetryServerException("id 不能为空"));
- Assert.isTrue(1 == jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getId, jobRequestVO.getId())));
+ Job job = jobMapper.selectById(jobRequestVO.getId());
+ Assert.notNull(job, () -> new EasyRetryServerException("更新失败"));
+
+ LocalDateTime oldTime = job.getNextTriggerAt();
// 判断常驻任务
- Job job = updateJobResident(jobRequestVO);
- calculateNextTriggerAt(jobRequestVO, job);
- return 1 == jobMapper.updateById(job);
+ Job updateJob = updateJobResident(jobRequestVO);
+
+// LocalDateTime waitUpdateTime = null;
+ // 新老都是常驻任务
+ if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) {
+// LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
+ log.info("新老都是常驻任务 newTime:[{}] oldTime:[{}]", null , oldTime);
+// job.setNextTriggerAt(oldTime);
+ // 老的是常驻任务 新的是非常驻任务 需要使用内存里面的触发时间计算触发时间
+ } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.NO.getStatus())) {
+ // 常驻任务的触发时间
+ LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())).orElse(LocalDateTime.now());
+ LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, time);
+ log.info("old是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]", newTime , oldTime);
+ updateJob.setNextTriggerAt(newTime);
+ // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间
+ } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) {
+ LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
+ log.info("old不是常驻任务 new是常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime);
+ updateJob.setNextTriggerAt(LocalDateTime.now());
+ } else {
+
+ LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
+ log.info("old不是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime);
+ updateJob.setNextTriggerAt(newTime);
+ }
+
+ return 1 == jobMapper.updateById(updateJob);
}
- private static void calculateNextTriggerAt(final JobRequestVO jobRequestVO, final Job job) {
+ private static LocalDateTime calculateNextTriggerAt(final JobRequestVO jobRequestVO, LocalDateTime time) {
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType());
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
- waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
- job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
+ waitStrategyContext.setNextTriggerAt(time);
+ return waitStrategy.computeRetryTime(waitStrategyContext);
}
@Override