feat:2.4.0
1. 通过时间轮来减少任务触发时间误差
This commit is contained in:
parent
9cef08d338
commit
01fb0cda20
@ -1,59 +0,0 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.cache;
|
|
||||||
|
|
||||||
import akka.actor.ActorRef;
|
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
|
||||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
|
||||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
|
||||||
import com.google.common.cache.Cache;
|
|
||||||
import com.google.common.cache.CacheBuilder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 缓存组扫描Actor
|
|
||||||
*
|
|
||||||
* @author www.byteblogs.com
|
|
||||||
* @date 2021-10-30
|
|
||||||
* @since 1.0.0
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
@Data
|
|
||||||
@Slf4j
|
|
||||||
public class CacheBucketActor implements Lifecycle {
|
|
||||||
|
|
||||||
private static Cache<Integer, ActorRef> CACHE;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取所有缓存
|
|
||||||
*
|
|
||||||
* @return 缓存对象
|
|
||||||
*/
|
|
||||||
public static ActorRef get(Integer bucket) {
|
|
||||||
return CACHE.getIfPresent(bucket);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取所有缓存
|
|
||||||
*
|
|
||||||
* @return 缓存对象
|
|
||||||
*/
|
|
||||||
public static void put(Integer bucket, ActorRef actorRef) {
|
|
||||||
CACHE.put(bucket, actorRef);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
LogUtils.info(log, "CacheGroupScanActor start");
|
|
||||||
CACHE = CacheBuilder.newBuilder()
|
|
||||||
// 设置并发级别为cpu核心数
|
|
||||||
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
LogUtils.info(log, "CacheGroupScanActor stop");
|
|
||||||
CACHE.invalidateAll();
|
|
||||||
}
|
|
||||||
}
|
|
@ -53,6 +53,6 @@ public class CacheLockRecord implements Lifecycle {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
CACHE.invalidateAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 11:10:01
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
public abstract class AbstractTimerTask implements TimerTask {
|
||||||
|
|
||||||
|
protected String groupName;
|
||||||
|
protected String uniqueId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(Timeout timeout) throws Exception {
|
||||||
|
|
||||||
|
// 先清除时间轮的缓存
|
||||||
|
TimerWheelHandler.clearCache(groupName, uniqueId);
|
||||||
|
|
||||||
|
doRun(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void doRun(Timeout timeout);
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 09:14:03
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class RetryTimerContext {
|
||||||
|
|
||||||
|
private String groupName;
|
||||||
|
|
||||||
|
private String uniqueId;
|
||||||
|
|
||||||
|
private TaskActuatorSceneEnum scene;
|
||||||
|
|
||||||
|
}
|
@ -1,10 +1,23 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorFactory;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import io.netty.util.Timeout;
|
import io.netty.util.Timeout;
|
||||||
import io.netty.util.TimerTask;
|
import io.netty.util.TimerTask;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
@ -12,14 +25,27 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RetryTimerTask implements TimerTask {
|
public class RetryTimerTask extends AbstractTimerTask {
|
||||||
|
|
||||||
private RetryExecutor executor;
|
private RetryTimerContext context;
|
||||||
|
|
||||||
|
public RetryTimerTask(RetryTimerContext context) {
|
||||||
|
this.context = context;
|
||||||
|
super.groupName = context.getGroupName();
|
||||||
|
super.uniqueId = context.getUniqueId();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(final Timeout timeout) throws Exception {
|
public void doRun(final Timeout timeout){
|
||||||
log.info("重试任务执行");
|
log.info("重试任务执行 {}", LocalDateTime.now());
|
||||||
// RetryContext retryContext = executor.getRetryContext();
|
// todo
|
||||||
// RetryTask retryTask = retryContext.getRetryTask();
|
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
|
||||||
|
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
|
||||||
|
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
|
||||||
|
.eq(RetryTask::getGroupName, context.getGroupName())
|
||||||
|
.eq(RetryTask::getUniqueId, context.getUniqueId())
|
||||||
|
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
|
||||||
|
TaskActuator taskActuator = TaskActuatorFactory.getTaskActuator(context.getScene());
|
||||||
|
taskActuator.actuator(retryTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,19 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
import io.netty.util.HashedWheelTimer;
|
import io.netty.util.HashedWheelTimer;
|
||||||
import io.netty.util.Timeout;
|
import io.netty.util.Timeout;
|
||||||
import io.netty.util.TimerTask;
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -15,31 +21,83 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @date : 2023-09-22 17:03
|
* @date : 2023-09-22 17:03
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
|
@Slf4j
|
||||||
public class TimerWheelHandler implements Lifecycle {
|
public class TimerWheelHandler implements Lifecycle {
|
||||||
|
|
||||||
private static HashedWheelTimer hashedWheelTimer = null;
|
private static HashedWheelTimer timer = null;
|
||||||
|
|
||||||
public static ConcurrentHashMap<String, Timeout> taskConcurrentHashMap = new ConcurrentHashMap<>();
|
private static Cache<String, Timeout> cache;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
hashedWheelTimer = new HashedWheelTimer(
|
|
||||||
new CustomizableThreadFactory("retry-task-timer-wheel"), 10, TimeUnit.MILLISECONDS, 16);
|
// TODO 支持可配置
|
||||||
hashedWheelTimer.start();
|
// tickDuration 和 timeUnit 一格的时间长度
|
||||||
|
// ticksPerWheel 一圈有多少格
|
||||||
|
timer = new HashedWheelTimer(
|
||||||
|
new CustomizableThreadFactory("retry_task_timer_wheel_"), 100,
|
||||||
|
TimeUnit.MILLISECONDS, 1024);
|
||||||
|
|
||||||
|
timer.start();
|
||||||
|
|
||||||
|
cache = CacheBuilder.newBuilder()
|
||||||
|
// 设置并发级别为cpu核心数
|
||||||
|
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
||||||
Timeout timeout = hashedWheelTimer.newTimeout(task, delay, unit);
|
|
||||||
taskConcurrentHashMap.put(groupName.concat("_").concat(uniqueId), timeout);
|
if (delay < 0) {
|
||||||
|
delay = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO 支持可配置
|
||||||
|
if (delay > 60 * 1000) {
|
||||||
|
LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]",
|
||||||
|
groupName, uniqueId, delay);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Timeout timeout = getTimeout(groupName, uniqueId);
|
||||||
|
if (Objects.isNull(timeout)) {
|
||||||
|
try {
|
||||||
|
timeout = timer.newTimeout(task, delay, unit);
|
||||||
|
cache.put(getKey(groupName, uniqueId), timeout);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]",
|
||||||
|
groupName, uniqueId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean cancel(Long taskId) {
|
private static String getKey(String groupName, String uniqueId) {
|
||||||
Timeout timeout = taskConcurrentHashMap.get(taskId);
|
return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Timeout getTimeout(String groupName, String uniqueId) {
|
||||||
|
return cache.getIfPresent(getKey(groupName, uniqueId));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean cancel(String groupName, String uniqueId) {
|
||||||
|
String key = getKey(groupName, uniqueId);
|
||||||
|
Timeout timeout = cache.getIfPresent(key);
|
||||||
|
if (Objects.isNull(timeout)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.invalidate(key);
|
||||||
return timeout.cancel();
|
return timeout.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void clearCache(String groupName, String uniqueId) {
|
||||||
|
cache.invalidate(getKey(groupName, uniqueId));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
hashedWheelTimer.stop();
|
timer.stop();
|
||||||
|
cache.invalidateAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,9 +10,11 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
|||||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.CallbackTimerTask;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.CallbackTimerTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerContext;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
||||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
@ -59,39 +61,46 @@ public class FailureActor extends AbstractActor {
|
|||||||
|
|
||||||
// 超过最大等级
|
// 超过最大等级
|
||||||
SceneConfig sceneConfig =
|
SceneConfig sceneConfig =
|
||||||
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
|
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
|
|
||||||
|
RetryTimerContext timerContext = new RetryTimerContext();
|
||||||
|
timerContext.setGroupName(retryTask.getGroupName());
|
||||||
|
timerContext.setUniqueId(retryTask.getUniqueId());
|
||||||
|
|
||||||
TimerTask timerTask = null;
|
TimerTask timerTask = null;
|
||||||
Integer maxRetryCount;
|
Integer maxRetryCount;
|
||||||
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
||||||
maxRetryCount = systemProperties.getCallback().getMaxCount();
|
maxRetryCount = systemProperties.getCallback().getMaxCount();
|
||||||
timerTask = new CallbackTimerTask();
|
timerTask = new CallbackTimerTask();
|
||||||
|
timerContext.setScene(TaskActuatorSceneEnum.AUTO_CALLBACK);
|
||||||
} else {
|
} else {
|
||||||
maxRetryCount = sceneConfig.getMaxRetryCount();
|
maxRetryCount = sceneConfig.getMaxRetryCount();
|
||||||
timerTask = new RetryTimerTask();
|
timerTask = new RetryTimerTask(timerContext);
|
||||||
|
timerContext.setScene(TaskActuatorSceneEnum.AUTO_RETRY);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (maxRetryCount <= retryTask.getRetryCount()) {
|
if (maxRetryCount <= retryTask.getRetryCount()) {
|
||||||
retryTask.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
|
retryTask.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
|
||||||
// 创建一个回调任务
|
// 创建一个回调任务
|
||||||
callbackRetryTaskHandler.create(retryTask);
|
callbackRetryTaskHandler.create(retryTask);
|
||||||
|
} else {
|
||||||
|
// TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮
|
||||||
|
LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt();
|
||||||
|
long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis();
|
||||||
|
log.info("准确进入时间轮 {} {}", nextTriggerAt, delay);
|
||||||
|
TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮
|
|
||||||
LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt();
|
|
||||||
long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
|
||||||
TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
retryTask.setUpdateDt(LocalDateTime.now());
|
retryTask.setUpdateDt(LocalDateTime.now());
|
||||||
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
||||||
.updateById(retryTask.getGroupName(), retryTask),
|
.updateById(retryTask.getGroupName(), retryTask),
|
||||||
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
|
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
|
||||||
retryTask.getGroupName(), retryTask.getUniqueId()));
|
retryTask.getGroupName(), retryTask.getUniqueId()));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -1,36 +1,26 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
||||||
|
|
||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import akka.actor.ActorRef;
|
|
||||||
import cn.hutool.core.lang.Pair;
|
|
||||||
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
|
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
|
||||||
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
|
||||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
import io.netty.util.HashedWheelTimer;
|
|
||||||
import io.netty.util.Timeout;
|
|
||||||
import io.netty.util.TimerTask;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 数据扫描模板类
|
* 数据扫描模板类
|
||||||
@ -51,7 +41,8 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
protected AccessTemplate accessTemplate;
|
protected AccessTemplate accessTemplate;
|
||||||
@Autowired
|
@Autowired
|
||||||
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||||
|
@Autowired
|
||||||
|
protected List<TaskActuator> taskActuators;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
@ -75,7 +66,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
|
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
|
||||||
|
|
||||||
// 扫描当前Group 待处理的任务
|
// 扫描当前Group 待处理的任务
|
||||||
List<RetryTask> list = listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, getTaskType());
|
List<RetryTask> list = listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, taskActuatorScene().getScene());
|
||||||
|
|
||||||
if (!CollectionUtils.isEmpty(list)) {
|
if (!CollectionUtils.isEmpty(list)) {
|
||||||
|
|
||||||
@ -83,24 +74,10 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId());
|
putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId());
|
||||||
|
|
||||||
for (RetryTask retryTask : list) {
|
for (RetryTask retryTask : list) {
|
||||||
|
for (TaskActuator taskActuator : taskActuators) {
|
||||||
// 重试次数累加
|
if (taskActuatorScene().getScene() == taskActuator.getTaskType().getScene()) {
|
||||||
retryCountIncrement(retryTask);
|
taskActuator.actuator(retryTask);
|
||||||
|
}
|
||||||
RetryContext retryContext = builderRetryContext(groupName, retryTask);
|
|
||||||
RetryExecutor executor = builderResultRetryExecutor(retryContext);
|
|
||||||
|
|
||||||
Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> pair = executor.filter();
|
|
||||||
if (!pair.getKey()) {
|
|
||||||
log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]",
|
|
||||||
retryContext.getRetryTask().getGroupName(),
|
|
||||||
retryContext.getRetryTask().getUniqueId(), pair.getValue().toString());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Timeout timeout = TimerWheelHandler.taskConcurrentHashMap.get(retryTask.getGroupName().concat("_").concat(retryTask.getUniqueId()));
|
|
||||||
if (Objects.isNull(timeout)) {
|
|
||||||
productExecUnitActor(executor);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -116,32 +93,12 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask);
|
protected abstract TaskActuatorSceneEnum taskActuatorScene();
|
||||||
|
|
||||||
protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext);
|
|
||||||
|
|
||||||
protected abstract Integer getTaskType();
|
|
||||||
|
|
||||||
protected abstract Long getLastId(String groupName);
|
protected abstract Long getLastId(String groupName);
|
||||||
|
|
||||||
protected abstract void putLastId(String groupName, Long lastId);
|
protected abstract void putLastId(String groupName, Long lastId);
|
||||||
|
|
||||||
private void retryCountIncrement(RetryTask retryTask) {
|
|
||||||
Integer retryCount = retryTask.getRetryCount();
|
|
||||||
retryTask.setRetryCount(++retryCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void productExecUnitActor(RetryExecutor retryExecutor) {
|
|
||||||
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
|
|
||||||
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
|
|
||||||
idempotentStrategy.set(groupIdHash, retryId.intValue());
|
|
||||||
|
|
||||||
ActorRef actorRef = getActorRef();
|
|
||||||
actorRef.tell(retryExecutor, actorRef);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract ActorRef getActorRef();
|
|
||||||
|
|
||||||
public List<RetryTask> listAvailableTasks(String groupName,
|
public List<RetryTask> listAvailableTasks(String groupName,
|
||||||
LocalDateTime lastAt,
|
LocalDateTime lastAt,
|
||||||
Long lastId,
|
Long lastId,
|
||||||
|
@ -7,6 +7,7 @@ import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
|||||||
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
@ -41,34 +42,8 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
|
|||||||
private static final ConcurrentMap<String, Long> LAST_AT_MAP = new ConcurrentHashMap<>();
|
private static final ConcurrentMap<String, Long> LAST_AT_MAP = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
|
protected TaskActuatorSceneEnum taskActuatorScene() {
|
||||||
|
return TaskActuatorSceneEnum.AUTO_CALLBACK;
|
||||||
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
|
|
||||||
retryContext.setRetryTask(retryTask);
|
|
||||||
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
|
||||||
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
|
||||||
return retryContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) {
|
|
||||||
return RetryBuilder.<Result>newBuilder()
|
|
||||||
.withStopStrategy(StopStrategies.stopException())
|
|
||||||
.withStopStrategy(StopStrategies.stopResultStatus())
|
|
||||||
.withWaitStrategy(getWaitWaitStrategy())
|
|
||||||
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
|
||||||
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
|
||||||
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
|
||||||
.withRetryContext(retryContext)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Integer getTaskType() {
|
|
||||||
return TaskTypeEnum.CALLBACK.getType();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -86,9 +61,4 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
|
|||||||
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
|
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ActorRef getActorRef() {
|
|
||||||
return ActorGenerator.execCallbackUnitActor();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
|||||||
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
@ -41,36 +42,8 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
|
|||||||
private static final ConcurrentMap<String, Long> LAST_AT_MAP = new ConcurrentHashMap<>();
|
private static final ConcurrentMap<String, Long> LAST_AT_MAP = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
|
protected TaskActuatorSceneEnum taskActuatorScene() {
|
||||||
final RetryTask retryTask) {
|
return TaskActuatorSceneEnum.AUTO_RETRY;
|
||||||
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
|
|
||||||
retryContext.setRetryTask(retryTask);
|
|
||||||
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
|
||||||
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
|
||||||
return retryContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext) {
|
|
||||||
|
|
||||||
RetryTask retryTask = retryContext.getRetryTask();
|
|
||||||
return RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
|
|
||||||
.withStopStrategy(StopStrategies.stopException())
|
|
||||||
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
|
||||||
.withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
|
|
||||||
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
|
||||||
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
|
||||||
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
|
||||||
.withRetryContext(retryContext)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Integer getTaskType() {
|
|
||||||
return TaskTypeEnum.RETRY.getType();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -83,19 +56,4 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
|
|||||||
LAST_AT_MAP.put(groupName, lastId);
|
LAST_AT_MAP.put(groupName, lastId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) {
|
|
||||||
|
|
||||||
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName);
|
|
||||||
Integer backOff = sceneConfig.getBackOff();
|
|
||||||
|
|
||||||
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ActorRef getActorRef() {
|
|
||||||
return ActorGenerator.execUnitActor();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,95 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.lang.Pair;
|
||||||
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
|
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:02:17
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public abstract class AbstractTaskActuator implements TaskActuator, InitializingBean {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("bitSetIdempotentStrategyHandler")
|
||||||
|
protected IdempotentStrategy<String, Integer> idempotentStrategy;
|
||||||
|
@Autowired
|
||||||
|
protected SystemProperties systemProperties;
|
||||||
|
@Autowired
|
||||||
|
protected AccessTemplate accessTemplate;
|
||||||
|
@Autowired
|
||||||
|
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void actuator(RetryTask retryTask) {
|
||||||
|
// 重试次数累加
|
||||||
|
retryCountIncrement(retryTask);
|
||||||
|
|
||||||
|
RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask);
|
||||||
|
RetryExecutor executor = builderResultRetryExecutor(retryContext);
|
||||||
|
|
||||||
|
if (!preCheck(retryContext, executor)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Timeout timeout = TimerWheelHandler.getTimeout(retryTask.getGroupName(), retryTask.getUniqueId());
|
||||||
|
if (Objects.isNull(timeout)) {
|
||||||
|
productExecUnitActor(executor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) {
|
||||||
|
Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> pair = executor.filter();
|
||||||
|
if (!pair.getKey()) {
|
||||||
|
log.warn("当前任务不满足执行条件. groupName:[{}] uniqueId:[{}], description:[{}]",
|
||||||
|
retryContext.getRetryTask().getGroupName(),
|
||||||
|
retryContext.getRetryTask().getUniqueId(), pair.getValue().toString());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void retryCountIncrement(RetryTask retryTask) {
|
||||||
|
Integer retryCount = retryTask.getRetryCount();
|
||||||
|
retryTask.setRetryCount(++retryCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void productExecUnitActor(RetryExecutor retryExecutor) {
|
||||||
|
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
|
||||||
|
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
|
||||||
|
idempotentStrategy.set(groupIdHash, retryId.intValue());
|
||||||
|
|
||||||
|
ActorRef actorRef = getActorRef();
|
||||||
|
actorRef.tell(retryExecutor, actorRef);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask);
|
||||||
|
|
||||||
|
protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext);
|
||||||
|
|
||||||
|
protected abstract ActorRef getActorRef();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
TaskActuatorFactory.register(this.getTaskType(), this);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,69 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import com.aizuda.easy.retry.common.core.model.Result;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 回调任务执行器
|
||||||
|
*
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:03:07
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class CallbackTaskActuator extends AbstractTaskActuator {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
|
||||||
|
|
||||||
|
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
|
||||||
|
retryContext.setRetryTask(retryTask);
|
||||||
|
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
||||||
|
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
||||||
|
return retryContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) {
|
||||||
|
return RetryBuilder.<Result>newBuilder()
|
||||||
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
|
.withStopStrategy(StopStrategies.stopResultStatus())
|
||||||
|
.withWaitStrategy(getWaitWaitStrategy())
|
||||||
|
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
|
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
||||||
|
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
||||||
|
.withRetryContext(retryContext)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskActuatorSceneEnum getTaskType() {
|
||||||
|
return TaskActuatorSceneEnum.AUTO_CALLBACK;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WaitStrategy getWaitWaitStrategy() {
|
||||||
|
// 回调失败每15min重试一次
|
||||||
|
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActorRef getActorRef() {
|
||||||
|
return ActorGenerator.execCallbackUnitActor();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,78 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import cn.hutool.core.lang.Pair;
|
||||||
|
import com.aizuda.easy.retry.common.core.model.Result;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 回调任务执行器
|
||||||
|
*
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:03:07
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class ManualCallbackTaskActuator extends AbstractTaskActuator {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
|
||||||
|
|
||||||
|
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
|
||||||
|
retryContext.setRetryTask(retryTask);
|
||||||
|
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
||||||
|
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
||||||
|
return retryContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) {
|
||||||
|
return RetryBuilder.<Result>newBuilder()
|
||||||
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
|
.withStopStrategy(StopStrategies.stopResultStatus())
|
||||||
|
.withWaitStrategy(getWaitWaitStrategy())
|
||||||
|
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
|
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
||||||
|
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
||||||
|
.withRetryContext(retryContext)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) {
|
||||||
|
Pair<Boolean, StringBuilder> pair = executor.filter();
|
||||||
|
Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString()));
|
||||||
|
return pair.getKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskActuatorSceneEnum getTaskType() {
|
||||||
|
return TaskActuatorSceneEnum.MANUAL_CALLBACK;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WaitStrategy getWaitWaitStrategy() {
|
||||||
|
// 回调失败每15min重试一次
|
||||||
|
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActorRef getActorRef() {
|
||||||
|
return ActorGenerator.execCallbackUnitActor();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,84 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.lang.Assert;
|
||||||
|
import cn.hutool.core.lang.Pair;
|
||||||
|
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
|
||||||
|
import com.aizuda.easy.retry.common.core.model.Result;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重试任务执行器
|
||||||
|
*
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:03:07
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class ManualRetryTaskActuator extends AbstractTaskActuator {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
|
||||||
|
final RetryTask retryTask) {
|
||||||
|
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
|
||||||
|
retryContext.setRetryTask(retryTask);
|
||||||
|
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
||||||
|
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
||||||
|
return retryContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext) {
|
||||||
|
|
||||||
|
RetryTask retryTask = retryContext.getRetryTask();
|
||||||
|
return RetryBuilder.<Result>newBuilder()
|
||||||
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
|
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
||||||
|
.withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
|
||||||
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
||||||
|
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
||||||
|
.withRetryContext(retryContext)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskActuatorSceneEnum getTaskType() {
|
||||||
|
return TaskActuatorSceneEnum.MANUAL_RETRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) {
|
||||||
|
|
||||||
|
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName);
|
||||||
|
Integer backOff = sceneConfig.getBackOff();
|
||||||
|
|
||||||
|
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean preCheck(RetryContext retryContext, RetryExecutor executor) {
|
||||||
|
Pair<Boolean, StringBuilder> pair = executor.filter();
|
||||||
|
Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString()));
|
||||||
|
return pair.getKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActorRef getActorRef() {
|
||||||
|
return ActorGenerator.execUnitActor();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,76 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
|
||||||
|
import com.aizuda.easy.retry.common.core.model.Result;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重试任务执行器
|
||||||
|
*
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:03:07
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class RetryTaskActuator extends AbstractTaskActuator {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
|
||||||
|
final RetryTask retryTask) {
|
||||||
|
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
|
||||||
|
retryContext.setRetryTask(retryTask);
|
||||||
|
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
||||||
|
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
||||||
|
return retryContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext) {
|
||||||
|
|
||||||
|
RetryTask retryTask = retryContext.getRetryTask();
|
||||||
|
return RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
|
||||||
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
|
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
||||||
|
.withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
|
||||||
|
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
|
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
||||||
|
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
||||||
|
.withRetryContext(retryContext)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TaskActuatorSceneEnum getTaskType() {
|
||||||
|
return TaskActuatorSceneEnum.AUTO_RETRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) {
|
||||||
|
|
||||||
|
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName);
|
||||||
|
Integer backOff = sceneConfig.getBackOff();
|
||||||
|
|
||||||
|
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActorRef getActorRef() {
|
||||||
|
return ActorGenerator.execUnitActor();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,16 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:01:38
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
public interface TaskActuator {
|
||||||
|
|
||||||
|
TaskActuatorSceneEnum getTaskType();
|
||||||
|
|
||||||
|
void actuator(RetryTask retryTask);
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.Access;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 09:16:23
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
public class TaskActuatorFactory {
|
||||||
|
|
||||||
|
private static final Map<TaskActuatorSceneEnum, TaskActuator> REGISTER_TASK_ACTUATOR = new HashMap<>();
|
||||||
|
|
||||||
|
protected static void register(TaskActuatorSceneEnum scene,TaskActuator taskActuator) {
|
||||||
|
REGISTER_TASK_ACTUATOR.put(scene, taskActuator);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TaskActuator getTaskActuator(TaskActuatorSceneEnum scene) {
|
||||||
|
return REGISTER_TASK_ACTUATOR.get(scene);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,24 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.task;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-09-23 08:49:21
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Getter
|
||||||
|
public enum TaskActuatorSceneEnum {
|
||||||
|
AUTO_RETRY(1, TaskTypeEnum.RETRY),
|
||||||
|
MANUAL_RETRY(2, TaskTypeEnum.RETRY),
|
||||||
|
AUTO_CALLBACK(3, TaskTypeEnum.CALLBACK),
|
||||||
|
MANUAL_CALLBACK(4, TaskTypeEnum.CALLBACK);
|
||||||
|
|
||||||
|
private final int scene;
|
||||||
|
private final TaskTypeEnum taskType;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -71,8 +71,8 @@ public class ConsumerBucketActor extends AbstractActor {
|
|||||||
.in(SceneConfig::getBucketIndex, consumerBucket.getBuckets())
|
.in(SceneConfig::getBucketIndex, consumerBucket.getBuckets())
|
||||||
.groupBy(SceneConfig::getGroupName)).stream().map(SceneConfig::getGroupName).collect(Collectors.toSet());
|
.groupBy(SceneConfig::getGroupName)).stream().map(SceneConfig::getGroupName).collect(Collectors.toSet());
|
||||||
|
|
||||||
CacheConsumerGroup.clear();
|
|
||||||
// todo 需要对groupNameSet进行状态过滤只有开启才进行任务调度
|
// todo 需要对groupNameSet进行状态过滤只有开启才进行任务调度
|
||||||
|
// todo 通过同步线程对集群中的当前节点需要处理的组进行同步
|
||||||
for (final String groupName : groupNameSet) {
|
for (final String groupName : groupNameSet) {
|
||||||
CacheConsumerGroup.addOrUpdate(groupName);
|
CacheConsumerGroup.addOrUpdate(groupName);
|
||||||
ScanTask scanTask = new ScanTask();
|
ScanTask scanTask = new ScanTask();
|
||||||
|
@ -5,13 +5,11 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
|
|||||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
|
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheBucketActor;
|
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -22,6 +22,8 @@ import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
|||||||
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
|
||||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuator;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryBuilder;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies;
|
||||||
@ -97,6 +99,8 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("bitSetIdempotentStrategyHandler")
|
@Qualifier("bitSetIdempotentStrategyHandler")
|
||||||
protected IdempotentStrategy<String, Integer> idempotentStrategy;
|
protected IdempotentStrategy<String, Integer> idempotentStrategy;
|
||||||
|
@Autowired
|
||||||
|
private List<TaskActuator> taskActuators;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
|
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
|
||||||
@ -327,29 +331,13 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
|||||||
.in(RetryTask::getUniqueId, uniqueIds));
|
.in(RetryTask::getUniqueId, uniqueIds));
|
||||||
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
|
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
|
||||||
|
|
||||||
|
|
||||||
for (RetryTask retryTask : list) {
|
for (RetryTask retryTask : list) {
|
||||||
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
|
for (TaskActuator taskActuator : taskActuators) {
|
||||||
retryContext.setRetryTask(retryTask);
|
if (taskActuator.getTaskType().getScene() == TaskActuatorSceneEnum.MANUAL_RETRY.getScene()) {
|
||||||
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
taskActuator.actuator(retryTask);
|
||||||
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
}
|
||||||
|
}
|
||||||
retryCountIncrement(retryTask);
|
|
||||||
|
|
||||||
RetryExecutor<Result<DispatchRetryResultDTO>> executor = RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
|
|
||||||
.withStopStrategy(StopStrategies.stopException())
|
|
||||||
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
|
||||||
.withWaitStrategy(getRetryTaskWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
|
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
|
||||||
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
|
||||||
.withRetryContext(retryContext)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Pair<Boolean, StringBuilder> pair = executor.filter();
|
|
||||||
Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString()));
|
|
||||||
|
|
||||||
productExecUnitActor(executor, ActorGenerator.execUnitActor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -367,29 +355,12 @@ public class RetryTaskServiceImpl implements RetryTaskService {
|
|||||||
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
|
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
|
||||||
|
|
||||||
for (RetryTask retryTask : list) {
|
for (RetryTask retryTask : list) {
|
||||||
|
for (TaskActuator taskActuator : taskActuators) {
|
||||||
|
if (taskActuator.getTaskType().getScene() == TaskActuatorSceneEnum.MANUAL_CALLBACK.getScene()) {
|
||||||
|
taskActuator.actuator(retryTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
|
|
||||||
retryContext.setRetryTask(retryTask);
|
|
||||||
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
|
|
||||||
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
|
|
||||||
|
|
||||||
retryCountIncrement(retryTask);
|
|
||||||
|
|
||||||
RetryExecutor<Result> executor = RetryBuilder.<Result>newBuilder()
|
|
||||||
.withStopStrategy(StopStrategies.stopException())
|
|
||||||
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
|
||||||
.withWaitStrategy(getCallbackWaitWaitStrategy())
|
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
|
|
||||||
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
|
|
||||||
.withRetryContext(retryContext)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Pair<Boolean, StringBuilder> pair = executor.filter();
|
|
||||||
Assert.isTrue(pair.getKey(), () -> new EasyRetryServerException(pair.getValue().toString()));
|
|
||||||
|
|
||||||
productExecUnitActor(executor, ActorGenerator.execCallbackUnitActor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
Loading…
Reference in New Issue
Block a user