feat: 2.4.0

1. 优化触发时间
This commit is contained in:
byteblogs168 2023-11-01 18:43:01 +08:00
parent ff92c582e1
commit b55fe2413d
3 changed files with 51 additions and 11 deletions

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
@ -158,6 +159,8 @@ public class JobExecutorActor extends AbstractActor {
preTriggerAt = job.getNextTriggerAt(); preTriggerAt = job.getNextTriggerAt();
} }
System.out.println("时间监控 " + ResidentTaskCache.get(job.getId()) + "-" + job.getNextTriggerAt());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerType(job.getTriggerType()); waitStrategyContext.setTriggerType(job.getTriggerType());
waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); waitStrategyContext.setTriggerInterval(job.getTriggerInterval());

View File

@ -97,7 +97,13 @@ public class ScanJobTaskActor extends AbstractActor {
// 更新下次触发时间 // 更新下次触发时间
nextTriggerAt = calculateNextTriggerTime(partitionTask); nextTriggerAt = calculateNextTriggerTime(partitionTask);
} else { } else {
triggerTask = false; // 若常驻任务的缓存时间为空则触发一次任务调度说明常驻任务长时间未更新或者是系统刚刚启动
triggerTask = Objects.isNull(nextTriggerAt);
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
LocalDateTime now = LocalDateTime.now();
if (Objects.isNull(nextTriggerAt) || nextTriggerAt.plusSeconds(SystemConstants.SCHEDULE_PERIOD).isBefore(now)) {
nextTriggerAt = now;
}
} }
job.setNextTriggerAt(nextTriggerAt); job.setNextTriggerAt(nextTriggerAt);
@ -118,8 +124,7 @@ public class ScanJobTaskActor extends AbstractActor {
* 3常驻任务中的触发时间不是最新的 * 3常驻任务中的触发时间不是最新的
*/ */
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) { private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask, LocalDateTime nextTriggerAt) {
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()) return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
|| Objects.isNull(nextTriggerAt) || partitionTask.getNextTriggerAt().isAfter(nextTriggerAt);
} }
private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) { private LocalDateTime calculateNextTriggerTime(JobPartitionTask partitionTask) {

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.base.PageResult;
@ -22,6 +23,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -35,6 +37,7 @@ import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
@ -42,6 +45,7 @@ import java.util.Objects;
* @since 2.4.0 * @since 2.4.0
*/ */
@Service @Service
@Slf4j
public class JobServiceImpl implements JobService { public class JobServiceImpl implements JobService {
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@ -128,28 +132,56 @@ public class JobServiceImpl implements JobService {
// 判断常驻任务 // 判断常驻任务
Job job = updateJobResident(jobRequestVO); Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal());
calculateNextTriggerAt(jobRequestVO, job); calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
return 1 == jobMapper.insert(job); return 1 == jobMapper.insert(job);
} }
@Override @Override
public boolean updateJob(JobRequestVO jobRequestVO) { public boolean updateJob(JobRequestVO jobRequestVO) {
Assert.notNull(jobRequestVO.getId(), () -> new EasyRetryServerException("id 不能为空")); Assert.notNull(jobRequestVO.getId(), () -> new EasyRetryServerException("id 不能为空"));
Assert.isTrue(1 == jobMapper.selectCount(new LambdaQueryWrapper<Job>().eq(Job::getId, jobRequestVO.getId())));
Job job = jobMapper.selectById(jobRequestVO.getId());
Assert.notNull(job, () -> new EasyRetryServerException("更新失败"));
LocalDateTime oldTime = job.getNextTriggerAt();
// 判断常驻任务 // 判断常驻任务
Job job = updateJobResident(jobRequestVO); Job updateJob = updateJobResident(jobRequestVO);
calculateNextTriggerAt(jobRequestVO, job);
return 1 == jobMapper.updateById(job); // LocalDateTime waitUpdateTime = null;
// 新老都是常驻任务
if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) {
// LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
log.info("新老都是常驻任务 newTime:[{}] oldTime:[{}]", null , oldTime);
// job.setNextTriggerAt(oldTime);
// 老的是常驻任务 新的是非常驻任务 需要使用内存里面的触发时间计算触发时间
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.NO.getStatus())) {
// 常驻任务的触发时间
LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())).orElse(LocalDateTime.now());
LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, time);
log.info("old是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]", newTime , oldTime);
updateJob.setNextTriggerAt(newTime);
// 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) {
LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
log.info("old不是常驻任务 new是常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime);
updateJob.setNextTriggerAt(LocalDateTime.now());
} else {
LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
log.info("old不是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime);
updateJob.setNextTriggerAt(newTime);
}
return 1 == jobMapper.updateById(updateJob);
} }
private static void calculateNextTriggerAt(final JobRequestVO jobRequestVO, final Job job) { private static LocalDateTime calculateNextTriggerAt(final JobRequestVO jobRequestVO, LocalDateTime time) {
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType());
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); waitStrategyContext.setNextTriggerAt(time);
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); return waitStrategy.computeRetryTime(waitStrategyContext);
} }
@Override @Override