From d649db4ba68eb918c223d28e800dc041be158179 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Mon, 30 Oct 2023 17:37:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.4.0=201.=20=E4=BC=98=E5=8C=96JOB?= =?UTF-8?q?=E4=B8=8B=E6=AC=A1=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4=E4=B8=8D?= =?UTF-8?q?=E6=98=AF=E6=9C=80=E6=96=B0=E7=9A=84=E6=97=B6=E9=97=B4=EF=BC=8C?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E8=AE=A1=E7=AE=97=E7=9A=84=E4=B8=8B=E6=AC=A1?= =?UTF-8?q?=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4=E4=B8=8D=E5=87=86=202.=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=9B=B4=E6=96=B0=E8=A7=A6=E5=8F=91=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E5=91=A2=EF=BC=8C=E4=B8=BA=E9=87=8D=E6=96=B0=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=E4=B8=8B=E6=AC=A1=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4?= =?UTF-8?q?=203.=20=E9=87=8D=E8=AF=95=E7=9A=84=E6=89=AB=E6=8F=8F=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E6=8B=89=E5=8F=96=E6=95=B0=E6=8D=AE=E7=9A=84?= =?UTF-8?q?=E6=AC=A1=E6=95=B0=E4=BD=BF=E7=94=A8=E5=8A=A8=E6=80=81=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=E8=80=97=E6=97=B6=E8=BF=9B=E8=A1=8C=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/util/PartitionTaskUtils.java | 33 +++++++++- .../task/support/strategy/WaitStrategies.java | 14 +++- .../actor/scan/AbstractScanGroup.java | 64 +++++++++++-------- .../web/service/impl/JobServiceImpl.java | 18 ++++-- 4 files changed, 89 insertions(+), 40 deletions(-) diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java index 6d58d814..ce3b9817 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java @@ -7,6 +7,9 @@ import java.util.List; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongConsumer; +import java.util.function.LongFunction; +import java.util.function.Predicate; /** * @author: www.byteblogs.com @@ -18,14 +21,38 @@ public class PartitionTaskUtils { private PartitionTaskUtils() { } + public static long process(LongFunction> dataSource, + Consumer> task, + long startId) { + return process(dataSource, task, curStartId -> {}, CollectionUtils::isEmpty, startId); + } + + public static long process(LongFunction> dataSource, + Consumer> task, + Predicate> stopCondition, + long startId) { + return process(dataSource, task, curStartId -> {}, stopCondition, startId); + } + + public static long process(LongFunction> dataSource, + Consumer> task, + LongConsumer stopAfterProcessor, + long startId) { + return process(dataSource, task, stopAfterProcessor, CollectionUtils::isEmpty, startId); + } + public static long process( - Function> dataSource, Consumer> task, - long startId) { + LongFunction> dataSource, + Consumer> task, + LongConsumer stopAfterProcessor, + Predicate> stopCondition, + long startId) { int total = 0; do { List products = dataSource.apply(startId); - if (CollectionUtils.isEmpty(products)) { + if (stopCondition.test(products)) { // 没有查询到数据直接退出 + stopAfterProcessor.accept(startId); break; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java index 4be25dba..41a8d7ec 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/strategy/WaitStrategies.java @@ -122,7 +122,12 @@ public class WaitStrategies { public LocalDateTime computeRetryTime(WaitStrategyContext context) { long triggerInterval = Long.parseLong(context.triggerInterval); - return context.getNextTriggerAt().plusSeconds(triggerInterval); + LocalDateTime nextTriggerAt = context.getNextTriggerAt(); + if (nextTriggerAt.isBefore(LocalDateTime.now())) { + nextTriggerAt = LocalDateTime.now(); + } + + return nextTriggerAt.plusSeconds(triggerInterval); } } @@ -134,9 +139,14 @@ public class WaitStrategies { @Override public LocalDateTime computeRetryTime(WaitStrategyContext context) { + LocalDateTime nextTriggerAt = context.getNextTriggerAt(); + if (nextTriggerAt.isBefore(LocalDateTime.now())) { + nextTriggerAt = LocalDateTime.now(); + } + Date nextValidTime; try { - ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); + ZonedDateTime zdt = nextTriggerAt.atZone(ZoneOffset.ofHours(8)); nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); } catch (ParseException e) { throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index aa28a9a4..fb644dcd 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -32,6 +32,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Predicate; /** * 数据扫描模板类 @@ -55,8 +58,8 @@ public abstract class AbstractScanGroup extends AbstractActor { @Autowired protected List taskExecutors; - private static long preCostTime = 0L; - private static long loopCount = 1L; + private static final AtomicLong preCostTime = new AtomicLong(0L); + private static final AtomicLong pullCount = new AtomicLong(1L); @Override public Receive createReceive() { @@ -74,8 +77,9 @@ public abstract class AbstractScanGroup extends AbstractActor { // 获取结束时间 long endTime = System.nanoTime(); - preCostTime = (endTime - startTime) / 1_000_000; - log.info("重试任务调度耗时:[{}]", preCostTime); + preCostTime.set((endTime - startTime) / 1_000_000); + + log.info("重试任务调度耗时:[{}]", preCostTime.get()); }).build(); @@ -84,42 +88,44 @@ public abstract class AbstractScanGroup extends AbstractActor { protected void doScan(final ScanTask scanTask) { // 计算循环拉取的次数 - if (preCostTime > 0) { - loopCount = (10 * 1000) / preCostTime; - loopCount = loopCount == 0 ? 1 : loopCount; + if (preCostTime.get() > 0) { + long loopCount = Math.max((10 * 1000) / preCostTime.get(), 1); + // TODO 最大拉取次数支持可配置 + loopCount = Math.min(loopCount, 10); + pullCount.set(loopCount); } String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); + log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", + groupName, lastId, pullCount.get(), preCostTime.get()); + AtomicInteger count = new AtomicInteger(0); - PartitionTaskUtils.process(startId -> { - int i = count.getAndIncrement(); - if (i > loopCount) { - // 为空则中断处理 - return Lists.newArrayList(); + PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), + partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { + if (CollectionUtils.isEmpty(partitionTasks)) { + putLastId(scanTask.getGroupName(), 0L); + return Boolean.TRUE; } - return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()); - }, - partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId); + + // 超过最大的拉取次数则中断 + if (count.getAndIncrement() > pullCount.get()) { + putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); + return Boolean.TRUE; + } + + return false; + }, lastId); } private void processRetryPartitionTasks(List partitionTasks, ScanTask scanTask) { - StopWatch watch = new StopWatch(); - watch.start(); - if (!CollectionUtils.isEmpty(partitionTasks)) { - putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); - for (PartitionTask partitionTask : partitionTasks) { - processRetryTask((RetryPartitionTask) partitionTask); - } - } else { - putLastId(scanTask.getGroupName(), 0L); + for (PartitionTask partitionTask : partitionTasks) { + processRetryTask((RetryPartitionTask) partitionTask); } - watch.getTotalTimeMillis(); - } private void processRetryTask(RetryPartitionTask partitionTask) { @@ -131,7 +137,8 @@ public abstract class AbstractScanGroup extends AbstractActor { long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis(); - RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), delay, + RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), + delay, TimeUnit.MILLISECONDS); } @@ -151,7 +158,8 @@ public abstract class AbstractScanGroup extends AbstractActor { new LambdaQueryWrapper() .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) - .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId) + .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) + .gt(RetryTask::getId, lastId) .orderByAsc(RetryTask::getId)) .getRecords(); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 55564a6f..8b086d58 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -125,16 +125,10 @@ public class JobServiceImpl implements JobService { @Override public boolean saveJob(JobRequestVO jobRequestVO) { - WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); - WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); - waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); - waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); - waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); - // 判断常驻任务 Job job = updateJobResident(jobRequestVO); job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); - job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + calculateNextTriggerAt(jobRequestVO, job); return 1 == jobMapper.insert(job); } @@ -145,9 +139,19 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); + calculateNextTriggerAt(jobRequestVO, job); return 1 == jobMapper.updateById(job); } + private static void calculateNextTriggerAt(final JobRequestVO jobRequestVO, final Job job) { + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType()); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType()); + waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval()); + waitStrategyContext.setNextTriggerAt(LocalDateTime.now()); + job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); + } + @Override public Job updateJobResident(JobRequestVO jobRequestVO) { Job job = JobConverter.INSTANCE.toJob(jobRequestVO);