diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index a663e2f4b..22781878a 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -14,7 +14,7 @@ CREATE TABLE `group_config` `group_partition` int(11) NOT NULL COMMENT '分区', `id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT '唯一id生成模式 默认号段模式', `init_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否初始化场景 0:否 1:是', - `bucket_index` int(11) DEFAULT NULL COMMENT 'bucket', + `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index ac3a78450..87058a906 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -20,6 +20,7 @@ public class SystemProperties { /** * 拉取数据天数 */ + @Deprecated private int lastDays = 30; /** diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 5db48063f..d1a1a05ae 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -159,8 +159,6 @@ public class JobExecutorActor extends AbstractActor { preTriggerAt = job.getNextTriggerAt(); } - System.out.println("时间监控 " + ResidentTaskCache.get(job.getId()) + "-" + job.getNextTriggerAt()); - WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext(); waitStrategyContext.setTriggerType(job.getTriggerType()); waitStrategyContext.setTriggerInterval(job.getTriggerInterval()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java index 2edc1487b..db13fb641 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java @@ -33,6 +33,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -146,6 +147,9 @@ public class ScanJobTaskActor extends AbstractActor { } private List listAvailableJobs(Long startId, ScanTask scanTask) { + if (CollectionUtils.isEmpty(scanTask.getBuckets())) { + return Collections.emptyList(); + } List jobs = jobMapper.selectPage(new PageDTO(0, scanTask.getSize()), new LambdaQueryWrapper() diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java index cf665c1bf..cf81239cc 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/generator/task/AbstractGenerator.java @@ -11,9 +11,13 @@ import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.generator.id.IdGenerator; +import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext.TaskInfo; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.TaskAccess; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; @@ -30,7 +34,6 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -53,7 +56,7 @@ public abstract class AbstractGenerator implements TaskGenerator { public void taskGenerator(TaskContext taskContext) { LogUtils.info(log, "received report data. {}", JsonUtil.toJsonString(taskContext)); - checkAndInitScene(taskContext); + SceneConfig sceneConfig = checkAndInitScene(taskContext); List taskInfos = taskContext.getTaskInfos(); @@ -76,7 +79,7 @@ public abstract class AbstractGenerator implements TaskGenerator { List waitInsertTaskLogs = new ArrayList<>(); LocalDateTime now = LocalDateTime.now(); for (TaskContext.TaskInfo taskInfo : taskInfos) { - Pair, List> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo); + Pair, List> pair = doConvertTask(retryTaskMap, taskContext, now, taskInfo, sceneConfig); waitInsertTasks.addAll(pair.getKey()); waitInsertTaskLogs.addAll(pair.getValue()); } @@ -96,10 +99,11 @@ public abstract class AbstractGenerator implements TaskGenerator { * @param retryTaskMap * @param now * @param taskInfo + * @param sceneConfig */ private Pair, List> doConvertTask(Map> retryTaskMap, TaskContext taskContext, LocalDateTime now, - TaskContext.TaskInfo taskInfo) { + TaskInfo taskInfo, SceneConfig sceneConfig) { List waitInsertTasks = new ArrayList<>(); List waitInsertTaskLogs = new ArrayList<>(); @@ -127,7 +131,12 @@ public abstract class AbstractGenerator implements TaskGenerator { retryTask.setExtAttrs(StrUtil.EMPTY); } - retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setNextTriggerAt(now); + waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); + waitStrategyContext.setTriggerCount(1); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); + retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext)); waitInsertTasks.add(retryTask); // 初始化日志 @@ -141,7 +150,7 @@ public abstract class AbstractGenerator implements TaskGenerator { protected abstract Integer initStatus(TaskContext taskContext); - private void checkAndInitScene(TaskContext taskContext) { + private SceneConfig checkAndInitScene(TaskContext taskContext) { SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(taskContext.getGroupName(), taskContext.getSceneName()); if (Objects.isNull(sceneConfig)) { @@ -154,10 +163,12 @@ public abstract class AbstractGenerator implements TaskGenerator { throw new EasyRetryServerException("failed to report data, no scene configuration found. groupName:[{}] sceneName:[{}]", taskContext.getGroupName(), taskContext.getSceneName()); } else { // 若配置了默认初始化场景配置,则发现上报数据的时候未配置场景,默认生成一个场景 - initScene(taskContext.getGroupName(), taskContext.getSceneName()); + sceneConfig = initScene(taskContext.getGroupName(), taskContext.getSceneName()); } } + return sceneConfig; + } /** @@ -169,9 +180,8 @@ public abstract class AbstractGenerator implements TaskGenerator { * @param groupName 组名称 * @param sceneName 场景名称 */ - private void initScene(String groupName, String sceneName) { - SceneConfig sceneConfig; - sceneConfig = new SceneConfig(); + private SceneConfig initScene(String groupName, String sceneName) { + SceneConfig sceneConfig = new SceneConfig(); sceneConfig.setGroupName(groupName); sceneConfig.setSceneName(sceneName); sceneConfig.setSceneStatus(StatusEnum.YES.getStatus()); @@ -179,6 +189,7 @@ public abstract class AbstractGenerator implements TaskGenerator { sceneConfig.setMaxRetryCount(DelayLevelEnum._21.getLevel()); sceneConfig.setDescription("自动初始化场景"); Assert.isTrue(1 == accessTemplate.getSceneConfigAccess().insert(sceneConfig), () -> new EasyRetryServerException("init scene error")); + return sceneConfig; } /** diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 819950cb2..9fa34a694 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -19,13 +19,11 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import com.google.common.collect.Lists; import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.CollectionUtils; -import org.springframework.util.StopWatch; import java.time.LocalDateTime; import java.time.ZoneId; @@ -34,8 +32,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.Predicate; /** * 数据扫描模板类 @@ -59,9 +55,6 @@ public abstract class AbstractScanGroup extends AbstractActor { @Autowired protected List taskExecutors; - private static final AtomicLong preCostTime = new AtomicLong(0L); - private static final AtomicLong pullCount = new AtomicLong(1L); - @Override public Receive createReceive() { return receiveBuilder().match(ScanTask.class, config -> { @@ -78,9 +71,9 @@ public abstract class AbstractScanGroup extends AbstractActor { // 获取结束时间 long endTime = System.nanoTime(); - preCostTime.set((endTime - startTime) / 1_000_000); + preCostTime().set((endTime - startTime) / 1_000_000); - log.info("重试任务调度耗时:[{}]", preCostTime.get()); + log.info(this.getClass().getName() + "重试任务调度耗时:[{}]", preCostTime().get()); }).build(); @@ -89,35 +82,39 @@ public abstract class AbstractScanGroup extends AbstractActor { protected void doScan(final ScanTask scanTask) { // 计算循环拉取的次数 - if (preCostTime.get() > 0) { - long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime.get(), 1); + if (preCostTime().get() > 0) { + long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime().get(), 1); // TODO 最大拉取次数支持可配置 loopCount = Math.min(loopCount, 10); - pullCount.set(loopCount); + prePullCount().set(loopCount); } String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); - log.info("retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", - groupName, lastId, pullCount.get(), preCostTime.get()); + log.info(this.getClass().getName() + " retry scan start. groupName:[{}] startId:[{}] pullCount:[{}] preCostTime:[{}]", + groupName, lastId, prePullCount().get(), preCostTime().get()); AtomicInteger count = new AtomicInteger(0); - PartitionTaskUtils.process(startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), - partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { - if (CollectionUtils.isEmpty(partitionTasks)) { - putLastId(scanTask.getGroupName(), 0L); - return Boolean.TRUE; - } + long total = PartitionTaskUtils.process( + startId -> listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()), + partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), partitionTasks -> { + if (CollectionUtils.isEmpty(partitionTasks)) { + putLastId(scanTask.getGroupName(), 0L); + return Boolean.TRUE; + } - // 超过最大的拉取次数则中断 - if (count.getAndIncrement() > pullCount.get()) { - putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); - return Boolean.TRUE; - } + // 超过最大的拉取次数则中断 + if (count.getAndIncrement() > prePullCount().get()) { + putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); + return Boolean.TRUE; + } - return false; - }, lastId); + return false; + }, lastId); + + log.info(this.getClass().getName() + " retry scan end. groupName:[{}] total:[{}] realPullCount:[{}]", + groupName, total, count); } @@ -137,7 +134,7 @@ public abstract class AbstractScanGroup extends AbstractActor { accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask); long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - - System.currentTimeMillis(); + - System.currentTimeMillis() - System.currentTimeMillis() % 500; RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), delay, TimeUnit.MILLISECONDS); @@ -153,6 +150,10 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract TimerTask timerTask(RetryPartitionTask partitionTask); + protected abstract AtomicLong preCostTime(); + + protected abstract AtomicLong prePullCount(); + public List listAvailableTasks(String groupName, Long lastId, Integer taskType) { List retryTasks = accessTemplate.getRetryTaskAccess() .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index c6daadeda..358835bb2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -18,6 +18,7 @@ import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; /** * @author www.byteblogs.com @@ -29,6 +30,9 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public class ScanCallbackTaskActor extends AbstractScanGroup { + private static final AtomicLong preCostTime = new AtomicLong(0L); + private static final AtomicLong pullCount = new AtomicLong(1L); + /** * 缓存待拉取数据的起点id *

@@ -74,5 +78,15 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { return new CallbackTimerTask(retryTimerContext); } + @Override + protected AtomicLong preCostTime() { + return preCostTime; + } + + @Override + protected AtomicLong prePullCount() { + return pullCount; + } + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index f8da21d68..c95625f96 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -19,6 +19,7 @@ import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; /** * @author www.byteblogs.com @@ -30,6 +31,9 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public class ScanRetryTaskActor extends AbstractScanGroup { + private static final AtomicLong preCostTime = new AtomicLong(0L); + private static final AtomicLong pullCount = new AtomicLong(1L); + /** * 缓存待拉取数据的起点id *

@@ -81,4 +85,14 @@ public class ScanRetryTaskActor extends AbstractScanGroup { retryTimerContext.setUniqueId(partitionTask.getUniqueId()); return new RetryTimerTask(retryTimerContext); } + + @Override + protected AtomicLong preCostTime() { + return preCostTime; + } + + @Override + protected AtomicLong prePullCount() { + return pullCount; + } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java index b3d711aa3..eff595b40 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java @@ -2,10 +2,12 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; import akka.actor.ActorRef; import cn.hutool.core.lang.Pair; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.common.IdempotentStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; @@ -15,6 +17,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import java.time.LocalDateTime; + /** * * @@ -55,9 +59,21 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) { Pair pair = executor.filter(); if (!pair.getKey()) { + RetryTask retryTask = retryContext.getRetryTask(); log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", - retryContext.getRetryTask().getGroupName(), - retryContext.getRetryTask().getUniqueId(), pair.getValue().toString()); + retryTask.getGroupName(), + retryTask.getUniqueId(), pair.getValue().toString()); + + // 记录日志 + RetryTaskLogDTO retryTaskLog = new RetryTaskLogDTO(); + retryTaskLog.setGroupName(retryTask.getGroupName()); + retryTaskLog.setUniqueId(retryTask.getUniqueId()); + retryTaskLog.setRetryStatus(retryTask.getRetryStatus()); + retryTaskLog.setMessage(pair.getValue().toString()); + retryTaskLog.setTriggerTime(LocalDateTime.now()); + ActorRef actorRef = ActorGenerator.logActor(); + actorRef.tell(retryTaskLog, actorRef); + return false; } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java index db38b9f6e..8352351a7 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java @@ -20,25 +20,24 @@ public abstract class AbstractTimerTask implements TimerTask { protected String groupName; protected String uniqueId; - private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 10, TimeUnit.SECONDS, + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); @Override public void run(Timeout timeout) throws Exception { - log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}]", LocalDateTime.now(), groupName, uniqueId); - executor.execute(() -> { - + log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}]", LocalDateTime.now(), groupName, + uniqueId); try { + doRun(timeout); + } catch (Exception e) { + log.error("重试任务执行失败 groupName:[{}] uniqueId:[{}]", groupName, uniqueId, e); + } finally { // 先清除时间轮的缓存 RetryTimerWheel.clearCache(groupName, uniqueId); - doRun(timeout); - } catch (Exception e) { - log.error("重试任务执行失败", e); } - }); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java index b3d226558..630ef2d0c 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.support.timer; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorFactory; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; @@ -12,8 +13,6 @@ import io.netty.util.Timeout; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import java.time.LocalDateTime; - /** * @author: www.byteblogs.com * @date : 2023-09-22 17:09 @@ -32,7 +31,6 @@ public class RetryTimerTask extends AbstractTimerTask { @Override public void doRun(final Timeout timeout){ - log.info("重试任务执行 {}", LocalDateTime.now()); AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class); TaskAccess retryTaskAccess = accessTemplate.getRetryTaskAccess(); RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper() diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java index 6f5d78d81..1fe383680 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/ConsumerBucketActor.java @@ -68,13 +68,18 @@ public class ConsumerBucketActor extends AbstractActor { } if (SystemModeEnum.isRetry(systemProperties.getMode())) { - // 查询桶对应组信息 - List groupConfigs = accessTemplate.getGroupConfigAccess().list( + List groupConfigs = null; + try { + // 查询桶对应组信息 + groupConfigs = accessTemplate.getGroupConfigAccess().list( new LambdaQueryWrapper() - .select(GroupConfig::getGroupName) - .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) - .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) - ); + .select(GroupConfig::getGroupName) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + .in(GroupConfig::getBucketIndex, consumerBucket.getBuckets()) + ); + } catch (Exception e) { + log.error("生成重试任务异常.", e); + } if (!CollectionUtils.isEmpty(groupConfigs)) { for (final GroupConfig groupConfig : groupConfigs) { diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/GroupConfigServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/GroupConfigServiceImpl.java index a53906553..73f0bf541 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/GroupConfigServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/GroupConfigServiceImpl.java @@ -180,12 +180,12 @@ public class GroupConfigServiceImpl implements GroupConfigService { groupConfig.setDescription(Optional.ofNullable(groupConfigRequestVO.getDescription()).orElse(StrUtil.EMPTY)); if (Objects.isNull(groupConfigRequestVO.getGroupPartition())) { groupConfig.setGroupPartition(HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getTotalPartition()); - groupConfig.setBucketIndex(HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getBucketTotal()); } else { Assert.isTrue(systemProperties.getTotalPartition() > groupConfigRequestVO.getGroupPartition(), () -> new EasyRetryServerException("分区超过最大分区. [{}]", systemProperties.getTotalPartition() - 1)); Assert.isTrue(groupConfigRequestVO.getGroupPartition() >= 0, () -> new EasyRetryServerException("分区不能是负数.")); } + groupConfig.setBucketIndex(HashUtil.bkdrHash(groupConfigRequestVO.getGroupName()) % systemProperties.getBucketTotal()); ConfigAccess groupConfigAccess = accessTemplate.getGroupConfigAccess(); Assert.isTrue(1 == groupConfigAccess.insert(groupConfig), () -> new EasyRetryServerException("新增组异常异常 groupConfigVO[{}]", groupConfigRequestVO)); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index a87459658..6cada535e 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -112,7 +112,7 @@ public class JobServiceImpl implements JobService { public List getJobNameList(String keywords, Long jobId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .select(Job::getId, Job::getJobName); + .select(Job::getId, Job::getJobName); if (StrUtil.isNotBlank(keywords)) { queryWrapper.like(Job::getJobName, keywords.trim() + "%"); } @@ -131,7 +131,8 @@ public class JobServiceImpl implements JobService { public boolean saveJob(JobRequestVO jobRequestVO) { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); - job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) % systemProperties.getBucketTotal()); + job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) + % systemProperties.getBucketTotal()); calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); return 1 == jobMapper.insert(job); } @@ -143,33 +144,22 @@ public class JobServiceImpl implements JobService { Job job = jobMapper.selectById(jobRequestVO.getId()); Assert.notNull(job, () -> new EasyRetryServerException("更新失败")); - LocalDateTime oldTime = job.getNextTriggerAt(); // 判断常驻任务 Job updateJob = updateJobResident(jobRequestVO); - -// LocalDateTime waitUpdateTime = null; - // 新老都是常驻任务 - if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) { -// LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); - log.info("新老都是常驻任务 newTime:[{}] oldTime:[{}]", null , oldTime); -// job.setNextTriggerAt(oldTime); - // 老的是常驻任务 新的是非常驻任务 需要使用内存里面的触发时间计算触发时间 - } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.NO.getStatus())) { + // 非常驻任务 > 非常驻任务 + if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), + StatusEnum.NO.getStatus())) { + updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, LocalDateTime.now())); + } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals( + updateJob.getResident(), StatusEnum.NO.getStatus())) { // 常驻任务的触发时间 - LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())).orElse(LocalDateTime.now()); - LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, time); - log.info("old是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]", newTime , oldTime); - updateJob.setNextTriggerAt(newTime); + LocalDateTime time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())) + .orElse(LocalDateTime.now()); + updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time)); // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间 - } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), StatusEnum.YES.getStatus())) { - LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); - log.info("old不是常驻任务 new是常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime); + } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( + updateJob.getResident(), StatusEnum.YES.getStatus())) { updateJob.setNextTriggerAt(LocalDateTime.now()); - } else { - - LocalDateTime newTime = calculateNextTriggerAt(jobRequestVO, LocalDateTime.now()); - log.info("old不是常驻任务 new不常驻任务 newTime:[{}] oldTime:[{}]",newTime , oldTime); - updateJob.setNextTriggerAt(newTime); } return 1 == jobMapper.updateById(updateJob);