diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java index 209a807b..f1575cdc 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.common.allocate.client; import com.aizuda.easy.retry.server.common.ClientLoadBalance; +import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import java.util.LinkedHashMap; import java.util.Map; @@ -53,6 +54,6 @@ public class ClientLoadBalanceLRU implements ClientLoadBalance { @Override public int routeType() { - return 0; + return AllocationAlgorithmEnum.LRU.getType(); } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java index 169f2209..0c20221c 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.common.allocate.client; import com.aizuda.easy.retry.server.common.ClientLoadBalance; +import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -20,7 +21,7 @@ public class ClientLoadBalanceRound implements ClientLoadBalance { public String route(final String allocKey, final TreeSet clientAllAddressSet) { String[] addressArr = clientAllAddressSet.toArray(new String[0]); AtomicInteger next = COUNTER.getOrDefault(allocKey, new AtomicInteger(1)); - String nextClientId = addressArr[ next.get() % clientAllAddressSet.size()]; + String nextClientId = addressArr[next.get() % clientAllAddressSet.size()]; int nextIndex = next.incrementAndGet(); if (nextIndex > THRESHOLD) { next = new AtomicInteger(1); @@ -32,6 +33,6 @@ public class ClientLoadBalanceRound implements ClientLoadBalance { @Override public int routeType() { - return 0; + return AllocationAlgorithmEnum.ROUND.getType(); } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java index 440f954a..9eef855b 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java @@ -45,10 +45,10 @@ public class ClientNodeAllocateHandler { ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(routeKey); - String hostIp = clientLoadBalanceRandom.route(allocKey, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostIp).collect(Collectors.toSet()))); + String hostId = clientLoadBalanceRandom.route(allocKey, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()))); Stream registerNodeInfoStream = serverNodes.stream() - .filter(s -> s.getHostIp().equals(hostIp)); + .filter(s -> s.getHostId().equals(hostId)); return registerNodeInfoStream.findFirst().orElse(null); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 18771296..093ab034 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -48,7 +48,6 @@ public class JobExecutorActor extends AbstractActor { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskBatchId(taskExecute.getTaskBatchId()); context.setJobId(job.getId()); - context.setTaskType(job.getTaskType()); jobExecutor.execute(context); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java index 08696387..f35d7f0f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java @@ -34,8 +34,6 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator { protected ClientNodeAllocateHandler clientNodeAllocateHandler; @Autowired private JobTaskMapper jobTaskMapper; - @Autowired - private JobMapper jobMapper; @Override public TaskTypeEnum getTaskInstanceType() { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index 335f8f7b..b6c3e578 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -15,6 +15,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.List; import java.util.Objects; @@ -38,7 +39,7 @@ public class JobTaskBatchHandler { .eq(JobTask::getTaskBatchId, taskBatchId)); - if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { + if (CollectionUtils.isEmpty(jobTasks) || jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { return false; } long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java index afe0a146..6afabc13 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/idempotent/TimerIdempotent.java @@ -21,7 +21,7 @@ public class TimerIdempotent implements IdempotentStrategy { @Override public Long get(Long s) { - throw new UnsupportedOperationException("不支持此操作"); + throw new UnsupportedOperationException("不支持此操作"); } @Override @@ -31,7 +31,6 @@ public class TimerIdempotent implements IdempotentStrategy { @Override public boolean clear(Long key, Long value) { - cache.clear(); - return Boolean.TRUE; + return cache.removeIf(l -> l.equals(key)); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java index 387e57b4..98790de7 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java @@ -4,6 +4,8 @@ import com.aizuda.easy.retry.server.common.dto.PartitionTask; import lombok.Data; import lombok.EqualsAndHashCode; +import java.time.LocalDateTime; + /** * @author www.byteblogs.com * @date 2023-10-25 22:23:24 @@ -12,4 +14,18 @@ import lombok.EqualsAndHashCode; @EqualsAndHashCode(callSuper = true) @Data public class RetryPartitionTask extends PartitionTask { + + private String uniqueId; + + private String groupName; + + private String sceneName; + + /** + * 下次触发时间 + */ + private LocalDateTime nextTriggerAt; + + private Integer retryCount; + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java index f42045ab..a02f74b4 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/WaitStrategy.java @@ -1,5 +1,7 @@ package com.aizuda.easy.retry.server.retry.task.support; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; + import java.time.LocalDateTime; /** @@ -13,9 +15,9 @@ public interface WaitStrategy { /** * 计算下次重试触发时间 * - * @param retryContext {@link RetryContext} 重试上下文 + * @param waitStrategyContext {@link WaitStrategyContext} 重试上下文 * @return 下次触发时间 */ - LocalDateTime computeRetryTime(RetryContext retryContext); + LocalDateTime computeRetryTime(WaitStrategyContext waitStrategyContext); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java index b8f55e0d..6b3584f1 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -32,6 +32,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.Callable; @@ -68,6 +69,8 @@ public class ExecCallbackUnitActor extends AbstractActor { retryTaskLog.setGroupName(retryTask.getGroupName()); retryTaskLog.setUniqueId(retryTask.getUniqueId()); retryTaskLog.setRetryStatus(retryTask.getRetryStatus()); + retryTaskLog.setTriggerTime(LocalDateTime.now()); + try { if (Objects.nonNull(serverNode)) { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java index 2cc915db..5cd11a28 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java @@ -28,6 +28,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.Objects; import java.util.concurrent.Callable; @@ -60,6 +61,7 @@ public class ExecUnitActor extends AbstractActor { retryTaskLog.setGroupName(retryTask.getGroupName()); retryTaskLog.setUniqueId(retryTask.getUniqueId()); retryTaskLog.setRetryStatus(retryTask.getRetryStatus()); + retryTaskLog.setTriggerTime(LocalDateTime.now()); try { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/LogActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/LogActor.java index 8d471604..e61b3395 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/LogActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/LogActor.java @@ -16,6 +16,7 @@ import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.time.LocalDateTime; +import java.util.Optional; /** * 处理日志信息 @@ -69,7 +70,7 @@ public class LogActor extends AbstractActor { String errorMessage = retryTaskLogDTO.getMessage(); retryTaskLogMessage.setMessage( StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage); - retryTaskLogMessage.setCreateDt(LocalDateTime.now()); + retryTaskLogMessage.setCreateDt(Optional.ofNullable(retryTaskLogDTO.getTriggerTime()).orElse(LocalDateTime.now())); retryTaskLogMessageMapper.insert(retryTaskLogMessage); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/RetryTaskLogDTO.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/RetryTaskLogDTO.java index d56180fd..e2fffbce 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/RetryTaskLogDTO.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/log/RetryTaskLogDTO.java @@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log; import lombok.Data; +import java.time.LocalDateTime; + /** * 日志上下文模型 * @@ -32,4 +34,6 @@ public class RetryTaskLogDTO { */ private Integer retryStatus; + private LocalDateTime triggerTime; + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java index 804ed3ee..f22044f5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java @@ -12,7 +12,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask; -import com.aizuda.easy.retry.server.retry.task.support.timer.TimerWheelHandler; +import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler; @@ -68,32 +68,17 @@ public class FailureActor extends AbstractActor { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - RetryTimerContext timerContext = new RetryTimerContext(); - timerContext.setGroupName(retryTask.getGroupName()); - timerContext.setUniqueId(retryTask.getUniqueId()); - - TimerTask timerTask = null; Integer maxRetryCount; if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { maxRetryCount = systemProperties.getCallback().getMaxCount(); - timerTask = new CallbackTimerTask(); - timerContext.setScene(TaskExecutorSceneEnum.AUTO_CALLBACK); } else { maxRetryCount = sceneConfig.getMaxRetryCount(); - timerTask = new RetryTimerTask(timerContext); - timerContext.setScene(TaskExecutorSceneEnum.AUTO_RETRY); } if (maxRetryCount <= retryTask.getRetryCount()) { retryTask.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus()); // 创建一个回调任务 callbackRetryTaskHandler.create(retryTask); - } else { - // TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮 - LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt(); - long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis(); - log.info("准确进入时间轮 {} {}", nextTriggerAt, delay); - TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS); } retryTask.setUpdateDt(LocalDateTime.now()); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java index 9f6052d1..307cc8ec 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/NoRetryActor.java @@ -41,8 +41,6 @@ public class NoRetryActor extends AbstractActor { RetryContext retryContext = retryExecutor.getRetryContext(); RetryTask retryTask = retryContext.getRetryTask(); - WaitStrategy waitStrategy = retryContext.getWaitStrategy(); - retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(retryContext)); // 不更新重试次数 retryTask.setRetryCount(null); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 9d07a853..c54fcc31 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -3,33 +3,35 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import akka.actor.AbstractActor; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; -import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; -import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; +import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.CollectionUtils; +import org.springframework.util.StopWatch; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; /** * 数据扫描模板类 @@ -53,83 +55,84 @@ public abstract class AbstractScanGroup extends AbstractActor { @Autowired protected List taskExecutors; + private static long preCostTime = 0L; + private static long loopCount = 1L; + @Override public Receive createReceive() { return receiveBuilder().match(ScanTask.class, config -> { + // 获取开始时间 + long startTime = System.nanoTime(); + try { doScan(config); } catch (Exception e) { LogUtils.error(log, "Data scanner processing exception. [{}]", config, e); } + // 获取结束时间 + long endTime = System.nanoTime(); + + preCostTime = (endTime - startTime) / 1_000_000; + log.info("重试任务调度耗时:[{}]", preCostTime); + }).build(); } protected void doScan(final ScanTask scanTask) { -// LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); + // 计算循环拉取的次数 + if (preCostTime > 0) { + loopCount = (10 * 1000) / preCostTime; + loopCount = loopCount == 0 ? 1 : loopCount; + } + String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); - int retryPullPageSize = systemProperties.getRetryPullPageSize(); - AtomicInteger count = new AtomicInteger(0); - long total = PartitionTaskUtils.process(startId -> { - // 没10秒触发一次扫描任务,每次扫描N次 - int i = count.incrementAndGet(); - // TODO 需要支持动态计算循环拉取多少次 - if (i > 5) { - return Lists.newArrayList(); - } - return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()); - }, - partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId); + PartitionTaskUtils.process(startId -> { + int i = count.getAndIncrement(); + if (i > loopCount) { + // 为空则中断处理 + return Lists.newArrayList(); + } + return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()); + }, + partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId); } private void processRetryPartitionTasks(List partitionTasks, ScanTask scanTask) { + + StopWatch watch = new StopWatch(); + watch.start(); if (!CollectionUtils.isEmpty(partitionTasks)) { - - // TODO 更新拉取的最大的id putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); - for (PartitionTask partitionTask : partitionTasks) { processRetryTask((RetryPartitionTask) partitionTask); - - // 已经存在时间轮里面的任务由时间轮负责调度 -// boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId()); -// if (existed) { -// continue; -// } -// -// for (TaskExecutor taskExecutor : taskExecutors) { -// if (taskActuatorScene().getScene() == taskExecutor.getTaskType().getScene()) { -// taskExecutor.actuator(retryTask); -// } -// } } } else { - -// // 数据为空则休眠5s -// try { -// Thread.sleep((10 / 2) * 1000); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// } - putLastId(scanTask.getGroupName(), 0L); } + + watch.getTotalTimeMillis(); + } private void processRetryTask(RetryPartitionTask partitionTask) { + RetryTask retryTask = new RetryTask(); + retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask)); + retryTask.setId(partitionTask.getId()); + accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask); - // 更新触发时间, 任务进入时间轮 -// WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask) -// waitStrategy.computeRetryTime(retryContext); - + long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + - System.currentTimeMillis(); + RetryTimerWheel.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask(partitionTask), delay, + TimeUnit.MILLISECONDS); } protected abstract TaskExecutorSceneEnum taskActuatorScene(); @@ -138,17 +141,19 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract void putLastId(String groupName, Long lastId); + protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask); + + protected abstract TimerTask timerTask(RetryPartitionTask partitionTask); + public List listAvailableTasks(String groupName, Long lastId, Integer taskType) { - List retryTasks = accessTemplate.getRetryTaskAccess().listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), + List retryTasks = accessTemplate.getRetryTaskAccess() + .listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), new LambdaQueryWrapper() - .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) - // TODO 提前10秒把需要执行的任务拉取出来 - .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId) - // TODO 验证一下lastAt会不会改变 -// .gt(RetryTask::getCreateDt, lastAt) - .orderByAsc(RetryTask::getId)) - .getRecords(); + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) + .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) + .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId) + .orderByAsc(RetryTask::getId)) + .getRecords(); return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index 93c8c74f..c6daaded 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -1,14 +1,21 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask; +import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext; +import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -22,8 +29,6 @@ import java.util.concurrent.ConcurrentMap; @Slf4j public class ScanCallbackTaskActor extends AbstractScanGroup { - public static final String BEAN_NAME = "ScanCallbackTaskActor"; - /** * 缓存待拉取数据的起点id *

@@ -46,9 +51,28 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { LAST_AT_MAP.put(groupName, lastId); } - private WaitStrategy getWaitWaitStrategy() { - // 回调失败每15min重试一次 - return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + @Override + protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) { + + long triggerInterval = systemProperties.getCallback().getTriggerInterval(); + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getBackOff()); + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); + waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval)); + + // 更新触发时间, 任务进入时间轮 + return waitStrategy.computeRetryTime(waitStrategyContext); } + @Override + protected TimerTask timerTask(final RetryPartitionTask partitionTask) { + RetryTimerContext retryTimerContext = new RetryTimerContext(); + retryTimerContext.setGroupName(partitionTask.getGroupName()); + retryTimerContext.setScene(taskActuatorScene()); + retryTimerContext.setUniqueId(partitionTask.getUniqueId()); + + return new CallbackTimerTask(retryTimerContext); + } + + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index 172f8b00..c0aff9c9 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -1,12 +1,21 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum; +import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext; +import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; +import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -42,4 +51,26 @@ public class ScanRetryTaskActor extends AbstractScanGroup { LAST_AT_MAP.put(groupName, lastId); } + protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask) { + // 更新下次触发时间 + SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() + .getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName()); + + WaitStrategyContext waitStrategyContext = new WaitStrategyContext(); + waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt()); + waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval()); + waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1); + // 更新触发时间, 任务进入时间轮 + WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff()); + return waitStrategy.computeRetryTime(waitStrategyContext); + } + + @Override + protected TimerTask timerTask(final RetryPartitionTask partitionTask) { + RetryTimerContext retryTimerContext = new RetryTimerContext(); + retryTimerContext.setGroupName(partitionTask.getGroupName()); + retryTimerContext.setScene(taskActuatorScene()); + retryTimerContext.setUniqueId(partitionTask.getUniqueId()); + return new RetryTimerTask(retryTimerContext); + } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java index cf26d555..2ea76de5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java @@ -45,7 +45,7 @@ public class CallbackTaskExecutor extends AbstractTaskExecutor { .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatus()) .withWaitStrategy(getWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) +// .withFilterStrategy(FilterStrategies.triggerAtFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java index 12ab4111..d1cab008 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java @@ -48,7 +48,6 @@ public class ManualCallbackTaskExecutor extends AbstractTaskExecutor { .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatus()) .withWaitStrategy(getWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java index 7f0c5c03..bf6fba94 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java @@ -48,7 +48,7 @@ public class RetryTaskExecutor extends AbstractTaskExecutor { .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatusCode()) .withWaitStrategy(getWaitWaitStrategy(sceneConfig)) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) +// .withFilterStrategy(FilterStrategies.triggerAtFilter()) .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) .withFilterStrategy(FilterStrategies.sceneBlackFilter()) .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java new file mode 100644 index 00000000..a105bfef --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java @@ -0,0 +1,37 @@ +package com.aizuda.easy.retry.server.retry.task.support.idempotent; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.server.common.IdempotentStrategy; + +import java.util.HashSet; +import java.util.Set; + +/** + * @author www.byteblogs.com + * @date 2023-10-19 21:54:57 + * @since 2.4.0 + */ +public class TimerIdempotent implements IdempotentStrategy { + + private static final Set cache = new HashSet<>(); + + @Override + public boolean set(String key, String value) { + return cache.add(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); + } + + @Override + public String get(String s) { + throw new UnsupportedOperationException("不支持此操作"); + } + + @Override + public boolean isExist(String key, String value) { + return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))); + } + + @Override + public boolean clear(String key, String value) { + return cache.removeIf(s-> s.equals(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)))); + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java index 9e3d9ea3..8150b7d9 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/FilterStrategies.java @@ -38,6 +38,7 @@ public class FilterStrategies { * * @return {@link TriggerAtFilterStrategies} 触发时间过滤策略 */ + @Deprecated public static FilterStrategy triggerAtFilter() { return new TriggerAtFilterStrategies(); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java index 7caef01e..12e96307 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/strategy/WaitStrategies.java @@ -1,16 +1,10 @@ package com.aizuda.easy.retry.server.retry.task.support.strategy; -import com.aizuda.easy.retry.common.core.context.SpringContext; +import cn.hutool.core.date.DateUtil; import com.aizuda.easy.retry.common.core.util.CronExpression; -import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum; -import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; -import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; -import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; -import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import com.google.common.base.Preconditions; import lombok.Data; import lombok.Getter; @@ -30,12 +24,35 @@ import java.util.concurrent.TimeUnit; * @author: www.byteblogs.com * @date : 2021-11-29 18:19 */ -@SuppressWarnings({"squid:S3776", "squid:S2676", "squid:S3740"}) public class WaitStrategies { private WaitStrategies() { } + @Data + public static class WaitStrategyContext { +// /** +// * 触发类型 1.CRON 表达式 2. 固定时间 +// */ +// private Integer triggerType; + + /** + * 间隔时长 + */ + private String triggerInterval; + + /** + * 下次触发时间 + */ + private LocalDateTime nextTriggerAt; + + /** + * 触发次数 + */ + private Integer triggerCount; + + } + @Getter public enum WaitStrategyEnum { DELAY_LEVEL(1, delayLevelWait()), @@ -89,10 +106,6 @@ public class WaitStrategies { return WaitStrategyEnum.DELAY_LEVEL; } - @Data - public static class StrategyParameter { - } - } /** @@ -143,13 +156,12 @@ public class WaitStrategies { /** * 延迟等级等待策略 */ - private static final class DelayLevelWaitStrategy implements WaitStrategy { + private static final class DelayLevelWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(RetryContext context) { - RetryTask retryTask = context.getRetryTask(); - DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(retryTask.getRetryCount()); - return retryTask.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit()); + public LocalDateTime computeRetryTime(WaitStrategyContext context) { + DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(context.getTriggerCount()); + return context.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit()); } } @@ -159,22 +171,19 @@ public class WaitStrategies { private static final class FixedWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(RetryContext retryContext) { - RetryTask retryTask = retryContext.getRetryTask(); - long triggerInterval; - if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { - // 回调失败的默认15分钟执行一次重试 - SystemProperties systemProperties = SpringContext.CONTEXT.getBean(SystemProperties.class); - triggerInterval = systemProperties.getCallback().getTriggerInterval(); - } else { - AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class); - SceneConfig sceneConfig = - accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); - triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval()); - } + public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) { +// if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { +// // 回调失败的默认15分钟执行一次重试 +// SystemProperties systemProperties = SpringContext.CONTEXT.getBean(SystemProperties.class); +// triggerInterval = systemProperties.getCallback().getTriggerInterval(); +// } else { +// AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class); +// SceneConfig sceneConfig = +// accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); +// triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval()); +// } - - return retryTask.getNextTriggerAt().plusSeconds(triggerInterval); + return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval())); } } @@ -184,23 +193,16 @@ public class WaitStrategies { private static final class CronWaitStrategy implements WaitStrategy { @Override - public LocalDateTime computeRetryTime(RetryContext context) { - RetryTask retryTask = context.getRetryTask(); + public LocalDateTime computeRetryTime(WaitStrategyContext context) { - AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class); - - SceneConfig sceneConfig = - accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); - - Date nextValidTime; try { - ZonedDateTime zdt = retryTask.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); - nextValidTime = new CronExpression(sceneConfig.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); + ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8)); + Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant())); + return DateUtil.toLocalDateTime(nextValidTime); } catch (ParseException e) { - throw new EasyRetryServerException("解析CRON表达式异常 [{}]", sceneConfig.getTriggerInterval(), e); + throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e); } - return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8)); } } @@ -226,19 +228,12 @@ public class WaitStrategies { } @Override - public LocalDateTime computeRetryTime(RetryContext retryContext) { + public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) { if (Objects.nonNull(retryContext)) { - RetryTask retryTask = retryContext.getRetryTask(); - - AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class); - SceneConfig sceneConfig = - accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); - if (maximum == 0) { - maximum = Long.parseLong(sceneConfig.getTriggerInterval()); + maximum = Long.parseLong(retryContext.getTriggerInterval()); } - } Preconditions.checkArgument(maximum > minimum, "maximum must be > minimum but maximum is %d and minimum is", maximum, minimum); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java index 16246a14..db38b9f6 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/AbstractTimerTask.java @@ -2,24 +2,45 @@ package com.aizuda.easy.retry.server.retry.task.support.timer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author www.byteblogs.com * @date 2023-09-23 11:10:01 * @since 2.4.0 */ +@Slf4j public abstract class AbstractTimerTask implements TimerTask { protected String groupName; protected String uniqueId; + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); + + @Override public void run(Timeout timeout) throws Exception { + log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}]", LocalDateTime.now(), groupName, uniqueId); - // 先清除时间轮的缓存 - TimerWheelHandler.clearCache(groupName, uniqueId); + executor.execute(() -> { + + try { + // 先清除时间轮的缓存 + RetryTimerWheel.clearCache(groupName, uniqueId); + + doRun(timeout); + } catch (Exception e) { + log.error("重试任务执行失败", e); + } + + }); - doRun(timeout); } protected abstract void doRun(Timeout timeout); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java index 39b65c89..2ded2b5a 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/CallbackTimerTask.java @@ -1,18 +1,45 @@ package com.aizuda.easy.retry.server.retry.task.support.timer; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorFactory; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.access.TaskAccess; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; +import java.time.LocalDateTime; + /** * @author: www.byteblogs.com * @date : 2023-09-22 17:09 */ @Slf4j -public class CallbackTimerTask implements TimerTask { +public class CallbackTimerTask extends AbstractTimerTask { + + private RetryTimerContext context; + + public CallbackTimerTask(RetryTimerContext context) { + this.context = context; + super.groupName = context.getGroupName(); + super.uniqueId = context.getUniqueId(); + } @Override - public void run(final Timeout timeout) throws Exception { - log.info("回调任务执行"); + protected void doRun(final Timeout timeout) { + log.info("回调任务执行 {}", LocalDateTime.now()); + AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class); + TaskAccess retryTaskAccess = accessTemplate.getRetryTaskAccess(); + RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper() + .eq(RetryTask::getGroupName, context.getGroupName()) + .eq(RetryTask::getUniqueId, context.getUniqueId()) + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); + TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene()); + taskExecutor.actuator(retryTask); } + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java index e214e090..b3d22655 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerTask.java @@ -33,7 +33,6 @@ public class RetryTimerTask extends AbstractTimerTask { @Override public void doRun(final Timeout timeout){ log.info("重试任务执行 {}", LocalDateTime.now()); - // todo AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class); TaskAccess retryTaskAccess = accessTemplate.getRetryTaskAccess(); RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper() diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java new file mode 100644 index 00000000..f55c3313 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/RetryTimerWheel.java @@ -0,0 +1,61 @@ +package com.aizuda.easy.retry.server.retry.task.support.timer; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.retry.task.support.idempotent.TimerIdempotent; +import io.netty.util.HashedWheelTimer; +import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * @author: www.byteblogs.com + * @date : 2023-09-22 17:03 + */ +@Component +@Slf4j +public class RetryTimerWheel implements Lifecycle { + + private static final int TICK_DURATION = 500; + private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-"; + private static HashedWheelTimer timer = null; + + private static final TimerIdempotent idempotent = new TimerIdempotent(); + + @Override + public void start() { + timer = new HashedWheelTimer( + new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS); + timer.start(); + } + + public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) { + + if (!isExisted(groupName, uniqueId)) { + delay = delay < 0 ? 0 : delay; + log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId); + try { + timer.newTimeout(task, delay, unit); + idempotent.set(uniqueId, uniqueId); + } catch (Exception e) { + LogUtils.error(log, "加入时间轮失败. uniqueId:[{}]", uniqueId, e); + } + } + } + + public static boolean isExisted(String groupName, String uniqueId) { + return idempotent.isExist(groupName, uniqueId); + } + + public static void clearCache(String groupName, String uniqueId) { + idempotent.clear(groupName, uniqueId); + } + + @Override + public void close() { + timer.stop(); + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/TimerWheelHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/TimerWheelHandler.java deleted file mode 100644 index 20c079c8..00000000 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/timer/TimerWheelHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.aizuda.easy.retry.server.retry.task.support.timer; - -import cn.hutool.core.util.StrUtil; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.Lifecycle; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.stereotype.Component; - -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -/** - * @author: www.byteblogs.com - * @date : 2023-09-22 17:03 - */ -@Component -@Slf4j -public class TimerWheelHandler implements Lifecycle { - - private static HashedWheelTimer timer = null; - - private static Cache cache; - - @Override - public void start() { - - // TODO 支持可配置 - // tickDuration 和 timeUnit 一格的时间长度 - // ticksPerWheel 一圈有多少格 - timer = new HashedWheelTimer( - new CustomizableThreadFactory("retry-task-timer-wheel-"), 100, - TimeUnit.MILLISECONDS, 1024); - - timer.start(); - - cache = CacheBuilder.newBuilder() - // 设置并发级别为cpu核心数 - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .build(); - } - - public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) { - - if (delay < 0) { - delay = 0; - } - - // TODO 支持可配置 - if (delay > 60 * 1000) { - LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]", - groupName, uniqueId, delay); - return; - } - - Timeout timeout = getTimeout(groupName, uniqueId); - if (Objects.isNull(timeout)) { - try { - timeout = timer.newTimeout(task, delay, unit); - cache.put(getKey(groupName, uniqueId), timeout); - } catch (Exception e) { - LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]", - groupName, uniqueId, e); - } - } - - } - - private static String getKey(String groupName, String uniqueId) { - return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId); - } - - public static Timeout getTimeout(String groupName, String uniqueId) { - return cache.getIfPresent(getKey(groupName, uniqueId)); - } - - public static boolean isExisted(String groupName, String uniqueId) { - return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId))); - } - - public static boolean cancel(String groupName, String uniqueId) { - String key = getKey(groupName, uniqueId); - Timeout timeout = cache.getIfPresent(key); - if (Objects.isNull(timeout)) { - return false; - } - - cache.invalidate(key); - return timeout.cancel(); - } - - public static void clearCache(String groupName, String uniqueId) { - cache.invalidate(getKey(groupName, uniqueId)); - } - - @Override - public void close() { - timer.stop(); - cache.invalidateAll(); - } -}