feat: 2.4.0
1. 优化重试的拉取数据太慢问题
This commit is contained in:
parent
1ceeb56113
commit
5ff06db7b8
@ -1,8 +1,12 @@
|
||||
package com.aizuda.easy.retry.template.datasource.persistence.mapper;
|
||||
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@ -15,4 +19,6 @@ import org.apache.ibatis.annotations.Mapper;
|
||||
@Mapper
|
||||
public interface JobMapper extends BaseMapper<Job> {
|
||||
|
||||
int updateBatchNextTriggerAtById(@Param("list") List<Job> list);
|
||||
|
||||
}
|
||||
|
@ -10,4 +10,5 @@ public interface RetryTaskMapper extends BaseMapper<RetryTask> {
|
||||
|
||||
int batchInsert(@Param("list") List<RetryTask> list);
|
||||
|
||||
int updateBatchNextTriggerAtById(@Param("partition") Integer partition, @Param("list") List<RetryTask> list);
|
||||
}
|
||||
|
@ -14,7 +14,7 @@
|
||||
<result column="job_status" property="jobStatus"/>
|
||||
<result column="route_key" property="routeKey"/>
|
||||
<result column="executor_type" property="executorType"/>
|
||||
<result column="executor_name" property="executorName"/>
|
||||
<result column="executor_info" property="executorInfo"/>
|
||||
<result column="block_strategy" property="blockStrategy"/>
|
||||
<result column="executor_timeout" property="executorTimeout"/>
|
||||
<result column="max_retry_times" property="maxRetryTimes"/>
|
||||
@ -26,4 +26,18 @@
|
||||
<result column="deleted" property="deleted"/>
|
||||
</resultMap>
|
||||
|
||||
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
|
||||
update job rt,
|
||||
(
|
||||
<foreach collection="list" item="item" index="index" separator=" union all ">
|
||||
select
|
||||
#{item.nextTriggerAt} as next_trigger_at,
|
||||
#{item.id} as id
|
||||
</foreach>
|
||||
) tt
|
||||
set
|
||||
rt.next_trigger_at = tt.next_trigger_at
|
||||
where rt.id = tt.id
|
||||
</update>
|
||||
|
||||
</mapper>
|
||||
|
@ -30,4 +30,17 @@
|
||||
(#{item.uniqueId}, #{item.groupName}, #{item.sceneName}, #{item.idempotentId}, #{item.bizNo}, #{item.executorName}, #{item.argsStr}, #{item.extAttrs}, #{item.nextTriggerAt}, #{item.taskType}, #{item.retryStatus}, #{item.createDt})
|
||||
</foreach>
|
||||
</insert>
|
||||
<update id="updateBatchNextTriggerAtById" parameterType="java.util.List">
|
||||
update retry_task_${partition} rt,
|
||||
(
|
||||
<foreach collection="list" item="item" index="index" separator=" union all ">
|
||||
select
|
||||
#{item.nextTriggerAt} as next_trigger_at,
|
||||
#{item.id} as id
|
||||
</foreach>
|
||||
) tt
|
||||
set
|
||||
rt.next_trigger_at = tt.next_trigger_at
|
||||
where rt.id = tt.id
|
||||
</update>
|
||||
</mapper>
|
||||
|
@ -28,6 +28,11 @@ public class SystemProperties {
|
||||
*/
|
||||
private int retryPullPageSize = 1000;
|
||||
|
||||
/**
|
||||
* 任务调度每次拉取的条数
|
||||
*/
|
||||
private int jobPullPageSize = 1000;
|
||||
|
||||
/**
|
||||
* 重试每次拉取的次数
|
||||
*/
|
||||
|
@ -19,7 +19,5 @@ public class ScanTask {
|
||||
|
||||
private Set<Integer> buckets;
|
||||
|
||||
private long size;
|
||||
|
||||
private long startId;
|
||||
private Integer groupPartition;
|
||||
}
|
||||
|
@ -2,16 +2,15 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
|
||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||
import com.aizuda.easy.retry.server.common.WaitStrategy;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
|
||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
||||
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
|
||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
||||
@ -30,6 +29,7 @@ import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@ -49,6 +49,8 @@ public class ScanJobTaskActor extends AbstractActor {
|
||||
|
||||
@Autowired
|
||||
private JobMapper jobMapper;
|
||||
@Autowired
|
||||
private SystemProperties systemProperties;
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
@ -65,23 +67,36 @@ public class ScanJobTaskActor extends AbstractActor {
|
||||
}
|
||||
|
||||
private void doScan(final ScanTask scanTask) {
|
||||
log.info("job scan start");
|
||||
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
|
||||
return;
|
||||
}
|
||||
|
||||
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, scanTask.getStartId());
|
||||
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
|
||||
this::processJobPartitionTasks, 0);
|
||||
|
||||
log.info("job scan end. total:[{}]", total);
|
||||
}
|
||||
|
||||
private void processJobPartitionTasks(List<? extends PartitionTask> partitionTasks) {
|
||||
|
||||
List<Job> waitUpdateJobs = new ArrayList<>();
|
||||
List<JobTaskPrepareDTO> waitExecJobs = new ArrayList<>();
|
||||
for (PartitionTask partitionTask : partitionTasks) {
|
||||
processJob((JobPartitionTask) partitionTask);
|
||||
processJob((JobPartitionTask) partitionTask, waitUpdateJobs, waitExecJobs);
|
||||
}
|
||||
|
||||
// 批量更新
|
||||
jobMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
|
||||
|
||||
for (final JobTaskPrepareDTO waitExecJob : waitExecJobs) {
|
||||
// 执行预处理阶段
|
||||
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
||||
actorRef.tell(waitExecJob, actorRef);
|
||||
}
|
||||
}
|
||||
|
||||
private void processJob(JobPartitionTask partitionTask) {
|
||||
private void processJob(JobPartitionTask partitionTask, final List<Job> waitUpdateJobs,
|
||||
final List<JobTaskPrepareDTO> waitExecJobs) {
|
||||
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
|
||||
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
|
||||
|
||||
@ -98,27 +113,24 @@ public class ScanJobTaskActor extends AbstractActor {
|
||||
triggerTask = Objects.isNull(nextTriggerAt);
|
||||
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
|
||||
long now = System.currentTimeMillis();
|
||||
if (Objects.isNull(nextTriggerAt) || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||
if (Objects.isNull(nextTriggerAt)
|
||||
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|
||||
nextTriggerAt = now;
|
||||
}
|
||||
}
|
||||
|
||||
job.setNextTriggerAt(nextTriggerAt);
|
||||
Assert.isTrue(1 == jobMapper.updateById(job),
|
||||
() -> new EasyRetryServerException("更新job下次触发时间失败.jobId:[{}]", job.getId()));
|
||||
|
||||
waitUpdateJobs.add(job);
|
||||
|
||||
if (triggerTask) {
|
||||
// 执行预处理阶段
|
||||
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
|
||||
actorRef.tell(jobTaskPrepare, actorRef);
|
||||
waitExecJobs.add(jobTaskPrepare);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 需要重新计算触发时间的条件
|
||||
* 1、不是常驻任务
|
||||
* 2、常驻任务缓存的触发任务为空
|
||||
* 3、常驻任务中的触发时间不是最新的
|
||||
* 需要重新计算触发时间的条件 1、不是常驻任务 2、常驻任务缓存的触发任务为空 3、常驻任务中的触发时间不是最新的
|
||||
*/
|
||||
private static boolean needCalculateNextTriggerTime(JobPartitionTask partitionTask) {
|
||||
return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
|
||||
@ -146,13 +158,13 @@ public class ScanJobTaskActor extends AbstractActor {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, scanTask.getSize()),
|
||||
new LambdaQueryWrapper<Job>()
|
||||
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
|
||||
.in(Job::getBucketIndex, scanTask.getBuckets())
|
||||
.le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
|
||||
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
|
||||
.ge(Job::getId, startId)
|
||||
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, systemProperties.getJobPullPageSize()),
|
||||
new LambdaQueryWrapper<Job>()
|
||||
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
|
||||
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
|
||||
.in(Job::getBucketIndex, scanTask.getBuckets())
|
||||
.le(Job::getNextTriggerAt, System.currentTimeMillis() + SystemConstants.SCHEDULE_PERIOD * 1000)
|
||||
.ge(Job::getId, startId)
|
||||
).getRecords();
|
||||
|
||||
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
|
||||
|
@ -17,7 +17,10 @@ import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecuto
|
||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel;
|
||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import io.netty.util.TimerTask;
|
||||
@ -27,12 +30,16 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 数据扫描模板类
|
||||
@ -55,6 +62,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||
@Autowired
|
||||
protected List<TaskExecutor> taskExecutors;
|
||||
@Autowired
|
||||
protected RetryTaskMapper retryTaskMapper;
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
@ -74,8 +83,6 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
|
||||
preCostTime().set((endTime - startTime) / 1_000_000);
|
||||
|
||||
log.info(this.getClass().getName() + "重试任务调度耗时:[{}]", preCostTime().get());
|
||||
|
||||
}).build();
|
||||
|
||||
}
|
||||
@ -92,20 +99,17 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
String groupName = scanTask.getGroupName();
|
||||
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
|
||||
|
||||
log.info(this.getClass().getName() + " retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]",
|
||||
groupName, lastId, prePullCount().get(), preCostTime().get());
|
||||
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
long total = PartitionTaskUtils.process(
|
||||
startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
|
||||
this::processRetryPartitionTasks, partitionTasks -> {
|
||||
partitionTasks1 -> processRetryPartitionTasks(partitionTasks1, scanTask), partitionTasks -> {
|
||||
if (CollectionUtils.isEmpty(partitionTasks)) {
|
||||
putLastId(scanTask.getGroupName(), 0L);
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
// 超过最大的拉取次数则中断
|
||||
if (count.getAndIncrement() > prePullCount().get()) {
|
||||
if (count.incrementAndGet() >= prePullCount().get()) {
|
||||
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
@ -113,31 +117,61 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
return false;
|
||||
}, lastId);
|
||||
|
||||
log.info(this.getClass().getName() + " retry scan end. groupName:[{}] total:[{}] realPullCount:[{}]",
|
||||
groupName, total, count);
|
||||
log.warn(this.getClass().getName() + " retry scan end. groupName:[{}] startId:[{}] preCostTime:[{}] total:[{}] realPullCount:[{}]",
|
||||
groupName, lastId, total, preCostTime().get(), count);
|
||||
|
||||
}
|
||||
|
||||
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks) {
|
||||
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, final ScanTask scanTask) {
|
||||
|
||||
// 批次查询场景
|
||||
Map<String, SceneConfig> sceneConfigMap = getSceneConfigMap(partitionTasks);
|
||||
|
||||
List<RetryTask> waitUpdateRetryTasks = new ArrayList<>();
|
||||
for (PartitionTask task : partitionTasks) {
|
||||
RetryPartitionTask retryPartitionTask = (RetryPartitionTask) task;
|
||||
SceneConfig sceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName());
|
||||
if (Objects.isNull(sceneConfig)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
RetryTask retryTask = processRetryTask(retryPartitionTask, sceneConfig);
|
||||
waitUpdateRetryTasks.add(retryTask);
|
||||
}
|
||||
|
||||
// 批量更新
|
||||
retryTaskMapper.updateBatchNextTriggerAtById(scanTask.getGroupPartition(), waitUpdateRetryTasks);
|
||||
|
||||
long nowMilli = DateUtils.toNowMilli();
|
||||
for (PartitionTask partitionTask : partitionTasks) {
|
||||
processRetryTask((RetryPartitionTask) partitionTask);
|
||||
RetryPartitionTask retryPartitionTask = (RetryPartitionTask) partitionTask;
|
||||
long delay = DateUtils.toEpochMilli(retryPartitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100;
|
||||
RetryTimerWheel.register(
|
||||
retryPartitionTask.getGroupName(),
|
||||
retryPartitionTask.getUniqueId(),
|
||||
timerTask(retryPartitionTask),
|
||||
delay,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void processRetryTask(RetryPartitionTask partitionTask) {
|
||||
private Map<String, SceneConfig> getSceneConfigMap(final List<? extends PartitionTask> partitionTasks) {
|
||||
Set<String> sceneNameSet = partitionTasks.stream()
|
||||
.map(partitionTask -> ((RetryPartitionTask) partitionTask).getSceneName()).collect(Collectors.toSet());
|
||||
List<SceneConfig> sceneConfigs = accessTemplate.getSceneConfigAccess()
|
||||
.list(new LambdaQueryWrapper<SceneConfig>()
|
||||
.select(SceneConfig::getBackOff, SceneConfig::getTriggerInterval, SceneConfig::getSceneName)
|
||||
.in(SceneConfig::getSceneName, sceneNameSet));
|
||||
return sceneConfigs.stream()
|
||||
.collect(Collectors.toMap(SceneConfig::getSceneName, i -> i));
|
||||
}
|
||||
|
||||
private RetryTask processRetryTask(RetryPartitionTask partitionTask, SceneConfig sceneConfig) {
|
||||
RetryTask retryTask = new RetryTask();
|
||||
retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask));
|
||||
retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask, sceneConfig));
|
||||
retryTask.setId(partitionTask.getId());
|
||||
accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask);
|
||||
|
||||
long nowMilli = DateUtils.toNowMilli();
|
||||
long delay = DateUtils.toEpochMilli(partitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100;
|
||||
RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask),
|
||||
delay,
|
||||
TimeUnit.MILLISECONDS);
|
||||
return retryTask;
|
||||
}
|
||||
|
||||
protected abstract TaskExecutorSceneEnum taskActuatorScene();
|
||||
@ -146,7 +180,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
|
||||
protected abstract void putLastId(String groupName, Long lastId);
|
||||
|
||||
protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask);
|
||||
protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask,
|
||||
final SceneConfig sceneConfig);
|
||||
|
||||
protected abstract TimerTask timerTask(RetryPartitionTask partitionTask);
|
||||
|
||||
@ -156,15 +191,17 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
||||
|
||||
public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) {
|
||||
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
|
||||
.listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
|
||||
new LambdaQueryWrapper<RetryTask>()
|
||||
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
|
||||
.eq(RetryTask::getGroupName, groupName)
|
||||
.eq(RetryTask::getTaskType, taskType)
|
||||
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
|
||||
.gt(RetryTask::getId, lastId)
|
||||
.orderByAsc(RetryTask::getId))
|
||||
.getRecords();
|
||||
.listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
|
||||
new LambdaQueryWrapper<RetryTask>()
|
||||
.select(RetryTask::getId, RetryTask::getNextTriggerAt, RetryTask::getUniqueId,
|
||||
RetryTask::getGroupName, RetryTask::getRetryCount, RetryTask::getSceneName)
|
||||
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
|
||||
.eq(RetryTask::getGroupName, groupName)
|
||||
.eq(RetryTask::getTaskType, taskType)
|
||||
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))
|
||||
.gt(RetryTask::getId, lastId)
|
||||
.orderByAsc(RetryTask::getId))
|
||||
.getRecords();
|
||||
|
||||
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyC
|
||||
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies.WaitStrategyEnum;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||
import io.netty.util.TimerTask;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
@ -56,7 +57,8 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) {
|
||||
protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask,
|
||||
final SceneConfig sceneConfig) {
|
||||
|
||||
long triggerInterval = systemProperties.getCallback().getTriggerInterval();
|
||||
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getType());
|
||||
|
@ -57,10 +57,9 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
|
||||
LAST_AT_MAP.put(groupName, lastId);
|
||||
}
|
||||
|
||||
protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask) {
|
||||
@Override
|
||||
protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask, final SceneConfig sceneConfig) {
|
||||
// 更新下次触发时间
|
||||
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
|
||||
.getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName());
|
||||
|
||||
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
|
||||
|
||||
|
@ -2,11 +2,11 @@ package com.aizuda.easy.retry.server.retry.task.support.idempotent;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author www.byteblogs.com
|
||||
@ -16,11 +16,24 @@ import java.util.concurrent.CopyOnWriteArraySet;
|
||||
@Slf4j
|
||||
public class TimerIdempotent implements IdempotentStrategy<String, String> {
|
||||
|
||||
private static final CopyOnWriteArraySet<String> cache = new CopyOnWriteArraySet<>();
|
||||
|
||||
private static final Cache<String, String> cache;
|
||||
|
||||
static {
|
||||
cache = CacheBuilder.newBuilder()
|
||||
.concurrencyLevel(16) // 并发级别
|
||||
.expireAfterWrite(20, TimeUnit.SECONDS)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean set(String key, String value) {
|
||||
return cache.add(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
|
||||
cache.put(getKey(key, value), value);
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
private static String getKey(final String key, final String value) {
|
||||
return key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -30,14 +43,12 @@ public class TimerIdempotent implements IdempotentStrategy<String, String> {
|
||||
|
||||
@Override
|
||||
public boolean isExist(String key, String value) {
|
||||
if (key == null || value == null) {
|
||||
log.error("异常监控. key:[{}] value:[{}]", key, value);
|
||||
}
|
||||
return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
|
||||
return cache.asMap().containsKey(getKey(key, value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean clear(String key, String value) {
|
||||
return cache.removeIf(s-> s.equals(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))));
|
||||
cache.invalidate(getKey(key, value));
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,6 @@ public class RetryTimerWheel implements Lifecycle {
|
||||
|
||||
if (!isExisted(groupName, uniqueId)) {
|
||||
delay = delay < 0 ? 0 : delay;
|
||||
log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId);
|
||||
try {
|
||||
timer.newTimeout(task, delay, unit);
|
||||
idempotent.set(uniqueId, uniqueId);
|
||||
|
@ -73,7 +73,7 @@ public class ConsumerBucketActor extends AbstractActor {
|
||||
// 查询桶对应组信息
|
||||
groupConfigs = accessTemplate.getGroupConfigAccess().list(
|
||||
new LambdaQueryWrapper<GroupConfig>()
|
||||
.select(GroupConfig::getGroupName)
|
||||
.select(GroupConfig::getGroupName, GroupConfig::getGroupPartition)
|
||||
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
|
||||
.in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
|
||||
);
|
||||
@ -87,6 +87,7 @@ public class ConsumerBucketActor extends AbstractActor {
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setGroupName(groupConfig.getGroupName());
|
||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||
scanTask.setGroupPartition(groupConfig.getGroupPartition());
|
||||
produceScanActorTask(scanTask);
|
||||
}
|
||||
}
|
||||
@ -96,8 +97,6 @@ public class ConsumerBucketActor extends AbstractActor {
|
||||
// 扫描回调数据
|
||||
ScanTask scanTask = new ScanTask();
|
||||
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||
scanTask.setSize(1000);
|
||||
scanTask.setStartId(0);
|
||||
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
|
||||
scanJobActorRef.tell(scanTask, scanJobActorRef);
|
||||
}
|
||||
|
@ -37,10 +37,11 @@ logging:
|
||||
config: classpath:logback-boot.xml
|
||||
|
||||
easy-retry:
|
||||
retry-pull-page-size: 100 # 拉取重试数据的每批次的大小
|
||||
retry-pull-page-size: 1000 # 拉取重试数据的每批次的大小
|
||||
job-pull-page-size: 1000 # 拉取重试数据的每批次的大小
|
||||
netty-port: 1788 # 服务端netty端口
|
||||
total-partition: 2 # 重试和死信表的分区总数
|
||||
limiter: 10 # 一个客户端每秒最多接收的重试数量指令
|
||||
limiter: 1000 # 一个客户端每秒最多接收的重试数量指令
|
||||
step: 100 # 号段模式下步长配置
|
||||
log-storage: 90 # 日志保存时间(单位: day)
|
||||
callback: # 回调配置
|
||||
@ -48,6 +49,7 @@ easy-retry:
|
||||
trigger-interval: 900 #间隔时间
|
||||
db-type: mysql #当前使用的数据库
|
||||
mode: all
|
||||
retry-max-pull-count: 10
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user