diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java
index 278797bb..af4aaf00 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobMapper.java
@@ -1,8 +1,12 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
+import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
/**
*
@@ -15,4 +19,6 @@ import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface JobMapper extends BaseMapper {
+ int updateBatchNextTriggerAtById(@Param("list") List list);
+
}
diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java
index abbdda0b..3979cb2e 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetryTaskMapper.java
@@ -10,4 +10,5 @@ public interface RetryTaskMapper extends BaseMapper {
int batchInsert(@Param("list") List list);
+ int updateBatchNextTriggerAtById(@Param("partition") Integer partition, @Param("list") List list);
}
diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml
index 3825b693..de263ea0 100644
--- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml
+++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobMapper.xml
@@ -14,7 +14,7 @@
-
+
@@ -26,4 +26,18 @@
+
+ update job rt,
+ (
+
+ select
+ #{item.nextTriggerAt} as next_trigger_at,
+ #{item.id} as id
+
+ ) tt
+ set
+ rt.next_trigger_at = tt.next_trigger_at
+ where rt.id = tt.id
+
+
diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml
index 8718aa7d..f6d69dc8 100644
--- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml
+++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/RetryTaskMapper.xml
@@ -30,4 +30,17 @@
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.nextTriggerAt}, #{item.taskType}, #{item.retryStatus}, #{item.createDt})
+
+ update retry_task_${partition} rt,
+ (
+
+ select
+ #{item.nextTriggerAt} as next_trigger_at,
+ #{item.id} as id
+
+ ) tt
+ set
+ rt.next_trigger_at = tt.next_trigger_at
+ where rt.id = tt.id
+
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java
index 2c9c66b9..c147b785 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java
@@ -28,6 +28,11 @@ public class SystemProperties {
*/
private int retryPullPageSize = 1000;
+ /**
+ * 任务调度每次拉取的条数
+ */
+ private int jobPullPageSize = 1000;
+
/**
* 重试每次拉取的次数
*/
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java
index d9b9305f..9e1077db 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/ScanTask.java
@@ -19,7 +19,5 @@ public class ScanTask {
private Set buckets;
- private long size;
-
- private long startId;
+ private Integer groupPartition;
}
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
index c85c5aa4..8bb4cd2e 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
@@ -2,16 +2,15 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
-import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
+import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
-import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
@@ -30,6 +29,7 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -49,6 +49,8 @@ public class ScanJobTaskActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
+ @Autowired
+ private SystemProperties systemProperties;
@Override
public Receive createReceive() {
@@ -65,23 +67,36 @@ public class ScanJobTaskActor extends AbstractActor {
}
private void doScan(final ScanTask scanTask) {
- log.info("job scan start");
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
return;
}
- long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, scanTask.getStartId());
+ long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
+ this::processJobPartitionTasks, 0);
log.info("job scan end. total:[{}]", total);
}
private void processJobPartitionTasks(List extends PartitionTask> partitionTasks) {
+
+ List waitUpdateJobs = new ArrayList<>();
+ List waitExecJobs = new ArrayList<>();
for (PartitionTask partitionTask : partitionTasks) {
- processJob((JobPartitionTask) partitionTask);
+ processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs);
+ }
+
+ // 批量更新
+ jobMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
+
+ for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) {
+ // 执行预处理阶段
+ ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
+ actorRef.tell(waitExecJob, actorRef);
}
}
- private void processJob(JobPartitionTask partitionTask) {
+ private void processJob(JobPartitionTask partitionTask, final List waitUpdateJobs,
+ final List waitExecJobs) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
@@ -98,27 +113,24 @@ public class ScanJobTaskActor extends AbstractActor {
triggerTask = Objects.isNull(nextTriggerAt);
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
long now = System.currentTimeMillis();
- if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
+ if (Objects.isNull(nextTriggerAt)
+ || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
nextTriggerAt = now;
}
}
job.setNextTriggerAt(nextTriggerAt);
- Assert.isTrue(1 == jobMapper.updateById(job),
- () -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId()));
+
+ waitUpdateJobs.add(job);
if (triggerTask) {
- // 执行预处理阶段
- ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
- actorRef.tell(jobTaskPrepare, actorRef);
+ waitExecJobs.add(jobTaskPrepare);
}
+
}
/**
- * 需要重新计算触发时间的条件
- * 1、不是常驻任务
- * 2、常驻任务缓存的触发任务为空
- * 3、常驻任务中的触发时间不是最新的
+ * 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的
*/
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) {
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
@@ -146,13 +158,13 @@ public class ScanJobTaskActor extends AbstractActor {
return Collections.emptyList();
}
- List jobs = jobMapper.selectPage(new PageDTO(0, scanTask.getSize()),
- new LambdaQueryWrapper()
- .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
- .in(Job::getBucketIndex, scanTask.getBuckets())
- .le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
- .eq(Job::getDeleted, StatusEnum.NO.getStatus())
- .ge(Job::getId, startId)
+ List jobs = jobMapper.selectPage(new PageDTO(0, systemProperties.getJobPullPageSize()),
+ new LambdaQueryWrapper()
+ .eq(Job::getJobStatus, StatusEnum.YES.getStatus())
+ .eq(Job::getDeleted, StatusEnum.NO.getStatus())
+ .in(Job::getBucketIndex, scanTask.getBuckets())
+ .le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
+ .ge(Job::getId, startId)
).getRecords();
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java
index 20373260..e85d23fc 100644
--- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java
+++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java
@@ -17,7 +17,10 @@ import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecuto
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
+import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
+import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.netty.util.TimerTask;
@@ -27,12 +30,16 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
-import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* 数据扫描模板类
@@ -55,6 +62,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
protected List taskExecutors;
+ @Autowired
+ protected RetryTaskMapper retryTaskMapper;
@Override
public Receive createReceive() {
@@ -74,8 +83,6 @@ public abstract class AbstractScanGroup extends AbstractActor {
preCostTime().set((endTime - startTime) / 1_000_000);
- log.info(this.getClass().getName() + "重试任务调度耗时:[{}]", preCostTime().get());
-
}).build();
}
@@ -92,20 +99,17 @@ public abstract class AbstractScanGroup extends AbstractActor {
String groupName = scanTask.getGroupName();
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
- log.info(this.getClass().getName() + " retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]",
- groupName, lastId, prePullCount().get(), preCostTime().get());
-
AtomicInteger count = new AtomicInteger(0);
long total = PartitionTaskUtils.process(
startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
- this::processRetryPartitionTasks, partitionTasks -> {
+ partitionTasks1 -> processRetryPartitionTasks(partitionTasks1, scanTask), partitionTasks -> {
if (CollectionUtils.isEmpty(partitionTasks)) {
putLastId(scanTask.getGroupName(), 0L);
return Boolean.TRUE;
}
// 超过最大的拉取次数则中断
- if (count.getAndIncrement() > prePullCount().get()) {
+ if (count.incrementAndGet() >= prePullCount().get()) {
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
return Boolean.TRUE;
}
@@ -113,31 +117,61 @@ public abstract class AbstractScanGroup extends AbstractActor {
return false;
}, lastId);
- log.info(this.getClass().getName() + " retry scan end. groupName:[{}] total:[{}] realPullCount:[{}]",
- groupName, total, count);
+ log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]",
+ groupName, lastId, total, preCostTime().get(), count);
}
- private void processRetryPartitionTasks(List extends PartitionTask> partitionTasks) {
+ private void processRetryPartitionTasks(List extends PartitionTask> partitionTasks, final ScanTask scanTask) {
+ // 批次查询场景
+ Map sceneConfigMap = getSceneConfigMap(partitionTasks);
+
+ List waitUpdateRetryTasks = new ArrayList<>();
+ for (PartitionTask task : partitionTasks) {
+ RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task;
+ SceneConfig sceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName());
+ if (Objects.isNull(sceneConfig)) {
+ continue;
+ }
+
+ RetryTask retryTask = processRetryTask(retryPartitionTask, sceneConfig);
+ waitUpdateRetryTasks.add(retryTask);
+ }
+
+ // 批量更新
+ retryTaskMapper.updateBatchNextTriggerAtById(scanTask.getGroupPartition(), waitUpdateRetryTasks);
+
+ long nowMilli = DateUtils.toNowMilli();
for (PartitionTask partitionTask : partitionTasks) {
- processRetryTask((RetryPartitionTask) partitionTask);
+ RetryPartitionTask retryPartitionTask = (RetryPartitionTask) partitionTask;
+ long delay = DateUtils.toEpochMilli(retryPartitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100;
+ RetryTimerWheel.register(
+ retryPartitionTask.getGroupName(),
+ retryPartitionTask.getUniqueId(),
+ timerTask(retryPartitionTask),
+ delay,
+ TimeUnit.MILLISECONDS);
}
}
- private void processRetryTask(RetryPartitionTask partitionTask) {
+ private Map getSceneConfigMap(final List extends PartitionTask> partitionTasks) {
+ Set sceneNameSet = partitionTasks.stream()
+ .map(partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()).collect(Collectors.toSet());
+ List sceneConfigs = accessTemplate.getSceneConfigAccess()
+ .list(new LambdaQueryWrapper()
+ .select(SceneConfig::getBackOff, SceneConfig::getTriggerInterval, SceneConfig::getSceneName)
+ .in(SceneConfig::getSceneName, sceneNameSet));
+ return sceneConfigs.stream()
+ .collect(Collectors.toMap(SceneConfig::getSceneName, i -> i));
+ }
+ private RetryTask processRetryTask(RetryPartitionTask partitionTask, SceneConfig sceneConfig) {
RetryTask retryTask = new RetryTask();
- retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask));
+ retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, sceneConfig));
retryTask.setId(partitionTask.getId());
- accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask);
-
- long nowMilli = DateUtils.toNowMilli();
- long delay = DateUtils.toEpochMilli(partitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100;
- RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask),
- delay,
- TimeUnit.MILLISECONDS);
+ return retryTask;
}
protected abstract TaskExecutorSceneEnum taskActuatorScene();
@@ -146,7 +180,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected abstract void putLastId(String groupName, Long lastId);
- protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask);
+ protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask,
+ final SceneConfig sceneConfig);
protected abstract TimerTask timerTask(RetryPartitionTask partitionTask);
@@ -156,15 +191,17 @@ public abstract class AbstractScanGroup extends AbstractActor {
public List listAvailableTasks(String groupName, Long lastId, Integer taskType) {
List retryTasks = accessTemplate.getRetryTaskAccess()
- .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
- new LambdaQueryWrapper()
- .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
- .eq(RetryTask::getGroupName, groupName)
- .eq(RetryTask::getTaskType, taskType)
- .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
- .gt(RetryTask::getId, lastId)
- .orderByAsc(RetryTask::getId))
- .getRecords();
+ .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
+ new LambdaQueryWrapper()
+ .select(RetryTask::getId, RetryTask::getNextTriggerAt, RetryTask::getUniqueId,
+ RetryTask::getGroupName, RetryTask::getRetryCount, RetryTask::getSceneName)
+ .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
+ .eq(RetryTask::getGroupName, groupName)
+ .eq(RetryTask::getTaskType, taskType)
+ .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
+ .gt(RetryTask::getId, lastId)
+ .orderByAsc(RetryTask::getId))
+ .getRecords();
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
}
diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java
index 0e37468b..2d058de6 100644
--- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java
+++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java
@@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyC
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
+import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -56,7 +57,8 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
}
@Override
- protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) {
+ protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask,
+ final SceneConfig sceneConfig) {
long triggerInterval = systemProperties.getCallback().getTriggerInterval();
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType());
diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java
index f78fbe8b..aa967581 100644
--- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java
+++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java
@@ -57,10 +57,9 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
LAST_AT_MAP.put(groupName, lastId);
}
- protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask) {
+ @Override
+ protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final SceneConfig sceneConfig) {
// 更新下次触发时间
- SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
- .getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java
index 431e53ad..d4031741 100644
--- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java
+++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java
@@ -2,11 +2,11 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
@@ -16,11 +16,24 @@ import java.util.concurrent.CopyOnWriteArraySet;
@Slf4j
public class TimerIdempotent implements IdempotentStrategy {
- private static final CopyOnWriteArraySet cache = new CopyOnWriteArraySet<>();
+
+ private static final Cache cache;
+
+ static {
+ cache = CacheBuilder.newBuilder()
+ .concurrencyLevel(16) // 并发级别
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .build();
+ }
@Override
public boolean set(String key, String value) {
- return cache.add(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
+ cache.put(getKey(key, value), value);
+ return Boolean.TRUE;
+ }
+
+ private static String getKey(final String key, final String value) {
+ return key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value));
}
@Override
@@ -30,14 +43,12 @@ public class TimerIdempotent implements IdempotentStrategy {
@Override
public boolean isExist(String key, String value) {
- if (key == null || value == null) {
- log.error("异常监控. key:[{}] value:[{}]", key, value);
- }
- return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
+ return cache.asMap().containsKey(getKey(key, value));
}
@Override
public boolean clear(String key, String value) {
- return cache.removeIf(s-> s.equals(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))));
+ cache.invalidate(getKey(key, value));
+ return Boolean.TRUE;
}
}
diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java
index f3c3a359..7f71c1a2 100644
--- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java
+++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java
@@ -37,7 +37,6 @@ public class RetryTimerWheel implements Lifecycle {
if (!isExisted(groupName, uniqueId)) {
delay = delay < 0 ? 0 : delay;
- log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId);
try {
timer.newTimeout(task, delay, unit);
idempotent.set(uniqueId, uniqueId);
diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
index 1fe38368..3969125d 100644
--- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
+++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java
@@ -73,7 +73,7 @@ public class ConsumerBucketActor extends AbstractActor {
// 查询桶对应组信息
groupConfigs = accessTemplate.getGroupConfigAccess().list(
new LambdaQueryWrapper()
- .select(GroupConfig::getGroupName)
+ .select(GroupConfig::getGroupName, GroupConfig::getGroupPartition)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
);
@@ -87,6 +87,7 @@ public class ConsumerBucketActor extends AbstractActor {
ScanTask scanTask = new ScanTask();
scanTask.setGroupName(groupConfig.getGroupName());
scanTask.setBuckets(consumerBucket.getBuckets());
+ scanTask.setGroupPartition(groupConfig.getGroupPartition());
produceScanActorTask(scanTask);
}
}
@@ -96,8 +97,6 @@ public class ConsumerBucketActor extends AbstractActor {
// 扫描回调数据
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
- scanTask.setSize(1000);
- scanTask.setStartId(0);
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
}
diff --git a/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml b/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml
index f06cdf2f..8a8f0be1 100644
--- a/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml
+++ b/easy-retry-server/easy-retry-server-starter/src/main/resources/application.yml
@@ -37,10 +37,11 @@ logging:
config: classpath:logback-boot.xml
easy-retry:
- retry-pull-page-size: 100 # 拉取重试数据的每批次的大小
+ retry-pull-page-size: 1000 # 拉取重试数据的每批次的大小
+ job-pull-page-size: 1000 # 拉取重试数据的每批次的大小
netty-port: 1788 # 服务端netty端口
total-partition: 2 # 重试和死信表的分区总数
- limiter: 10 # 一个客户端每秒最多接收的重试数量指令
+ limiter: 1000 # 一个客户端每秒最多接收的重试数量指令
step: 100 # 号段模式下步长配置
log-storage: 90 # 日志保存时间(单位: day)
callback: # 回调配置
@@ -48,6 +49,7 @@ easy-retry:
trigger-interval: 900 #间隔时间
db-type: mysql #当前使用的数据库
mode: all
+ retry-max-pull-count: 10