feat: 2.4.0
1. Fix the occasional duplicate triggering caused by the retry task scan interval. 2. Fix groups not populating the bucket_index value.
parent a4498af88e
commit bb1a569c39
@@ -14,7 +14,7 @@ CREATE TABLE `group_config`
     `group_partition` int(11) NOT NULL COMMENT 'partition',
     `id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT 'unique id generation mode, defaults to segment mode',
     `init_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'auto-initialize scenes 0: no 1: yes',
-    `bucket_index` int(11) DEFAULT NULL COMMENT 'bucket',
+    `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
     `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
     `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
     PRIMARY KEY (`id`),
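The DDL change above only covers fresh installs. For an existing pre-2.4.0 deployment the column has to be migrated by hand; a minimal sketch (the backfill statement and the value 0 are assumptions for illustration, not part of this commit):

```sql
-- Backfill rows created before 2.4.0 that still carry NULL (assumed backfill, adjust as needed)
UPDATE `group_config` SET `bucket_index` = 0 WHERE `bucket_index` IS NULL;

-- Tighten the column to match the new DDL
ALTER TABLE `group_config`
    MODIFY COLUMN `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket';
```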
@@ -20,6 +20,7 @@ public class SystemProperties {
     /**
      * number of days of data to pull
      */
+    @Deprecated
     private int lastDays = 30;

     /**
@@ -159,8 +159,6 @@ public class JobExecutorActor extends AbstractActor {
             preTriggerAt = job.getNextTriggerAt();
         }

-        System.out.println("time monitoring " + ResidentTaskCache.get(job.getId()) + "-" + job.getNextTriggerAt());
-
         WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
         waitStrategyContext.setTriggerType(job.getTriggerType());
         waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
@@ -33,6 +33,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;

 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;

@@ -146,6 +147,9 @@ public class ScanJobTaskActor extends AbstractActor {
     }

     private List<JobPartitionTask> listAvailableJobs(Long startId, ScanTask scanTask) {
+        if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
+            return Collections.emptyList();
+        }
+
         List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, scanTask.getSize()),
             new LambdaQueryWrapper<Job>()
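The new guard returns early when the scan task owns no buckets, so an unfiltered (or syntactically invalid) empty `IN` clause never reaches the database. The shape of the pattern in isolation (hypothetical types, not the project's classes):

```java
import java.util.Collections;
import java.util.List;

public class EmptyBucketGuardSketch {
    // Returns early for an empty bucket set instead of building an `IN ()` query.
    static List<String> listAvailable(List<Integer> buckets) {
        if (buckets == null || buckets.isEmpty()) {
            return Collections.emptyList(); // nothing assigned to this node: skip the DB round trip
        }
        return List.of("select ... where bucket_index in " + buckets); // stand-in for the real query
    }

    public static void main(String[] args) {
        System.out.println(listAvailable(Collections.emptyList())); // []
        System.out.println(listAvailable(List.of(1, 2)));
    }
}
```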
@@ -11,9 +11,13 @@ import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
 import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
 import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
 import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
+import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext.TaskInfo;
 import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
 import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
+import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
 import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
+import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
+import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum;
 import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
 import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
@@ -30,7 +34,6 @@ import org.springframework.util.CollectionUtils;

 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 /**
@@ -53,7 +56,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
     public void taskGenerator(TaskContext taskContext) {
         LogUtils.info(log, "received report data. {}", JsonUtil.toJsonString(taskContext));

-        checkAndInitScene(taskContext);
+        SceneConfig sceneConfig = checkAndInitScene(taskContext);

         List<TaskContext.TaskInfo> taskInfos = taskContext.getTaskInfos();

@@ -76,7 +79,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
         List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();
         LocalDateTime now = LocalDateTime.now();
         for (TaskContext.TaskInfo taskInfo : taskInfos) {
-            Pair<List<RetryTask>, List<RetryTaskLog>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo);
+            Pair<List<RetryTask>, List<RetryTaskLog>> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo, sceneConfig);
             waitInsertTasks.addAll(pair.getKey());
             waitInsertTaskLogs.addAll(pair.getValue());
         }
@@ -96,10 +99,11 @@ public abstract class AbstractGenerator implements TaskGenerator {
      * @param retryTaskMap
      * @param now
      * @param taskInfo
+     * @param sceneConfig
      */
     private Pair<List<RetryTask>, List<RetryTaskLog>> doConvertTask(Map<String/* idempotent id */, List<RetryTask>> retryTaskMap,
                                                                     TaskContext taskContext, LocalDateTime now,
-                                                                    TaskContext.TaskInfo taskInfo) {
+                                                                    TaskInfo taskInfo, SceneConfig sceneConfig) {
         List<RetryTask> waitInsertTasks = new ArrayList<>();
         List<RetryTaskLog> waitInsertTaskLogs = new ArrayList<>();

@@ -127,7 +131,12 @@ public abstract class AbstractGenerator implements TaskGenerator {
             retryTask.setExtAttrs(StrUtil.EMPTY);
         }

-        retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
+        WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
+        waitStrategyContext.setNextTriggerAt(now);
+        waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
+        waitStrategyContext.setTriggerCount(1);
+        WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
+        retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
         waitInsertTasks.add(retryTask);

         // initialize the log record
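Before this change, every reported task got a random 1–60 s first trigger time regardless of the scene's configured backoff; the first trigger now goes through the same wait strategy the retries use. A self-contained sketch of the idea (toy types; the real `WaitStrategies`/`WaitStrategyEnum` API is only approximated here):

```java
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    interface WaitStrategy { LocalDateTime computeRetryTime(LocalDateTime from, int interval, int count); }

    // Toy stand-ins for the scene's backOff setting (the real project has more strategies).
    enum Strategy implements WaitStrategy {
        FIXED {   // fixed interval: next = from + interval seconds
            public LocalDateTime computeRetryTime(LocalDateTime from, int interval, int count) {
                return from.plusSeconds(interval);
            }
        },
        RANDOM {  // old behaviour: anywhere in [1, 60] seconds, ignoring the scene config
            public LocalDateTime computeRetryTime(LocalDateTime from, int interval, int count) {
                return from.plusSeconds(ThreadLocalRandom.current().nextLong(1, 61));
            }
        }
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        // 2.4.0 behaviour: the scene's strategy and interval decide the first trigger time
        System.out.println("configured: " + Strategy.FIXED.computeRetryTime(now, 5, 1));
        // pre-2.4.0 behaviour for comparison
        System.out.println("random:     " + Strategy.RANDOM.computeRetryTime(now, 5, 1));
    }
}
```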
@@ -141,7 +150,7 @@ public abstract class AbstractGenerator implements TaskGenerator {

     protected abstract Integer initStatus(TaskContext taskContext);

-    private void checkAndInitScene(TaskContext taskContext) {
+    private SceneConfig checkAndInitScene(TaskContext taskContext) {
         SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName());
         if (Objects.isNull(sceneConfig)) {
@@ -154,10 +163,12 @@ public abstract class AbstractGenerator implements TaskGenerator {
                 throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", taskContext.getGroupName(), taskContext.getSceneName());
             } else {
                 // when default scene initialization is enabled and the reported data carries no scene, generate one automatically
-                initScene(taskContext.getGroupName(), taskContext.getSceneName());
+                sceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName());
             }
         }

+        return sceneConfig;
+
     }

     /**
@@ -169,9 +180,8 @@ public abstract class AbstractGenerator implements TaskGenerator {
      * @param groupName group name
      * @param sceneName scene name
      */
-    private void initScene(String groupName, String sceneName) {
-        SceneConfig sceneConfig;
-        sceneConfig = new SceneConfig();
+    private SceneConfig initScene(String groupName, String sceneName) {
+        SceneConfig sceneConfig = new SceneConfig();
         sceneConfig.setGroupName(groupName);
         sceneConfig.setSceneName(sceneName);
         sceneConfig.setSceneStatus(StatusEnum.YES.getStatus());
@@ -179,6 +189,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
         sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel());
         sceneConfig.setDescription("automatically initialized scene");
         Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig), () -> new EasyRetryServerException("init scene error"));
+        return sceneConfig;
     }

     /**
@@ -19,13 +19,11 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
 import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
-import com.google.common.collect.Lists;
 import io.netty.util.TimerTask;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.util.CollectionUtils;
-import org.springframework.util.StopWatch;

 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -34,8 +32,6 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import java.util.function.Predicate;

 /**
  * data scan template class
@@ -59,9 +55,6 @@ public abstract class AbstractScanGroup extends AbstractActor {
     @Autowired
     protected List<TaskExecutor> taskExecutors;

-    private static final AtomicLong preCostTime = new AtomicLong(0L);
-    private static final AtomicLong pullCount = new AtomicLong(1L);
-
     @Override
     public Receive createReceive() {
         return receiveBuilder().match(ScanTask.class, config -> {
@@ -78,9 +71,9 @@ public abstract class AbstractScanGroup extends AbstractActor {
             // capture the end time
             long endTime = System.nanoTime();

-            preCostTime.set((endTime - startTime) / 1_000_000);
+            preCostTime().set((endTime - startTime) / 1_000_000);

-            log.info("retry task dispatch took:[{}]", preCostTime.get());
+            log.info(this.getClass().getName() + " retry task dispatch took:[{}]", preCostTime().get());

         }).build();
@@ -89,35 +82,39 @@ public abstract class AbstractScanGroup extends AbstractActor {
     protected void doScan(final ScanTask scanTask) {

         // compute how many pull loops fit into one schedule period
-        if (preCostTime.get() > 0) {
-            long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime.get(), 1);
+        if (preCostTime().get() > 0) {
+            long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime().get(), 1);
             // TODO make the maximum pull count configurable
             loopCount = Math.min(loopCount, 10);
-            pullCount.set(loopCount);
+            prePullCount().set(loopCount);
         }

         String groupName = scanTask.getGroupName();
         Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);

-        log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]",
-            groupName, lastId, pullCount.get(), preCostTime.get());
+        log.info(this.getClass().getName() + " retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]",
+            groupName, lastId, prePullCount().get(), preCostTime().get());

         AtomicInteger count = new AtomicInteger(0);
-        PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
-            partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> {
-                if (CollectionUtils.isEmpty(partitionTasks)) {
-                    putLastId(scanTask.getGroupName(), 0L);
-                    return Boolean.TRUE;
-                }
-
-                // stop once the maximum pull count has been exceeded
-                if (count.getAndIncrement() > pullCount.get()) {
-                    putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
-                    return Boolean.TRUE;
-                }
-
-                return false;
-            }, lastId);
+        long total = PartitionTaskUtils.process(
+            startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()),
+            partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> {
+                if (CollectionUtils.isEmpty(partitionTasks)) {
+                    putLastId(scanTask.getGroupName(), 0L);
+                    return Boolean.TRUE;
+                }
+
+                // stop once the maximum pull count has been exceeded
+                if (count.getAndIncrement() > prePullCount().get()) {
+                    putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
+                    return Boolean.TRUE;
+                }
+
+                return false;
+            }, lastId);
+
+        log.info(this.getClass().getName() + " retry scan end. groupName:[{}] total:[{}] realPullCount:[{}]",
+            groupName, total, count);

     }
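The loop count adapts to the previous scan's cost: reading the code, `SystemConstants.SCHEDULE_PERIOD` appears to be in seconds and `preCostTime` in milliseconds, so `loopCount = clamp(period_ms / preCostTime_ms, 1, 10)`. A quick worked check (the 10 s period is an assumed value for illustration):

```java
public class LoopCountSketch {
    // Mirrors doScan's arithmetic: how many pulls fit into one schedule period.
    static long loopCount(long schedulePeriodSeconds, long preCostTimeMs) {
        long loops = Math.max((schedulePeriodSeconds * 1000) / preCostTimeMs, 1);
        return Math.min(loops, 10); // hard cap, flagged TODO-configurable in the diff
    }

    public static void main(String[] args) {
        System.out.println(loopCount(10, 400));   // 10_000 / 400 = 25 -> capped to 10
        System.out.println(loopCount(10, 3000));  // 10_000 / 3000 = 3
        System.out.println(loopCount(10, 20000)); // 0 -> floored to 1
    }
}
```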
@@ -137,7 +134,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
         accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask);

         long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
-            - System.currentTimeMillis();
+            - System.currentTimeMillis() - System.currentTimeMillis() % 500;
         RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask),
             delay,
             TimeUnit.MILLISECONDS);
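The extra `- System.currentTimeMillis() % 500` appears intended to bias the timer-wheel delay earlier by the current offset into a 500 ms window, so a task fires on or just before the window boundary instead of drifting past the next scan and being picked up (and triggered) a second time. A quick numeric check of the expression:

```java
public class DelayAlignmentSketch {
    // Mirrors the new delay computation: shorten the delay by the offset into a 500 ms window.
    static long delay(long nextTriggerAtMs, long nowMs) {
        return nextTriggerAtMs - nowMs - nowMs % 500;
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_360L;        // 360 ms into the current 500 ms window
        long next = now + 1_000;              // task due in 1 s
        System.out.println(delay(next, now)); // 1000 - 360 = 640 ms: fires slightly early,
                                              // so the next scan sees it already dispatched
    }
}
```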
@@ -153,6 +150,10 @@ public abstract class AbstractScanGroup extends AbstractActor {

     protected abstract TimerTask timerTask(RetryPartitionTask partitionTask);

+    protected abstract AtomicLong preCostTime();
+
+    protected abstract AtomicLong prePullCount();
+
     public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) {
         List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
             .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
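This looks like the core of fix #1: `preCostTime` and `pullCount` used to be static fields on the shared base class, so `ScanRetryTaskActor` and `ScanCallbackTaskActor` overwrote each other's scan metrics, skewing the computed pull count and the scan interval. Each subclass now supplies its own counters through abstract accessors. A minimal sketch of the pattern (toy class names, not the project's):

```java
import java.util.concurrent.atomic.AtomicLong;

abstract class Scanner {
    // Each concrete scanner owns its metrics; the base class only uses them.
    protected abstract AtomicLong preCostTime();
    protected abstract AtomicLong prePullCount();

    void scan() {
        long start = System.nanoTime();
        // ... the actual scan work would happen here ...
        preCostTime().set((System.nanoTime() - start) / 1_000_000);
    }
}

class RetryScanner extends Scanner {
    private static final AtomicLong COST = new AtomicLong(0);
    private static final AtomicLong PULLS = new AtomicLong(1);
    protected AtomicLong preCostTime() { return COST; }
    protected AtomicLong prePullCount() { return PULLS; }
}

class CallbackScanner extends Scanner {
    // Separate instances: callback timings no longer clobber retry timings.
    private static final AtomicLong COST = new AtomicLong(0);
    private static final AtomicLong PULLS = new AtomicLong(1);
    protected AtomicLong preCostTime() { return COST; }
    protected AtomicLong prePullCount() { return PULLS; }
}

public class PerScannerCountersSketch {
    public static void main(String[] args) {
        Scanner retry = new RetryScanner();
        Scanner callback = new CallbackScanner();
        retry.scan();
        // callback's counters are untouched by the retry scan
        System.out.println(retry.preCostTime().get() + " / " + callback.preCostTime().get());
    }
}
```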
@@ -18,6 +18,7 @@ import org.springframework.stereotype.Component;
 import java.time.LocalDateTime;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;

 /**
  * @author www.byteblogs.com
@@ -29,6 +30,9 @@ import java.util.concurrent.ConcurrentMap;
 @Slf4j
 public class ScanCallbackTaskActor extends AbstractScanGroup {

+    private static final AtomicLong preCostTime = new AtomicLong(0L);
+    private static final AtomicLong pullCount = new AtomicLong(1L);
+
     /**
      * caches the start id for the next data pull
      * <p>
@@ -74,5 +78,15 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
         return new CallbackTimerTask(retryTimerContext);
     }

+    @Override
+    protected AtomicLong preCostTime() {
+        return preCostTime;
+    }
+
+    @Override
+    protected AtomicLong prePullCount() {
+        return pullCount;
+    }
+
 }
@@ -19,6 +19,7 @@ import org.springframework.stereotype.Component;
 import java.time.LocalDateTime;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;

 /**
  * @author www.byteblogs.com
@@ -30,6 +31,9 @@ import java.util.concurrent.ConcurrentMap;
 @Slf4j
 public class ScanRetryTaskActor extends AbstractScanGroup {

+    private static final AtomicLong preCostTime = new AtomicLong(0L);
+    private static final AtomicLong pullCount = new AtomicLong(1L);
+
     /**
      * caches the start id for the next data pull
      * <p>
@@ -81,4 +85,14 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
         retryTimerContext.setUniqueId(partitionTask.getUniqueId());
         return new RetryTimerTask(retryTimerContext);
     }
+
+    @Override
+    protected AtomicLong preCostTime() {
+        return preCostTime;
+    }
+
+    @Override
+    protected AtomicLong prePullCount() {
+        return pullCount;
+    }
 }
@@ -2,10 +2,12 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;

+import akka.actor.ActorRef;
 import cn.hutool.core.lang.Pair;
 import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
 import com.aizuda.easy.retry.server.common.config.SystemProperties;
 import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
 import com.aizuda.easy.retry.server.common.IdempotentStrategy;
 import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
+import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
 import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
 import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
 import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
@@ -15,6 +17,8 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;

+import java.time.LocalDateTime;
+
 /**
  *
  *
@@ -55,9 +59,21 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, InitializingBean {
     protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) {
         Pair<Boolean /* passes the filter */, StringBuilder /* reason */> pair = executor.filter();
         if (!pair.getKey()) {
+            RetryTask retryTask = retryContext.getRetryTask();
             log.warn("the current task does not meet the execution conditions. groupName:[{}] uniqueId:[{}], description:[{}]",
-                retryContext.getRetryTask().getGroupName(),
-                retryContext.getRetryTask().getUniqueId(), pair.getValue().toString());
+                retryTask.getGroupName(),
+                retryTask.getUniqueId(), pair.getValue().toString());
+
+            // record a task log entry
+            RetryTaskLogDTO retryTaskLog = new RetryTaskLogDTO();
+            retryTaskLog.setGroupName(retryTask.getGroupName());
+            retryTaskLog.setUniqueId(retryTask.getUniqueId());
+            retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
+            retryTaskLog.setMessage(pair.getValue().toString());
+            retryTaskLog.setTriggerTime(LocalDateTime.now());
+            ActorRef actorRef = ActorGenerator.logActor();
+            actorRef.tell(retryTaskLog, actorRef);
+
             return false;
         }

@@ -20,25 +20,24 @@ public abstract class AbstractTimerTask implements TimerTask {
     protected String groupName;
     protected String uniqueId;

-    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 10, TimeUnit.SECONDS,
+    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS,
         new LinkedBlockingQueue<>());


     @Override
     public void run(Timeout timeout) throws Exception {
-        log.info("starting retry task execution. current time:[{}] groupName:[{}] uniqueId:[{}]", LocalDateTime.now(), groupName, uniqueId);

         executor.execute(() -> {

+            log.info("starting retry task execution. current time:[{}] groupName:[{}] uniqueId:[{}]", LocalDateTime.now(), groupName,
+                uniqueId);
             try {
+                doRun(timeout);
+            } catch (Exception e) {
+                log.error("retry task execution failed groupName:[{}] uniqueId:[{}]", groupName, uniqueId, e);
+            } finally {
                 // clear the timer-wheel cache first
                 RetryTimerWheel.clearCache(groupName, uniqueId);

-                doRun(timeout);
-            } catch (Exception e) {
-                log.error("retry task execution failed", e);
             }

         });

     }
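Two reliability changes in one hunk: the dispatch pool doubles from 8 to 16 threads, and the timer-wheel cache entry is now removed in a `finally` block, so a task that throws can still be re-registered later instead of being blocked by a stale cache entry. The guarantee in isolation (toy cache, not the project's `RetryTimerWheel`):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FinallyCleanupSketch {
    // Toy stand-in for the timer wheel's "already registered" cache.
    static final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    static void run(String taskKey, Runnable body) {
        try {
            body.run();
        } catch (Exception e) {
            System.out.println("task failed: " + e.getMessage());
        } finally {
            // Runs on success *and* failure: the key never leaks, so the task can be rescheduled.
            inFlight.remove(taskKey);
        }
    }

    public static void main(String[] args) {
        inFlight.add("group-1:uid-42");
        run("group-1:uid-42", () -> { throw new IllegalStateException("boom"); });
        System.out.println("still cached: " + inFlight.contains("group-1:uid-42")); // false
    }
}
```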
@@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.timer;

 import com.aizuda.easy.retry.common.core.context.SpringContext;
 import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
 import com.aizuda.easy.retry.common.core.util.JsonUtil;
+import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;
 import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorFactory;
 import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
@@ -12,8 +13,6 @@ import io.netty.util.Timeout;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;

-import java.time.LocalDateTime;
-
 /**
  * @author: www.byteblogs.com
  * @date : 2023-09-22 17:09
|
||||
|
||||
@Override
|
||||
public void doRun(final Timeout timeout){
|
||||
log.info("重试任务执行 {}", LocalDateTime.now());
|
||||
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
|
||||
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
|
||||
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
|
||||
|
@@ -68,13 +68,18 @@ public class ConsumerBucketActor extends AbstractActor {
         }

         if (SystemModeEnum.isRetry(systemProperties.getMode())) {
-            // query the group info for these buckets
-            List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess().list(
-                new LambdaQueryWrapper<GroupConfig>()
-                    .select(GroupConfig::getGroupName)
-                    .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
-                    .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
-            );
+            List<GroupConfig> groupConfigs = null;
+            try {
+                // query the group info for these buckets
+                groupConfigs = accessTemplate.getGroupConfigAccess().list(
+                    new LambdaQueryWrapper<GroupConfig>()
+                        .select(GroupConfig::getGroupName)
+                        .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
+                        .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
+                );
+            } catch (Exception e) {
+                log.error("exception while generating retry tasks.", e);
+            }

             if (!CollectionUtils.isEmpty(groupConfigs)) {
                 for (final GroupConfig groupConfig : groupConfigs) {
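Wrapping the lookup in try/catch keeps one failed query (a transient DB hiccup, say) from killing the bucket consumer; the round is simply skipped and retried on the next dispatch. On failure `groupConfigs` stays `null`, which the existing `CollectionUtils.isEmpty` check already tolerates:

```java
import java.util.List;
import org.springframework.util.CollectionUtils;

public class NullSafeCheckSketch {
    public static void main(String[] args) {
        List<String> groupConfigs = null; // the value left behind when the query throws
        // Spring's isEmpty is null-safe, so the failure path falls through without an NPE.
        System.out.println(CollectionUtils.isEmpty(groupConfigs)); // true
    }
}
```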
@@ -180,12 +180,12 @@ public class GroupConfigServiceImpl implements GroupConfigService {
         groupConfig.setDescription(Optional.ofNullable(groupConfigRequestVO.getDescription()).orElse(StrUtil.EMPTY));
         if (Objects.isNull(groupConfigRequestVO.getGroupPartition())) {
             groupConfig.setGroupPartition(HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getTotalPartition());
-            groupConfig.setBucketIndex(HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getBucketTotal());
         } else {
             Assert.isTrue(systemProperties.getTotalPartition() > groupConfigRequestVO.getGroupPartition(), () -> new EasyRetryServerException("partition exceeds the maximum partition. [{}]", systemProperties.getTotalPartition() - 1));
             Assert.isTrue(groupConfigRequestVO.getGroupPartition() >= 0, () -> new EasyRetryServerException("partition cannot be negative."));
         }

+        groupConfig.setBucketIndex(HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getBucketTotal());
         ConfigAccess<GroupConfig> groupConfigAccess = accessTemplate.getGroupConfigAccess();
         Assert.isTrue(1 == groupConfigAccess.insert(groupConfig), () -> new EasyRetryServerException("failed to insert group groupConfigVO[{}]", groupConfigRequestVO));
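This is fix #2 from the commit message: `bucket_index` was only populated when the request omitted `groupPartition`, so groups created with an explicit partition were inserted without a bucket and never picked up by any bucket consumer. Hoisting `setBucketIndex` out of the `if/else` assigns a bucket on both paths. The assignment itself is a string hash modulo the bucket total; a sketch with a stand-in hash (the real one lives in `HashUtil.bkdrHash`, and the bucket total of 128 is an assumed value):

```java
public class BucketIndexSketch {
    // Stand-in BKDR-style string hash; the project uses HashUtil.bkdrHash.
    static int bkdrHash(String s) {
        int hash = 0;
        for (char c : s.toCharArray()) {
            hash = hash * 31 + c;
        }
        return hash & 0x7FFFFFFF; // keep it non-negative so the modulo is a valid index
    }

    public static void main(String[] args) {
        int bucketTotal = 128; // assumed systemProperties.getBucketTotal()
        // Every group now gets a deterministic bucket, with or without an explicit partition.
        System.out.println(bkdrHash("example_group") % bucketTotal);
    }
}
```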
@@ -112,7 +112,7 @@ public class JobServiceImpl implements JobService {
     public List<JobResponseVO> getJobNameList(String keywords, Long jobId) {

         LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>()
-                .select(Job::getId, Job::getJobName);
+            .select(Job::getId, Job::getJobName);
         if (StrUtil.isNotBlank(keywords)) {
             queryWrapper.like(Job::getJobName, keywords.trim() + "%");
         }
@@ -131,7 +131,8 @@ public class JobServiceImpl implements JobService {
     public boolean saveJob(JobRequestVO jobRequestVO) {
         // resident-task handling
         Job job = updateJobResident(jobRequestVO);
-        job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal());
+        job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName())
+            % systemProperties.getBucketTotal());
         calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
         return 1 == jobMapper.insert(job);
     }
@@ -143,33 +144,22 @@ public class JobServiceImpl implements JobService {
         Job job = jobMapper.selectById(jobRequestVO.getId());
         Assert.notNull(job, () -> new EasyRetryServerException("update failed"));

-        LocalDateTime oldTime = job.getNextTriggerAt();
         // resident-task handling
         Job updateJob = updateJobResident(jobRequestVO);

-        // LocalDateTime waitUpdateTime = null;
-        // old and new are both resident tasks
-        if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) {
-            // LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
-            log.info("old and new are both resident tasks newTime:[{}] oldTime:[{}]", null, oldTime);
-            // job.setNextTriggerAt(oldTime);
-        // old is resident, new is non-resident: compute the trigger time from the in-memory trigger time
-        } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.NO.getStatus())) {
+        // non-resident -> non-resident
+        if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(),
+            StatusEnum.NO.getStatus())) {
+            updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()));
+        } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
+            updateJob.getResident(), StatusEnum.NO.getStatus())) {
             // the resident task's trigger time
-            LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())).orElse(LocalDateTime.now());
-            LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, time);
-            log.info("old resident, new non-resident newTime:[{}] oldTime:[{}]", newTime, oldTime);
-            updateJob.setNextTriggerAt(newTime);
-        // old is non-resident, new is resident: compute the next trigger time from the current time
-        } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) {
-            LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
-            log.info("old non-resident, new resident newTime:[{}] oldTime:[{}]", newTime, oldTime);
+            LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
+                .orElse(LocalDateTime.now());
+            updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time));
+        } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
+            updateJob.getResident(), StatusEnum.YES.getStatus())) {
+            updateJob.setNextTriggerAt(LocalDateTime.now());
         } else {
-
-            LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now());
-            log.info("old non-resident, new non-resident newTime:[{}] oldTime:[{}]", newTime, oldTime);
-            updateJob.setNextTriggerAt(newTime);
         }

         return 1 == jobMapper.updateById(updateJob);
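The rewritten `updateJob` drops the debug logging and the unused `oldTime`, and handles the four resident-flag transitions explicitly: non-resident to non-resident recomputes from now; resident to non-resident recomputes from the in-memory trigger time cached in `ResidentTaskCache` (falling back to now); non-resident to resident sets the trigger to now so the resident loop takes over immediately; resident to resident leaves the stored trigger time untouched. A compact decision-table sketch (toy types; `calc` stands in for `calculateNextTriggerAt`):

```java
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.function.UnaryOperator;

public class ResidentTransitionSketch {
    // Returns the next trigger time for each old/new resident-flag combination,
    // or null where the stored value is left untouched (resident -> resident).
    static LocalDateTime nextTriggerAt(boolean oldResident, boolean newResident,
                                       LocalDateTime cachedResidentTime, UnaryOperator<LocalDateTime> calc) {
        if (!oldResident && !newResident) {
            return calc.apply(LocalDateTime.now());                     // plain reschedule
        } else if (oldResident && !newResident) {
            LocalDateTime base = Optional.ofNullable(cachedResidentTime).orElse(LocalDateTime.now());
            return calc.apply(base);                                    // continue from the in-memory time
        } else if (!oldResident && newResident) {
            return LocalDateTime.now();                                 // resident loop takes over immediately
        }
        return null;                                                    // resident -> resident: keep as-is
    }

    public static void main(String[] args) {
        UnaryOperator<LocalDateTime> calc = t -> t.plusSeconds(30); // toy 30 s trigger interval
        System.out.println(nextTriggerAt(false, false, null, calc));
        System.out.println(nextTriggerAt(true, false, LocalDateTime.now().minusSeconds(5), calc));
        System.out.println(nextTriggerAt(false, true, null, calc));
        System.out.println(nextTriggerAt(true, true, null, calc));
    }
}
```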