From 4b272cd8e6fd30a9e63c0708285935c46e7cf87c Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 10 Nov 2023 15:33:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.4.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E7=9A=84=E6=8B=89=E5=8F=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=A4=AA=E6=85=A2=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/mapper/JobMapper.java | 6 ++ .../persistence/mapper/RetryTaskMapper.java | 1 + .../main/resources/mysql/mapper/JobMapper.xml | 16 ++- .../mysql/mapper/RetryTaskMapper.xml | 13 +++ .../common/config/SystemProperties.java | 5 + .../retry/server/common/dto/ScanTask.java | 4 +- .../support/dispatch/ScanJobTaskActor.java | 58 ++++++----- .../actor/scan/AbstractScanGroup.java | 99 +++++++++++++------ .../actor/scan/ScanCallbackTaskActor.java | 4 +- .../actor/scan/ScanRetryTaskActor.java | 5 +- .../support/idempotent/TimerIdempotent.java | 31 ++++-- .../task/support/timer/RetryTimerWheel.java | 1 - .../starter/dispatch/ConsumerBucketActor.java | 5 +- .../src/main/resources/application.yml | 6 +- 14 files changed, 176 insertions(+), 78 deletions(-) diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java index 278797bb..af4aaf00 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java @@ -1,8 +1,12 @@ package com.aizuda.easy.retry.template.datasource.persistence.mapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; /** *

@@ -15,4 +19,6 @@ import org.apache.ibatis.annotations.Mapper; @Mapper public interface JobMapper extends BaseMapper { + int updateBatchNextTriggerAtById(@Param("list") List list); + } diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java index abbdda0b..3979cb2e 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java @@ -10,4 +10,5 @@ public interface RetryTaskMapper extends BaseMapper { int batchInsert(@Param("list") List list); + int updateBatchNextTriggerAtById(@Param("partition") Integer partition, @Param("list") List list); } diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml index 3825b693..de263ea0 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml @@ -14,7 +14,7 @@ - + @@ -26,4 +26,18 @@ + + update job rt, + ( + + select + #{item.nextTriggerAt} as next_trigger_at, + #{item.id} as id + + ) tt + set + rt.next_trigger_at = tt.next_trigger_at + where rt.id = tt.id + + diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml index 8718aa7d..f6d69dc8 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml @@ -30,4 +30,17 @@ (#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.nextTriggerAt}, #{item.taskType}, #{item.retryStatus}, #{item.createDt}) + + update retry_task_${partition} rt, + ( + + select + #{item.nextTriggerAt} as next_trigger_at, + #{item.id} as id + + ) tt + set + rt.next_trigger_at = tt.next_trigger_at + where rt.id = tt.id + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index 2c9c66b9..c147b785 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -28,6 +28,11 @@ public class SystemProperties { */ private int retryPullPageSize = 1000; + /** + * 任务调度每次拉取的条数 + */ + private int jobPullPageSize = 1000; + /** * 重试每次拉取的次数 */ diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java index d9b9305f..9e1077db 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java @@ -19,7 +19,5 @@ public class ScanTask { private Set buckets; - private long size; - - private long startId; + private Integer groupPartition; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index c85c5aa4..8bb4cd2e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -2,16 +2,15 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch; import akka.actor.AbstractActor; import akka.actor.ActorRef; -import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; +import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.ScanTask; -import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; import com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; @@ -30,6 +29,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -49,6 +49,8 @@ public class ScanJobTaskActor extends AbstractActor { @Autowired private JobMapper jobMapper; + @Autowired + private SystemProperties systemProperties; @Override public Receive createReceive() { @@ -65,23 +67,36 @@ public class ScanJobTaskActor extends AbstractActor { } private void doScan(final ScanTask scanTask) { - log.info("job scan start"); if (CollectionUtils.isEmpty(scanTask.getBuckets())) { return; } - long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, scanTask.getStartId()); + long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), + this::processJobPartitionTasks, 0); log.info("job scan end. total:[{}]", total); } private void processJobPartitionTasks(List partitionTasks) { + + List waitUpdateJobs = new ArrayList<>(); + List waitExecJobs = new ArrayList<>(); for (PartitionTask partitionTask : partitionTasks) { - processJob((JobPartitionTask) partitionTask); + processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs); + } + + // 批量更新 + jobMapper.updateBatchNextTriggerAtById(waitUpdateJobs); + + for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) { + // 执行预处理阶段 + ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); + actorRef.tell(waitExecJob, actorRef); } } - private void processJob(JobPartitionTask partitionTask) { + private void processJob(JobPartitionTask partitionTask, final List waitUpdateJobs, + final List waitExecJobs) { CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName()); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask); @@ -98,27 +113,24 @@ public class ScanJobTaskActor extends AbstractActor { triggerTask = Objects.isNull(nextTriggerAt); // 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now long now = System.currentTimeMillis(); - if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { + if (Objects.isNull(nextTriggerAt) + || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { nextTriggerAt = now; } } job.setNextTriggerAt(nextTriggerAt); - Assert.isTrue(1 == jobMapper.updateById(job), - () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId())); + + waitUpdateJobs.add(job); if (triggerTask) { - // 执行预处理阶段 - ActorRef actorRef = ActorGenerator.jobTaskPrepareActor(); - actorRef.tell(jobTaskPrepare, actorRef); + waitExecJobs.add(jobTaskPrepare); } + } /** - * 需要重新计算触发时间的条件 - * 1、不是常驻任务 - * 2、常驻任务缓存的触发任务为空 - * 3、常驻任务中的触发时间不是最新的 + * 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的 */ private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) { return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident()); @@ -146,13 +158,13 @@ public class ScanJobTaskActor extends AbstractActor { return Collections.emptyList(); } - List jobs = jobMapper.selectPage(new PageDTO(0, scanTask.getSize()), - new LambdaQueryWrapper() - .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) - .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000) - .eq(Job::getDeleted, StatusEnum.NO.getStatus()) - .ge(Job::getId, startId) + List jobs = jobMapper.selectPage(new PageDTO(0, systemProperties.getJobPullPageSize()), + new LambdaQueryWrapper() + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .eq(Job::getDeleted, StatusEnum.NO.getStatus()) + .in(Job::getBucketIndex, scanTask.getBuckets()) + .le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000) + .ge(Job::getId, startId) ).getRecords(); return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 20373260..e85d23fc 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -17,7 +17,10 @@ import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecuto import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; +import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import io.netty.util.TimerTask; @@ -27,12 +30,16 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; -import java.time.ZoneId; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; /** * 数据扫描模板类 @@ -55,6 +62,8 @@ public abstract class AbstractScanGroup extends AbstractActor { protected ClientNodeAllocateHandler clientNodeAllocateHandler; @Autowired protected List taskExecutors; + @Autowired + protected RetryTaskMapper retryTaskMapper; @Override public Receive createReceive() { @@ -74,8 +83,6 @@ public abstract class AbstractScanGroup extends AbstractActor { preCostTime().set((endTime - startTime) / 1_000_000); - log.info(this.getClass().getName() + "重试任务调度耗时:[{}]", preCostTime().get()); - }).build(); } @@ -92,20 +99,17 @@ public abstract class AbstractScanGroup extends AbstractActor { String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); - log.info(this.getClass().getName() + " retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", - groupName, lastId, prePullCount().get(), preCostTime().get()); - AtomicInteger count = new AtomicInteger(0); long total = PartitionTaskUtils.process( startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), - this::processRetryPartitionTasks, partitionTasks -> { + partitionTasks1 -> processRetryPartitionTasks(partitionTasks1, scanTask), partitionTasks -> { if (CollectionUtils.isEmpty(partitionTasks)) { putLastId(scanTask.getGroupName(), 0L); return Boolean.TRUE; } // 超过最大的拉取次数则中断 - if (count.getAndIncrement() > prePullCount().get()) { + if (count.incrementAndGet() >= prePullCount().get()) { putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); return Boolean.TRUE; } @@ -113,31 +117,61 @@ public abstract class AbstractScanGroup extends AbstractActor { return false; }, lastId); - log.info(this.getClass().getName() + " retry scan end. groupName:[{}] total:[{}] realPullCount:[{}]", - groupName, total, count); + log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]", + groupName, lastId, total, preCostTime().get(), count); } - private void processRetryPartitionTasks(List partitionTasks) { + private void processRetryPartitionTasks(List partitionTasks, final ScanTask scanTask) { + // 批次查询场景 + Map sceneConfigMap = getSceneConfigMap(partitionTasks); + + List waitUpdateRetryTasks = new ArrayList<>(); + for (PartitionTask task : partitionTasks) { + RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task; + SceneConfig sceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName()); + if (Objects.isNull(sceneConfig)) { + continue; + } + + RetryTask retryTask = processRetryTask(retryPartitionTask, sceneConfig); + waitUpdateRetryTasks.add(retryTask); + } + + // 批量更新 + retryTaskMapper.updateBatchNextTriggerAtById(scanTask.getGroupPartition(), waitUpdateRetryTasks); + + long nowMilli = DateUtils.toNowMilli(); for (PartitionTask partitionTask : partitionTasks) { - processRetryTask((RetryPartitionTask) partitionTask); + RetryPartitionTask retryPartitionTask = (RetryPartitionTask) partitionTask; + long delay = DateUtils.toEpochMilli(retryPartitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100; + RetryTimerWheel.register( + retryPartitionTask.getGroupName(), + retryPartitionTask.getUniqueId(), + timerTask(retryPartitionTask), + delay, + TimeUnit.MILLISECONDS); } } - private void processRetryTask(RetryPartitionTask partitionTask) { + private Map getSceneConfigMap(final List partitionTasks) { + Set sceneNameSet = partitionTasks.stream() + .map(partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()).collect(Collectors.toSet()); + List sceneConfigs = accessTemplate.getSceneConfigAccess() + .list(new LambdaQueryWrapper() + .select(SceneConfig::getBackOff, SceneConfig::getTriggerInterval, SceneConfig::getSceneName) + .in(SceneConfig::getSceneName, sceneNameSet)); + return sceneConfigs.stream() + .collect(Collectors.toMap(SceneConfig::getSceneName, i -> i)); + } + private RetryTask processRetryTask(RetryPartitionTask partitionTask, SceneConfig sceneConfig) { RetryTask retryTask = new RetryTask(); - retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask)); + retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, sceneConfig)); retryTask.setId(partitionTask.getId()); - accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask); - - long nowMilli = DateUtils.toNowMilli(); - long delay = DateUtils.toEpochMilli(partitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100; - RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), - delay, - TimeUnit.MILLISECONDS); + return retryTask; } protected abstract TaskExecutorSceneEnum taskActuatorScene(); @@ -146,7 +180,8 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract void putLastId(String groupName, Long lastId); - protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask); + protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, + final SceneConfig sceneConfig); protected abstract TimerTask timerTask(RetryPartitionTask partitionTask); @@ -156,15 +191,17 @@ public abstract class AbstractScanGroup extends AbstractActor { public List listAvailableTasks(String groupName, Long lastId, Integer taskType) { List retryTasks = accessTemplate.getRetryTaskAccess() - .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), - new LambdaQueryWrapper() - .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - .eq(RetryTask::getGroupName, groupName) - .eq(RetryTask::getTaskType, taskType) - .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD)) - .gt(RetryTask::getId, lastId) - .orderByAsc(RetryTask::getId)) - .getRecords(); + .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), + new LambdaQueryWrapper() + .select(RetryTask::getId, RetryTask::getNextTriggerAt, RetryTask::getUniqueId, + RetryTask::getGroupName, RetryTask::getRetryCount, RetryTask::getSceneName) + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) + .eq(RetryTask::getGroupName, groupName) + .eq(RetryTask::getTaskType, taskType) + .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD)) + .gt(RetryTask::getId, lastId) + .orderByAsc(RetryTask::getId)) + .getRecords(); return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index 0e37468b..2d058de6 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyC import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -56,7 +57,8 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { } @Override - protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) { + protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask, + final SceneConfig sceneConfig) { long triggerInterval = systemProperties.getCallback().getTriggerInterval(); WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType()); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index f78fbe8b..aa967581 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -57,10 +57,9 @@ public class ScanRetryTaskActor extends AbstractScanGroup { LAST_AT_MAP.put(groupName, lastId); } - protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask) { + @Override + protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final SceneConfig sceneConfig) { // 更新下次触发时间 - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() - .getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName()); WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java index 431e53ad..d4031741 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java @@ -2,11 +2,11 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.server.common.IdempotentStrategy; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; /** * @author www.byteblogs.com @@ -16,11 +16,24 @@ import java.util.concurrent.CopyOnWriteArraySet; @Slf4j public class TimerIdempotent implements IdempotentStrategy { - private static final CopyOnWriteArraySet cache = new CopyOnWriteArraySet<>(); + + private static final Cache cache; + + static { + cache = CacheBuilder.newBuilder() + .concurrencyLevel(16) // 并发级别 + .expireAfterWrite(20, TimeUnit.SECONDS) + .build(); + } @Override public boolean set(String key, String value) { - return cache.add(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); + cache.put(getKey(key, value), value); + return Boolean.TRUE; + } + + private static String getKey(final String key, final String value) { + return key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)); } @Override @@ -30,14 +43,12 @@ public class TimerIdempotent implements IdempotentStrategy { @Override public boolean isExist(String key, String value) { - if (key == null || value == null) { - log.error("异常监控. key:[{}] value:[{}]", key, value); - } - return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); + return cache.asMap().containsKey(getKey(key, value)); } @Override public boolean clear(String key, String value) { - return cache.removeIf(s-> s.equals(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)))); + cache.invalidate(getKey(key, value)); + return Boolean.TRUE; } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java index f3c3a359..7f71c1a2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java @@ -37,7 +37,6 @@ public class RetryTimerWheel implements Lifecycle { if (!isExisted(groupName, uniqueId)) { delay = delay < 0 ? 0 : delay; - log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId); try { timer.newTimeout(task, delay, unit); idempotent.set(uniqueId, uniqueId); diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java index 1fe38368..3969125d 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java @@ -73,7 +73,7 @@ public class ConsumerBucketActor extends AbstractActor { // 查询桶对应组信息 groupConfigs = accessTemplate.getGroupConfigAccess().list( new LambdaQueryWrapper() - .select(GroupConfig::getGroupName) + .select(GroupConfig::getGroupName, GroupConfig::getGroupPartition) .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) ); @@ -87,6 +87,7 @@ public class ConsumerBucketActor extends AbstractActor { ScanTask scanTask = new ScanTask(); scanTask.setGroupName(groupConfig.getGroupName()); scanTask.setBuckets(consumerBucket.getBuckets()); + scanTask.setGroupPartition(groupConfig.getGroupPartition()); produceScanActorTask(scanTask); } } @@ -96,8 +97,6 @@ public class ConsumerBucketActor extends AbstractActor { // 扫描回调数据 ScanTask scanTask = new ScanTask(); scanTask.setBuckets(consumerBucket.getBuckets()); - scanTask.setSize(1000); - scanTask.setStartId(0); ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB); scanJobActorRef.tell(scanTask, scanJobActorRef); } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml b/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml index f06cdf2f..8a8f0be1 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml +++ b/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml @@ -37,10 +37,11 @@ logging: config: classpath:logback-boot.xml easy-retry: - retry-pull-page-size: 100 # 拉取重试数据的每批次的大小 + retry-pull-page-size: 1000 # 拉取重试数据的每批次的大小 + job-pull-page-size: 1000 # 拉取重试数据的每批次的大小 netty-port: 1788 # 服务端netty端口 total-partition: 2 # 重试和死信表的分区总数 - limiter: 10 # 一个客户端每秒最多接收的重试数量指令 + limiter: 1000 # 一个客户端每秒最多接收的重试数量指令 step: 100 # 号段模式下步长配置 log-storage: 90 # 日志保存时间(单位: day) callback: # 回调配置 @@ -48,6 +49,7 @@ easy-retry: trigger-interval: 900 #间隔时间 db-type: mysql #当前使用的数据库 mode: all + retry-max-pull-count: 10