From 01fb0cda20bff259604a3e93d7b53c60d049e5fd Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 23 Sep 2023 16:41:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.4.0=201.=20=E9=80=9A=E8=BF=87=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E8=BD=AE=E6=9D=A5=E5=87=8F=E5=B0=91=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=A7=A6=E5=8F=91=E6=97=B6=E9=97=B4=E8=AF=AF=E5=B7=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../task/support/cache/CacheBucketActor.java | 59 ------------ .../task/support/cache/CacheLockRecord.java | 2 +- .../dispatch/actor/AbstractTimerTask.java | 26 +++++ .../dispatch/actor/RetryTimerContext.java | 20 ++++ .../dispatch/actor/RetryTimerTask.java | 38 ++++++-- .../dispatch/actor/TimerWheelHandler.java | 80 +++++++++++++--- .../dispatch/actor/result/FailureActor.java | 27 ++++-- .../actor/scan/AbstractScanGroup.java | 63 ++---------- .../actor/scan/ScanCallbackTaskActor.java | 36 +------ .../actor/scan/ScanRetryTaskActor.java | 48 +--------- .../dispatch/task/AbstractTaskActuator.java | 95 +++++++++++++++++++ .../dispatch/task/CallbackTaskActuator.java | 69 ++++++++++++++ .../task/ManualCallbackTaskActuator.java | 78 +++++++++++++++ .../task/ManualRetryTaskActuator.java | 84 ++++++++++++++++ .../dispatch/task/RetryTaskActuator.java | 76 +++++++++++++++ .../support/dispatch/task/TaskActuator.java | 16 ++++ .../dispatch/task/TaskActuatorFactory.java | 25 +++++ .../dispatch/task/TaskActuatorSceneEnum.java | 24 +++++ .../server/dispatch/ConsumerBucketActor.java | 2 +- .../server/dispatch/DispatchService.java | 2 - .../service/impl/RetryTaskServiceImpl.java | 59 +++--------- 21 files changed, 665 insertions(+), 264 deletions(-) delete mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/AbstractTimerTask.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerContext.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskActuator.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskActuator.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskActuator.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskActuator.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskActuator.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuator.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorFactory.java create mode 100644 easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorSceneEnum.java diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java deleted file mode 100644 index 8a11d89d..00000000 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheBucketActor.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.aizuda.easy.retry.server.retry.task.support.cache; - -import akka.actor.ActorRef; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.common.Lifecycle; -import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -/** - * 缓存组扫描Actor - * - * @author www.byteblogs.com - * @date 2021-10-30 - * @since 1.0.0 - */ -@Component -@Data -@Slf4j -public class CacheBucketActor implements Lifecycle { - - private static Cache CACHE; - - /** - * 获取所有缓存 - * - * @return 缓存对象 - */ - public static ActorRef get(Integer bucket) { - return CACHE.getIfPresent(bucket); - } - - /** - * 获取所有缓存 - * - * @return 缓存对象 - */ - public static void put(Integer bucket, ActorRef actorRef) { - CACHE.put(bucket, actorRef); - } - - @Override - public void start() { - LogUtils.info(log, "CacheGroupScanActor start"); - CACHE = CacheBuilder.newBuilder() - // 设置并发级别为cpu核心数 - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .build(); - } - - @Override - public void close() { - LogUtils.info(log, "CacheGroupScanActor stop"); - CACHE.invalidateAll(); - } -} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheLockRecord.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheLockRecord.java index c022dd13..20a824a9 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheLockRecord.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheLockRecord.java @@ -53,6 +53,6 @@ public class CacheLockRecord implements Lifecycle { @Override public void close() { - + CACHE.invalidateAll(); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/AbstractTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/AbstractTimerTask.java new file mode 100644 index 00000000..8ce0c47b --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/AbstractTimerTask.java @@ -0,0 +1,26 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor; + +import io.netty.util.Timeout; +import io.netty.util.TimerTask; + +/** + * @author www.byteblogs.com + * @date 2023-09-23 11:10:01 + * @since 2.4.0 + */ +public abstract class AbstractTimerTask implements TimerTask { + + protected String groupName; + protected String uniqueId; + + @Override + public void run(Timeout timeout) throws Exception { + + // 先清除时间轮的缓存 + TimerWheelHandler.clearCache(groupName, uniqueId); + + doRun(timeout); + } + + protected abstract void doRun(Timeout timeout); +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerContext.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerContext.java new file mode 100644 index 00000000..04b5dfbb --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerContext.java @@ -0,0 +1,20 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor; + +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; +import lombok.Data; + +/** + * @author www.byteblogs.com + * @date 2023-09-23 09:14:03 + * @since 2.4.0 + */ +@Data +public class RetryTimerContext { + + private String groupName; + + private String uniqueId; + + private TaskActuatorSceneEnum scene; + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerTask.java index ecec1e2e..8ce2215b 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerTask.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/RetryTimerTask.java @@ -1,10 +1,23 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorFactory; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.access.TaskAccess; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.util.Timeout; import io.netty.util.TimerTask; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +import java.time.LocalDateTime; +import java.util.List; /** * @author: www.byteblogs.com @@ -12,14 +25,27 @@ import lombok.extern.slf4j.Slf4j; */ @Data @Slf4j -public class RetryTimerTask implements TimerTask { +public class RetryTimerTask extends AbstractTimerTask { - private RetryExecutor executor; + private RetryTimerContext context; + + public RetryTimerTask(RetryTimerContext context) { + this.context = context; + super.groupName = context.getGroupName(); + super.uniqueId = context.getUniqueId(); + } @Override - public void run(final Timeout timeout) throws Exception { - log.info("重试任务执行"); -// RetryContext retryContext = executor.getRetryContext(); -// RetryTask retryTask = retryContext.getRetryTask(); + public void doRun(final Timeout timeout){ + log.info("重试任务执行 {}", LocalDateTime.now()); + // todo + AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class); + TaskAccess retryTaskAccess = accessTemplate.getRetryTaskAccess(); + RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper() + .eq(RetryTask::getGroupName, context.getGroupName()) + .eq(RetryTask::getUniqueId, context.getUniqueId()) + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); + TaskActuator taskActuator = TaskActuatorFactory.getTaskActuator(context.getScene()); + taskActuator.actuator(retryTask); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/TimerWheelHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/TimerWheelHandler.java index 431627d8..7a493dba 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/TimerWheelHandler.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/TimerWheelHandler.java @@ -1,13 +1,19 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor; +import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Objects; import java.util.concurrent.TimeUnit; /** @@ -15,31 +21,83 @@ import java.util.concurrent.TimeUnit; * @date : 2023-09-22 17:03 */ @Component +@Slf4j public class TimerWheelHandler implements Lifecycle { - private static HashedWheelTimer hashedWheelTimer = null; + private static HashedWheelTimer timer = null; - public static ConcurrentHashMap taskConcurrentHashMap = new ConcurrentHashMap<>(); + private static Cache cache; @Override public void start() { - hashedWheelTimer = new HashedWheelTimer( - new CustomizableThreadFactory("retry-task-timer-wheel"), 10, TimeUnit.MILLISECONDS, 16); - hashedWheelTimer.start(); + + // TODO 支持可配置 + // tickDuration 和 timeUnit 一格的时间长度 + // ticksPerWheel 一圈有多少格 + timer = new HashedWheelTimer( + new CustomizableThreadFactory("retry_task_timer_wheel_"), 100, + TimeUnit.MILLISECONDS, 1024); + + timer.start(); + + cache = CacheBuilder.newBuilder() + // 设置并发级别为cpu核心数 + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .build(); } public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) { - Timeout timeout = hashedWheelTimer.newTimeout(task, delay, unit); - taskConcurrentHashMap.put(groupName.concat("_").concat(uniqueId), timeout); + + if (delay < 0) { + delay = 0; + } + + // TODO 支持可配置 + if (delay > 60 * 1000) { + LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]", + groupName, uniqueId, delay); + return; + } + + Timeout timeout = getTimeout(groupName, uniqueId); + if (Objects.isNull(timeout)) { + try { + timeout = timer.newTimeout(task, delay, unit); + cache.put(getKey(groupName, uniqueId), timeout); + } catch (Exception e) { + LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]", + groupName, uniqueId, e); + } + } + } - public static boolean cancel(Long taskId) { - Timeout timeout = taskConcurrentHashMap.get(taskId); + private static String getKey(String groupName, String uniqueId) { + return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId); + } + + public static Timeout getTimeout(String groupName, String uniqueId) { + return cache.getIfPresent(getKey(groupName, uniqueId)); + } + + public static boolean cancel(String groupName, String uniqueId) { + String key = getKey(groupName, uniqueId); + Timeout timeout = cache.getIfPresent(key); + if (Objects.isNull(timeout)) { + return false; + } + + cache.invalidate(key); return timeout.cancel(); } + public static void clearCache(String groupName, String uniqueId) { + cache.invalidate(getKey(groupName, uniqueId)); + } + @Override public void close() { - hashedWheelTimer.stop(); + timer.stop(); + cache.invalidateAll(); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java index b01fdb54..6ff02cea 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java @@ -10,9 +10,11 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.CallbackTimerTask; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerContext; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; @@ -59,39 +61,46 @@ public class FailureActor extends AbstractActor { // 超过最大等级 SceneConfig sceneConfig = - accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); + accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); try { transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { + RetryTimerContext timerContext = new RetryTimerContext(); + timerContext.setGroupName(retryTask.getGroupName()); + timerContext.setUniqueId(retryTask.getUniqueId()); + TimerTask timerTask = null; Integer maxRetryCount; if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) { maxRetryCount = systemProperties.getCallback().getMaxCount(); timerTask = new CallbackTimerTask(); + timerContext.setScene(TaskActuatorSceneEnum.AUTO_CALLBACK); } else { maxRetryCount = sceneConfig.getMaxRetryCount(); - timerTask = new RetryTimerTask(); + timerTask = new RetryTimerTask(timerContext); + timerContext.setScene(TaskActuatorSceneEnum.AUTO_RETRY); } if (maxRetryCount <= retryTask.getRetryCount()) { retryTask.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus()); // 创建一个回调任务 callbackRetryTaskHandler.create(retryTask); + } else { + // TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮 + LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt(); + long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis(); + log.info("准确进入时间轮 {} {}", nextTriggerAt, delay); + TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS); } - // TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮 - LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt(); - long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS); - retryTask.setUpdateDt(LocalDateTime.now()); Assert.isTrue(1 == accessTemplate.getRetryTaskAccess() - .updateById(retryTask.getGroupName(), retryTask), + .updateById(retryTask.getGroupName(), retryTask), () -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]", - retryTask.getGroupName(), retryTask.getUniqueId())); + retryTask.getGroupName(), retryTask.getUniqueId())); } }); } catch (Exception e) { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 57adc5d9..5888844f 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -1,36 +1,26 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan; import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import cn.hutool.core.lang.Pair; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; -import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; -import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask; -import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler; -import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.List; -import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** * 数据扫描模板类 @@ -51,7 +41,8 @@ public abstract class AbstractScanGroup extends AbstractActor { protected AccessTemplate accessTemplate; @Autowired protected ClientNodeAllocateHandler clientNodeAllocateHandler; - + @Autowired + protected List taskActuators; @Override public Receive createReceive() { @@ -75,7 +66,7 @@ public abstract class AbstractScanGroup extends AbstractActor { Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); // 扫描当前Group 待处理的任务 - List list = listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, getTaskType()); + List list = listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, taskActuatorScene().getScene()); if (!CollectionUtils.isEmpty(list)) { @@ -83,24 +74,10 @@ public abstract class AbstractScanGroup extends AbstractActor { putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId()); for (RetryTask retryTask : list) { - - // 重试次数累加 - retryCountIncrement(retryTask); - - RetryContext retryContext = builderRetryContext(groupName, retryTask); - RetryExecutor executor = builderResultRetryExecutor(retryContext); - - Pair pair = executor.filter(); - if (!pair.getKey()) { - log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", - retryContext.getRetryTask().getGroupName(), - retryContext.getRetryTask().getUniqueId(), pair.getValue().toString()); - continue; - } - - Timeout timeout = TimerWheelHandler.taskConcurrentHashMap.get(retryTask.getGroupName().concat("_").concat(retryTask.getUniqueId())); - if (Objects.isNull(timeout)) { - productExecUnitActor(executor); + for (TaskActuator taskActuator : taskActuators) { + if (taskActuatorScene().getScene() == taskActuator.getTaskType().getScene()) { + taskActuator.actuator(retryTask); + } } } } else { @@ -116,32 +93,12 @@ public abstract class AbstractScanGroup extends AbstractActor { } - protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask); - - protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext); - - protected abstract Integer getTaskType(); + protected abstract TaskActuatorSceneEnum taskActuatorScene(); protected abstract Long getLastId(String groupName); protected abstract void putLastId(String groupName, Long lastId); - private void retryCountIncrement(RetryTask retryTask) { - Integer retryCount = retryTask.getRetryCount(); - retryTask.setRetryCount(++retryCount); - } - - private void productExecUnitActor(RetryExecutor retryExecutor) { - String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); - Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); - idempotentStrategy.set(groupIdHash, retryId.intValue()); - - ActorRef actorRef = getActorRef(); - actorRef.tell(retryExecutor, actorRef); - } - - protected abstract ActorRef getActorRef(); - public List listAvailableTasks(String groupName, LocalDateTime lastAt, Long lastId, diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java index 9913972b..fa657330 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanCallbackTaskActor.java @@ -7,6 +7,7 @@ import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; @@ -41,34 +42,8 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { private static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); @Override - protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { - - CallbackRetryContext retryContext = new CallbackRetryContext<>(); - retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - return retryContext; - } - - @Override - protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) { - return RetryBuilder.newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatus()) - .withWaitStrategy(getWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); - } - - @Override - protected Integer getTaskType() { - return TaskTypeEnum.CALLBACK.getType(); + protected TaskActuatorSceneEnum taskActuatorScene() { + return TaskActuatorSceneEnum.AUTO_CALLBACK; } @Override @@ -86,9 +61,4 @@ public class ScanCallbackTaskActor extends AbstractScanGroup { return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); } - @Override - protected ActorRef getActorRef() { - return ActorGenerator.execCallbackUnitActor(); - } - } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java index 223efcfe..9e4dbe71 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/ScanRetryTaskActor.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; @@ -41,36 +42,8 @@ public class ScanRetryTaskActor extends AbstractScanGroup { private static final ConcurrentMap LAST_AT_MAP = new ConcurrentHashMap<>(); @Override - protected RetryContext> builderRetryContext(final String groupName, - final RetryTask retryTask) { - MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); - retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - return retryContext; - } - - @Override - protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext) { - - RetryTask retryTask = retryContext.getRetryTask(); - return RetryBuilder.>newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatusCode()) - .withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); - } - - @Override - protected Integer getTaskType() { - return TaskTypeEnum.RETRY.getType(); + protected TaskActuatorSceneEnum taskActuatorScene() { + return TaskActuatorSceneEnum.AUTO_RETRY; } @Override @@ -83,19 +56,4 @@ public class ScanRetryTaskActor extends AbstractScanGroup { LAST_AT_MAP.put(groupName, lastId); } - - private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) { - - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName); - Integer backOff = sceneConfig.getBackOff(); - - return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); - } - - @Override - protected ActorRef getActorRef() { - return ActorGenerator.execUnitActor(); - } - - } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskActuator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskActuator.java new file mode 100644 index 00000000..3fe871d4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskActuator.java @@ -0,0 +1,95 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import akka.actor.ActorRef; +import cn.hutool.core.lang.Pair; +import com.aizuda.easy.retry.server.common.config.SystemProperties; +import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy; +import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import io.netty.util.Timeout; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; + +import java.util.Objects; + +/** + * + * + * @author www.byteblogs.com + * @date 2023-09-23 08:02:17 + * @since 2.4.0 + */ +@Slf4j +public abstract class AbstractTaskActuator implements TaskActuator, InitializingBean { + + @Autowired + @Qualifier("bitSetIdempotentStrategyHandler") + protected IdempotentStrategy idempotentStrategy; + @Autowired + protected SystemProperties systemProperties; + @Autowired + protected AccessTemplate accessTemplate; + @Autowired + protected ClientNodeAllocateHandler clientNodeAllocateHandler; + + @Override + public void actuator(RetryTask retryTask) { + // 重试次数累加 + retryCountIncrement(retryTask); + + RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask); + RetryExecutor executor = builderResultRetryExecutor(retryContext); + + if (!preCheck(retryContext, executor)) { + return; + } + + Timeout timeout = TimerWheelHandler.getTimeout(retryTask.getGroupName(), retryTask.getUniqueId()); + if (Objects.isNull(timeout)) { + productExecUnitActor(executor); + } + } + + protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) { + Pair pair = executor.filter(); + if (!pair.getKey()) { + log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]", + retryContext.getRetryTask().getGroupName(), + retryContext.getRetryTask().getUniqueId(), pair.getValue().toString()); + return false; + } + + return true; + } + + private void retryCountIncrement(RetryTask retryTask) { + Integer retryCount = retryTask.getRetryCount(); + retryTask.setRetryCount(++retryCount); + } + + private void productExecUnitActor(RetryExecutor retryExecutor) { + String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName(); + Long retryId = retryExecutor.getRetryContext().getRetryTask().getId(); + idempotentStrategy.set(groupIdHash, retryId.intValue()); + + ActorRef actorRef = getActorRef(); + actorRef.tell(retryExecutor, actorRef); + } + + protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask); + + protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext); + + protected abstract ActorRef getActorRef(); + + @Override + public void afterPropertiesSet() throws Exception { + TaskActuatorFactory.register(this.getTaskType(), this); + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskActuator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskActuator.java new file mode 100644 index 00000000..567ddc17 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskActuator.java @@ -0,0 +1,69 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import org.springframework.stereotype.Component; + +/** + * 回调任务执行器 + * + * @author www.byteblogs.com + * @date 2023-09-23 08:03:07 + * @since 2.4.0 + */ +@Component +public class CallbackTaskActuator extends AbstractTaskActuator { + + @Override + protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { + + CallbackRetryContext retryContext = new CallbackRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + return retryContext; + } + + @Override + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) { + return RetryBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatus()) + .withWaitStrategy(getWaitWaitStrategy()) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + } + + @Override + public TaskActuatorSceneEnum getTaskType() { + return TaskActuatorSceneEnum.AUTO_CALLBACK; + } + + private WaitStrategy getWaitWaitStrategy() { + // 回调失败每15min重试一次 + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + } + + @Override + protected ActorRef getActorRef() { + return ActorGenerator.execCallbackUnitActor(); + } + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskActuator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskActuator.java new file mode 100644 index 00000000..f552ca7b --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskActuator.java @@ -0,0 +1,78 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.Pair; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import org.springframework.stereotype.Component; + +/** + * 回调任务执行器 + * + * @author www.byteblogs.com + * @date 2023-09-23 08:03:07 + * @since 2.4.0 + */ +@Component +public class ManualCallbackTaskActuator extends AbstractTaskActuator { + + @Override + protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { + + CallbackRetryContext retryContext = new CallbackRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + return retryContext; + } + + @Override + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) { + return RetryBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatus()) + .withWaitStrategy(getWaitWaitStrategy()) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + } + + @Override + protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) { + Pair pair = executor.filter(); + Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString())); + return pair.getKey(); + } + + @Override + public TaskActuatorSceneEnum getTaskType() { + return TaskActuatorSceneEnum.MANUAL_CALLBACK; + } + + private WaitStrategy getWaitWaitStrategy() { + // 回调失败每15min重试一次 + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff()); + } + + @Override + protected ActorRef getActorRef() { + return ActorGenerator.execCallbackUnitActor(); + } + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskActuator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskActuator.java new file mode 100644 index 00000000..560d6f59 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskActuator.java @@ -0,0 +1,84 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import akka.actor.ActorRef; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.lang.Pair; +import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; +import org.springframework.stereotype.Component; + +/** + * 重试任务执行器 + * + * @author www.byteblogs.com + * @date 2023-09-23 08:03:07 + * @since 2.4.0 + */ +@Component +public class ManualRetryTaskActuator extends AbstractTaskActuator { + + @Override + protected RetryContext> builderRetryContext(final String groupName, + final RetryTask retryTask) { + MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + return retryContext; + } + + @Override + protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext) { + + RetryTask retryTask = retryContext.getRetryTask(); + return RetryBuilder.newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) + .withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + } + + @Override + public TaskActuatorSceneEnum getTaskType() { + return TaskActuatorSceneEnum.MANUAL_RETRY; + } + + private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) { + + SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName); + Integer backOff = sceneConfig.getBackOff(); + + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); + } + + @Override + protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) { + Pair pair = executor.filter(); + Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString())); + return pair.getKey(); + } + + + @Override + protected ActorRef getActorRef() { + return ActorGenerator.execUnitActor(); + } + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskActuator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskActuator.java new file mode 100644 index 00000000..e74b514a --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskActuator.java @@ -0,0 +1,76 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; +import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; +import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; +import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; +import org.springframework.stereotype.Component; + +/** + * 重试任务执行器 + * + * @author www.byteblogs.com + * @date 2023-09-23 08:03:07 + * @since 2.4.0 + */ +@Component +public class RetryTaskActuator extends AbstractTaskActuator { + + @Override + protected RetryContext> builderRetryContext(final String groupName, + final RetryTask retryTask) { + MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); + retryContext.setRetryTask(retryTask); + retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); + retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + return retryContext; + } + + @Override + protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext) { + + RetryTask retryTask = retryContext.getRetryTask(); + return RetryBuilder.>newBuilder() + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) + .withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); + } + + @Override + public TaskActuatorSceneEnum getTaskType() { + return TaskActuatorSceneEnum.AUTO_RETRY; + } + + private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) { + + SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName); + Integer backOff = sceneConfig.getBackOff(); + + return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); + } + + @Override + protected ActorRef getActorRef() { + return ActorGenerator.execUnitActor(); + } + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuator.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuator.java new file mode 100644 index 00000000..a22c90f4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuator.java @@ -0,0 +1,16 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; + +/** + * @author www.byteblogs.com + * @date 2023-09-23 08:01:38 + * @since 2.4.0 + */ +public interface TaskActuator { + + TaskActuatorSceneEnum getTaskType(); + + void actuator(RetryTask retryTask); +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorFactory.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorFactory.java new file mode 100644 index 00000000..3812d324 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorFactory.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import com.aizuda.easy.retry.template.datasource.access.Access; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author www.byteblogs.com + * @date 2023-09-23 09:16:23 + * @since 2.4.0 + */ +public class TaskActuatorFactory { + + private static final Map REGISTER_TASK_ACTUATOR = new HashMap<>(); + + protected static void register(TaskActuatorSceneEnum scene,TaskActuator taskActuator) { + REGISTER_TASK_ACTUATOR.put(scene, taskActuator); + } + + public static TaskActuator getTaskActuator(TaskActuatorSceneEnum scene) { + return REGISTER_TASK_ACTUATOR.get(scene); + } + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorSceneEnum.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorSceneEnum.java new file mode 100644 index 00000000..0c9ced1e --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/TaskActuatorSceneEnum.java @@ -0,0 +1,24 @@ +package com.aizuda.easy.retry.server.retry.task.support.dispatch.task; + +import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author www.byteblogs.com + * @date 2023-09-23 08:49:21 + * @since 2.4.0 + */ +@AllArgsConstructor +@Getter +public enum TaskActuatorSceneEnum { + AUTO_RETRY(1, TaskTypeEnum.RETRY), + MANUAL_RETRY(2, TaskTypeEnum.RETRY), + AUTO_CALLBACK(3, TaskTypeEnum.CALLBACK), + MANUAL_CALLBACK(4, TaskTypeEnum.CALLBACK); + + private final int scene; + private final TaskTypeEnum taskType; + + +} diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java index 4055a3ba..ae91c30c 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/ConsumerBucketActor.java @@ -71,8 +71,8 @@ public class ConsumerBucketActor extends AbstractActor { .in(SceneConfig::getBucketIndex, consumerBucket.getBuckets()) .groupBy(SceneConfig::getGroupName)).stream().map(SceneConfig::getGroupName).collect(Collectors.toSet()); - CacheConsumerGroup.clear(); // todo 需要对groupNameSet进行状态过滤只有开启才进行任务调度 + // todo 通过同步线程对集群中的当前节点需要处理的组进行同步 for (final String groupName : groupNameSet) { CacheConsumerGroup.addOrUpdate(groupName); ScanTask scanTask = new ScanTask(); diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java index 378020a7..0ba4746c 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/dispatch/DispatchService.java @@ -5,13 +5,11 @@ import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.dto.DistributeInstance; -import com.aizuda.easy.retry.server.retry.task.support.cache.CacheBucketActor; import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java index ace0394b..d09debf7 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java @@ -22,6 +22,8 @@ import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext; import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator; +import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies; @@ -97,6 +99,8 @@ public class RetryTaskServiceImpl implements RetryTaskService { @Autowired @Qualifier("bitSetIdempotentStrategyHandler") protected IdempotentStrategy idempotentStrategy; + @Autowired + private List taskActuators; @Override public PageResult> getRetryTaskPage(RetryTaskQueryVO queryVO) { @@ -327,29 +331,13 @@ public class RetryTaskServiceImpl implements RetryTaskService { .in(RetryTask::getUniqueId, uniqueIds)); Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务")); + for (RetryTask retryTask : list) { - MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); - retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - - retryCountIncrement(retryTask); - - RetryExecutor> executor = RetryBuilder.>newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatusCode()) - .withWaitStrategy(getRetryTaskWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); - - Pair pair = executor.filter(); - Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString())); - - productExecUnitActor(executor, ActorGenerator.execUnitActor()); + for (TaskActuator taskActuator : taskActuators) { + if (taskActuator.getTaskType().getScene() == TaskActuatorSceneEnum.MANUAL_RETRY.getScene()) { + taskActuator.actuator(retryTask); + } + } } return true; @@ -367,29 +355,12 @@ public class RetryTaskServiceImpl implements RetryTaskService { Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务")); for (RetryTask retryTask : list) { + for (TaskActuator taskActuator : taskActuators) { + if (taskActuator.getTaskType().getScene() == TaskActuatorSceneEnum.MANUAL_CALLBACK.getScene()) { + taskActuator.actuator(retryTask); + } + } - CallbackRetryContext retryContext = new CallbackRetryContext<>(); - retryContext.setRetryTask(retryTask); - retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - - retryCountIncrement(retryTask); - - RetryExecutor executor = RetryBuilder.newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatusCode()) - .withWaitStrategy(getCallbackWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); - - Pair pair = executor.filter(); - Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString())); - - productExecUnitActor(executor, ActorGenerator.execCallbackUnitActor()); } return true;