diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java index 6d58d814..ce3b9817 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java @@ -7,6 +7,9 @@ import java.util.List; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongConsumer; +import java.util.function.LongFunction; +import java.util.function.Predicate; /** * @author: www.byteblogs.com @@ -18,14 +21,38 @@ public class PartitionTaskUtils { private PartitionTaskUtils() { } + public static long process(LongFunction> dataSource, + Consumer> task, + long startId) { + return process(dataSource, task, curStartId -> {}, CollectionUtils::isEmpty, startId); + } + + public static long process(LongFunction> dataSource, + Consumer> task, + Predicate> stopCondition, + long startId) { + return process(dataSource, task, curStartId -> {}, stopCondition, startId); + } + + public static long process(LongFunction> dataSource, + Consumer> task, + LongConsumer stopAfterProcessor, + long startId) { + return process(dataSource, task, stopAfterProcessor, CollectionUtils::isEmpty, startId); + } + public static long process( - Function> dataSource, Consumer> task, - long startId) { + LongFunction> dataSource, + Consumer> task, + LongConsumer stopAfterProcessor, + Predicate> stopCondition, + long startId) { int total = 0; do { List products = dataSource.apply(startId); - if (CollectionUtils.isEmpty(products)) { + if (stopCondition.test(products)) { // 没有查询到数据直接退出 + stopAfterProcessor.accept(startId); break; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java index 4be25dba..41a8d7ec 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java @@ -122,7 +122,12 @@ public class WaitStrategies { public LocalDateTime computeRetryTime(WaitStrategyContext context) { long triggerInterval = Long.parseLong(context.triggerInterval); - return context.getNextTriggerAt().plusSeconds(triggerInterval); + LocalDateTime nextTriggerAt = context.getNextTriggerAt(); + if (nextTriggerAt.isBefore(LocalDateTime.now())) { + nextTriggerAt = LocalDateTime.now(); + } + + return nextTriggerAt.plusSeconds(triggerInterval); } } @@ -134,9 +139,14 @@ public class WaitStrategies { @Override public LocalDateTime computeRetryTime(WaitStrategyContext context) { + LocalDateTime nextTriggerAt = context.getNextTriggerAt(); + if (nextTriggerAt.isBefore(LocalDateTime.now())) { + nextTriggerAt = LocalDateTime.now(); + } + Date nextValidTime; try { - ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); + ZonedDateTime zdt = nextTriggerAt.atZone(ZoneOffset.ofHours(8)); nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); } catch (ParseException e) { throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index aa28a9a4..fb644dcd 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -32,6 +32,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * 数据扫描模板类 @@ -55,8 +58,8 @@ public abstract class AbstractScanGroup extends AbstractActor { @Autowired protected List taskExecutors; - private static long preCostTime = 0L; - private static long loopCount = 1L; + private static final AtomicLong preCostTime = new AtomicLong(0L); + private static final AtomicLong pullCount = new AtomicLong(1L); @Override public Receive createReceive() { @@ -74,8 +77,9 @@ public abstract class AbstractScanGroup extends AbstractActor { // 获取结束时间 long endTime = System.nanoTime(); - preCostTime = (endTime - startTime) / 1_000_000; - log.info("重试任务调度耗时:[{}]", preCostTime); + preCostTime.set((endTime - startTime) / 1_000_000); + + log.info("重试任务调度耗时:[{}]", preCostTime.get()); }).build(); @@ -84,42 +88,44 @@ public abstract class AbstractScanGroup extends AbstractActor { protected void doScan(final ScanTask scanTask) { // 计算循环拉取的次数 - if (preCostTime > 0) { - loopCount = (10 * 1000) / preCostTime; - loopCount = loopCount == 0 ? 1 : loopCount; + if (preCostTime.get() > 0) { + long loopCount = Math.max((10 * 1000) / preCostTime.get(), 1); + // TODO 最大拉取次数支持可配置 + loopCount = Math.min(loopCount, 10); + pullCount.set(loopCount); } String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); + log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", + groupName, lastId, pullCount.get(), preCostTime.get()); + AtomicInteger count = new AtomicInteger(0); - PartitionTaskUtils.process(startId -> { - int i = count.getAndIncrement(); - if (i > loopCount) { - // 为空则中断处理 - return Lists.newArrayList(); + PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), + partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { + if (CollectionUtils.isEmpty(partitionTasks)) { + putLastId(scanTask.getGroupName(), 0L); + return Boolean.TRUE; } - return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()); - }, - partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId); + + // 超过最大的拉取次数则中断 + if (count.getAndIncrement() > pullCount.get()) { + putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); + return Boolean.TRUE; + } + + return false; + }, lastId); } private void processRetryPartitionTasks(List partitionTasks, ScanTask scanTask) { - StopWatch watch = new StopWatch(); - watch.start(); - if (!CollectionUtils.isEmpty(partitionTasks)) { - putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); - for (PartitionTask partitionTask : partitionTasks) { - processRetryTask((RetryPartitionTask) partitionTask); - } - } else { - putLastId(scanTask.getGroupName(), 0L); + for (PartitionTask partitionTask : partitionTasks) { + processRetryTask((RetryPartitionTask) partitionTask); } - watch.getTotalTimeMillis(); - } private void processRetryTask(RetryPartitionTask partitionTask) { @@ -131,7 +137,8 @@ public abstract class AbstractScanGroup extends AbstractActor { long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis(); - RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), delay, + RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), + delay, TimeUnit.MILLISECONDS); } @@ -151,7 +158,8 @@ public abstract class AbstractScanGroup extends AbstractActor { new LambdaQueryWrapper() .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) - .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId) + .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) + .gt(RetryTask::getId, lastId) .orderByAsc(RetryTask::getId)) .getRecords(); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 55564a6f..8b086d58 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -125,16 +125,10 @@ public class JobServiceImpl implements JobService { @Override public boolean saveJob(JobRequestVO jobRequestVO) { - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); - WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); - waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); - // 判断常驻任务 Job job = updateJobResident(jobRequestVO); job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); - job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + calculateNextTriggerAt(jobRequestVO, job); return 1 == jobMapper.insert(job); } @@ -145,9 +139,19 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); + calculateNextTriggerAt(jobRequestVO, job); return 1 == jobMapper.updateById(job); } + private static void calculateNextTriggerAt(final JobRequestVO jobRequestVO, final Job job) { + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); + waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); + job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + } + @Override public Job updateJobResident(JobRequestVO jobRequestVO) { Job job = JobConverter.INSTANCE.toJob(jobRequestVO);