feat: 2.4.0
1. 完成对重试任务分发的优化 2. 修复客户端负载均衡节点相同的IP只返回同一个客户端
This commit is contained in:
parent
a5c4980464
commit
aedaeb25f3
@ -1,6 +1,7 @@
|
|||||||
package com.aizuda.easy.retry.server.common.allocate.client;
|
package com.aizuda.easy.retry.server.common.allocate.client;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
|
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
|
||||||
|
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -53,6 +54,6 @@ public class ClientLoadBalanceLRU implements ClientLoadBalance {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int routeType() {
|
public int routeType() {
|
||||||
return 0;
|
return AllocationAlgorithmEnum.LRU.getType();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package com.aizuda.easy.retry.server.common.allocate.client;
|
package com.aizuda.easy.retry.server.common.allocate.client;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
|
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
|
||||||
|
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
|
||||||
|
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
@ -20,7 +21,7 @@ public class ClientLoadBalanceRound implements ClientLoadBalance {
|
|||||||
public String route(final String allocKey, final TreeSet<String> clientAllAddressSet) {
|
public String route(final String allocKey, final TreeSet<String> clientAllAddressSet) {
|
||||||
String[] addressArr = clientAllAddressSet.toArray(new String[0]);
|
String[] addressArr = clientAllAddressSet.toArray(new String[0]);
|
||||||
AtomicInteger next = COUNTER.getOrDefault(allocKey, new AtomicInteger(1));
|
AtomicInteger next = COUNTER.getOrDefault(allocKey, new AtomicInteger(1));
|
||||||
String nextClientId = addressArr[ next.get() % clientAllAddressSet.size()];
|
String nextClientId = addressArr[next.get() % clientAllAddressSet.size()];
|
||||||
int nextIndex = next.incrementAndGet();
|
int nextIndex = next.incrementAndGet();
|
||||||
if (nextIndex > THRESHOLD) {
|
if (nextIndex > THRESHOLD) {
|
||||||
next = new AtomicInteger(1);
|
next = new AtomicInteger(1);
|
||||||
@ -32,6 +33,6 @@ public class ClientLoadBalanceRound implements ClientLoadBalance {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int routeType() {
|
public int routeType() {
|
||||||
return 0;
|
return AllocationAlgorithmEnum.ROUND.getType();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,10 +45,10 @@ public class ClientNodeAllocateHandler {
|
|||||||
|
|
||||||
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(routeKey);
|
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(routeKey);
|
||||||
|
|
||||||
String hostIp = clientLoadBalanceRandom.route(allocKey, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostIp).collect(Collectors.toSet())));
|
String hostId = clientLoadBalanceRandom.route(allocKey, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet())));
|
||||||
|
|
||||||
Stream<RegisterNodeInfo> registerNodeInfoStream = serverNodes.stream()
|
Stream<RegisterNodeInfo> registerNodeInfoStream = serverNodes.stream()
|
||||||
.filter(s -> s.getHostIp().equals(hostIp));
|
.filter(s -> s.getHostId().equals(hostId));
|
||||||
return registerNodeInfoStream.findFirst().orElse(null);
|
return registerNodeInfoStream.findFirst().orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +48,6 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
||||||
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
context.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
context.setJobId(job.getId());
|
context.setJobId(job.getId());
|
||||||
context.setTaskType(job.getTaskType());
|
|
||||||
jobExecutor.execute(context);
|
jobExecutor.execute(context);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -34,8 +34,6 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||||
@Autowired
|
@Autowired
|
||||||
private JobTaskMapper jobTaskMapper;
|
private JobTaskMapper jobTaskMapper;
|
||||||
@Autowired
|
|
||||||
private JobMapper jobMapper;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskTypeEnum getTaskInstanceType() {
|
public TaskTypeEnum getTaskInstanceType() {
|
||||||
|
@ -15,6 +15,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|||||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@ -38,7 +39,7 @@ public class JobTaskBatchHandler {
|
|||||||
.eq(JobTask::getTaskBatchId, taskBatchId));
|
.eq(JobTask::getTaskBatchId, taskBatchId));
|
||||||
|
|
||||||
|
|
||||||
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
|
if (CollectionUtils.isEmpty(jobTasks) || jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count();
|
long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count();
|
||||||
|
@ -21,7 +21,7 @@ public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long get(Long s) {
|
public Long get(Long s) {
|
||||||
throw new UnsupportedOperationException("不支持此操作");
|
throw new UnsupportedOperationException("不支持此操作");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -31,7 +31,6 @@ public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean clear(Long key, Long value) {
|
public boolean clear(Long key, Long value) {
|
||||||
cache.clear();
|
return cache.removeIf(l -> l.equals(key));
|
||||||
return Boolean.TRUE;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,8 @@ import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author www.byteblogs.com
|
* @author www.byteblogs.com
|
||||||
* @date 2023-10-25 22:23:24
|
* @date 2023-10-25 22:23:24
|
||||||
@ -12,4 +14,18 @@ import lombok.EqualsAndHashCode;
|
|||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Data
|
@Data
|
||||||
public class RetryPartitionTask extends PartitionTask {
|
public class RetryPartitionTask extends PartitionTask {
|
||||||
|
|
||||||
|
private String uniqueId;
|
||||||
|
|
||||||
|
private String groupName;
|
||||||
|
|
||||||
|
private String sceneName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 下次触发时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime nextTriggerAt;
|
||||||
|
|
||||||
|
private Integer retryCount;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support;
|
package com.aizuda.easy.retry.server.retry.task.support;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -13,9 +15,9 @@ public interface WaitStrategy {
|
|||||||
/**
|
/**
|
||||||
* 计算下次重试触发时间
|
* 计算下次重试触发时间
|
||||||
*
|
*
|
||||||
* @param retryContext {@link RetryContext} 重试上下文
|
* @param waitStrategyContext {@link WaitStrategyContext} 重试上下文
|
||||||
* @return 下次触发时间
|
* @return 下次触发时间
|
||||||
*/
|
*/
|
||||||
LocalDateTime computeRetryTime(RetryContext retryContext);
|
LocalDateTime computeRetryTime(WaitStrategyContext waitStrategyContext);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
@ -68,6 +69,8 @@ public class ExecCallbackUnitActor extends AbstractActor {
|
|||||||
retryTaskLog.setGroupName(retryTask.getGroupName());
|
retryTaskLog.setGroupName(retryTask.getGroupName());
|
||||||
retryTaskLog.setUniqueId(retryTask.getUniqueId());
|
retryTaskLog.setUniqueId(retryTask.getUniqueId());
|
||||||
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
|
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
|
||||||
|
retryTaskLog.setTriggerTime(LocalDateTime.now());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
if (Objects.nonNull(serverNode)) {
|
if (Objects.nonNull(serverNode)) {
|
||||||
|
@ -28,6 +28,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
@ -60,6 +61,7 @@ public class ExecUnitActor extends AbstractActor {
|
|||||||
retryTaskLog.setGroupName(retryTask.getGroupName());
|
retryTaskLog.setGroupName(retryTask.getGroupName());
|
||||||
retryTaskLog.setUniqueId(retryTask.getUniqueId());
|
retryTaskLog.setUniqueId(retryTask.getUniqueId());
|
||||||
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
|
retryTaskLog.setRetryStatus(retryTask.getRetryStatus());
|
||||||
|
retryTaskLog.setTriggerTime(LocalDateTime.now());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ import org.springframework.context.annotation.Scope;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理日志信息
|
* 处理日志信息
|
||||||
@ -69,7 +70,7 @@ public class LogActor extends AbstractActor {
|
|||||||
String errorMessage = retryTaskLogDTO.getMessage();
|
String errorMessage = retryTaskLogDTO.getMessage();
|
||||||
retryTaskLogMessage.setMessage(
|
retryTaskLogMessage.setMessage(
|
||||||
StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage);
|
StrUtil.isBlank(errorMessage) ? StrUtil.EMPTY : errorMessage);
|
||||||
retryTaskLogMessage.setCreateDt(LocalDateTime.now());
|
retryTaskLogMessage.setCreateDt(Optional.ofNullable(retryTaskLogDTO.getTriggerTime()).orElse(LocalDateTime.now()));
|
||||||
retryTaskLogMessageMapper.insert(retryTaskLogMessage);
|
retryTaskLogMessageMapper.insert(retryTaskLogMessage);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log;
|
|||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 日志上下文模型
|
* 日志上下文模型
|
||||||
*
|
*
|
||||||
@ -32,4 +34,6 @@ public class RetryTaskLogDTO {
|
|||||||
*/
|
*/
|
||||||
private Integer retryStatus;
|
private Integer retryStatus;
|
||||||
|
|
||||||
|
private LocalDateTime triggerTime;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
|||||||
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
|
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask;
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.timer.TimerWheelHandler;
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
||||||
@ -68,32 +68,17 @@ public class FailureActor extends AbstractActor {
|
|||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
|
|
||||||
RetryTimerContext timerContext = new RetryTimerContext();
|
|
||||||
timerContext.setGroupName(retryTask.getGroupName());
|
|
||||||
timerContext.setUniqueId(retryTask.getUniqueId());
|
|
||||||
|
|
||||||
TimerTask timerTask = null;
|
|
||||||
Integer maxRetryCount;
|
Integer maxRetryCount;
|
||||||
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
||||||
maxRetryCount = systemProperties.getCallback().getMaxCount();
|
maxRetryCount = systemProperties.getCallback().getMaxCount();
|
||||||
timerTask = new CallbackTimerTask();
|
|
||||||
timerContext.setScene(TaskExecutorSceneEnum.AUTO_CALLBACK);
|
|
||||||
} else {
|
} else {
|
||||||
maxRetryCount = sceneConfig.getMaxRetryCount();
|
maxRetryCount = sceneConfig.getMaxRetryCount();
|
||||||
timerTask = new RetryTimerTask(timerContext);
|
|
||||||
timerContext.setScene(TaskExecutorSceneEnum.AUTO_RETRY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (maxRetryCount <= retryTask.getRetryCount()) {
|
if (maxRetryCount <= retryTask.getRetryCount()) {
|
||||||
retryTask.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
|
retryTask.setRetryStatus(RetryStatusEnum.MAX_COUNT.getStatus());
|
||||||
// 创建一个回调任务
|
// 创建一个回调任务
|
||||||
callbackRetryTaskHandler.create(retryTask);
|
callbackRetryTaskHandler.create(retryTask);
|
||||||
} else {
|
|
||||||
// TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮
|
|
||||||
LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt();
|
|
||||||
long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis();
|
|
||||||
log.info("准确进入时间轮 {} {}", nextTriggerAt, delay);
|
|
||||||
TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
retryTask.setUpdateDt(LocalDateTime.now());
|
retryTask.setUpdateDt(LocalDateTime.now());
|
||||||
|
@ -41,8 +41,6 @@ public class NoRetryActor extends AbstractActor {
|
|||||||
|
|
||||||
RetryContext retryContext = retryExecutor.getRetryContext();
|
RetryContext retryContext = retryExecutor.getRetryContext();
|
||||||
RetryTask retryTask = retryContext.getRetryTask();
|
RetryTask retryTask = retryContext.getRetryTask();
|
||||||
WaitStrategy waitStrategy = retryContext.getWaitStrategy();
|
|
||||||
retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(retryContext));
|
|
||||||
|
|
||||||
// 不更新重试次数
|
// 不更新重试次数
|
||||||
retryTask.setRetryCount(null);
|
retryTask.setRetryCount(null);
|
||||||
|
@ -3,33 +3,35 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
|||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
|
||||||
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
|
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
|
||||||
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
|
||||||
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||||
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
|
||||||
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
|
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
|
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel;
|
||||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
import org.springframework.util.StopWatch;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 数据扫描模板类
|
* 数据扫描模板类
|
||||||
@ -53,83 +55,84 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
@Autowired
|
@Autowired
|
||||||
protected List<TaskExecutor> taskExecutors;
|
protected List<TaskExecutor> taskExecutors;
|
||||||
|
|
||||||
|
private static long preCostTime = 0L;
|
||||||
|
private static long loopCount = 1L;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(ScanTask.class, config -> {
|
return receiveBuilder().match(ScanTask.class, config -> {
|
||||||
|
|
||||||
|
// 获取开始时间
|
||||||
|
long startTime = System.nanoTime();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
doScan(config);
|
doScan(config);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
|
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取结束时间
|
||||||
|
long endTime = System.nanoTime();
|
||||||
|
|
||||||
|
preCostTime = (endTime - startTime) / 1_000_000;
|
||||||
|
log.info("重试任务调度耗时:[{}]", preCostTime);
|
||||||
|
|
||||||
}).build();
|
}).build();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void doScan(final ScanTask scanTask) {
|
protected void doScan(final ScanTask scanTask) {
|
||||||
|
|
||||||
// LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
|
// 计算循环拉取的次数
|
||||||
|
if (preCostTime > 0) {
|
||||||
|
loopCount = (10 * 1000) / preCostTime;
|
||||||
|
loopCount = loopCount == 0 ? 1 : loopCount;
|
||||||
|
}
|
||||||
|
|
||||||
String groupName = scanTask.getGroupName();
|
String groupName = scanTask.getGroupName();
|
||||||
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
|
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
|
||||||
int retryPullPageSize = systemProperties.getRetryPullPageSize();
|
|
||||||
|
|
||||||
|
|
||||||
AtomicInteger count = new AtomicInteger(0);
|
AtomicInteger count = new AtomicInteger(0);
|
||||||
long total = PartitionTaskUtils.process(startId -> {
|
PartitionTaskUtils.process(startId -> {
|
||||||
// 没10秒触发一次扫描任务,每次扫描N次
|
int i = count.getAndIncrement();
|
||||||
int i = count.incrementAndGet();
|
if (i > loopCount) {
|
||||||
// TODO 需要支持动态计算循环拉取多少次
|
// 为空则中断处理
|
||||||
if (i > 5) {
|
return Lists.newArrayList();
|
||||||
return Lists.newArrayList();
|
}
|
||||||
}
|
return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType());
|
||||||
return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType());
|
},
|
||||||
},
|
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId);
|
||||||
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
|
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
|
||||||
|
|
||||||
|
StopWatch watch = new StopWatch();
|
||||||
|
watch.start();
|
||||||
if (!CollectionUtils.isEmpty(partitionTasks)) {
|
if (!CollectionUtils.isEmpty(partitionTasks)) {
|
||||||
|
|
||||||
// TODO 更新拉取的最大的id
|
|
||||||
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
|
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
|
||||||
|
|
||||||
for (PartitionTask partitionTask : partitionTasks) {
|
for (PartitionTask partitionTask : partitionTasks) {
|
||||||
processRetryTask((RetryPartitionTask) partitionTask);
|
processRetryTask((RetryPartitionTask) partitionTask);
|
||||||
|
|
||||||
// 已经存在时间轮里面的任务由时间轮负责调度
|
|
||||||
// boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId());
|
|
||||||
// if (existed) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// for (TaskExecutor taskExecutor : taskExecutors) {
|
|
||||||
// if (taskActuatorScene().getScene() == taskExecutor.getTaskType().getScene()) {
|
|
||||||
// taskExecutor.actuator(retryTask);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// // 数据为空则休眠5s
|
|
||||||
// try {
|
|
||||||
// Thread.sleep((10 / 2) * 1000);
|
|
||||||
// } catch (InterruptedException e) {
|
|
||||||
// Thread.currentThread().interrupt();
|
|
||||||
// }
|
|
||||||
|
|
||||||
putLastId(scanTask.getGroupName(), 0L);
|
putLastId(scanTask.getGroupName(), 0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
watch.getTotalTimeMillis();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processRetryTask(RetryPartitionTask partitionTask) {
|
private void processRetryTask(RetryPartitionTask partitionTask) {
|
||||||
|
|
||||||
|
RetryTask retryTask = new RetryTask();
|
||||||
|
retryTask.setNextTriggerAt(calculateNextTriggerTime(partitionTask));
|
||||||
|
retryTask.setId(partitionTask.getId());
|
||||||
|
accessTemplate.getRetryTaskAccess().updateById(partitionTask.getGroupName(), retryTask);
|
||||||
|
|
||||||
// 更新触发时间, 任务进入时间轮
|
long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
|
||||||
// WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask)
|
- System.currentTimeMillis();
|
||||||
// waitStrategy.computeRetryTime(retryContext);
|
RetryTimerWheel.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask(partitionTask), delay,
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract TaskExecutorSceneEnum taskActuatorScene();
|
protected abstract TaskExecutorSceneEnum taskActuatorScene();
|
||||||
@ -138,17 +141,19 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
|
|
||||||
protected abstract void putLastId(String groupName, Long lastId);
|
protected abstract void putLastId(String groupName, Long lastId);
|
||||||
|
|
||||||
|
protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask);
|
||||||
|
|
||||||
|
protected abstract TimerTask timerTask(RetryPartitionTask partitionTask);
|
||||||
|
|
||||||
public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) {
|
public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) {
|
||||||
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess().listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
|
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess()
|
||||||
|
.listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
|
||||||
new LambdaQueryWrapper<RetryTask>()
|
new LambdaQueryWrapper<RetryTask>()
|
||||||
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
|
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
|
||||||
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType)
|
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType)
|
||||||
// TODO 提前10秒把需要执行的任务拉取出来
|
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId)
|
||||||
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId)
|
.orderByAsc(RetryTask::getId))
|
||||||
// TODO 验证一下lastAt会不会改变
|
.getRecords();
|
||||||
// .gt(RetryTask::getCreateDt, lastAt)
|
|
||||||
.orderByAsc(RetryTask::getId))
|
|
||||||
.getRecords();
|
|
||||||
|
|
||||||
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
|
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
|
||||||
}
|
}
|
||||||
|
@ -1,14 +1,21 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@ -22,8 +29,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class ScanCallbackTaskActor extends AbstractScanGroup {
|
public class ScanCallbackTaskActor extends AbstractScanGroup {
|
||||||
|
|
||||||
public static final String BEAN_NAME = "ScanCallbackTaskActor";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 缓存待拉取数据的起点id
|
* 缓存待拉取数据的起点id
|
||||||
* <p>
|
* <p>
|
||||||
@ -46,9 +51,28 @@ public class ScanCallbackTaskActor extends AbstractScanGroup {
|
|||||||
LAST_AT_MAP.put(groupName, lastId);
|
LAST_AT_MAP.put(groupName, lastId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private WaitStrategy getWaitWaitStrategy() {
|
@Override
|
||||||
// 回调失败每15min重试一次
|
protected LocalDateTime calculateNextTriggerTime(final RetryPartitionTask partitionTask) {
|
||||||
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
|
|
||||||
|
long triggerInterval = systemProperties.getCallback().getTriggerInterval();
|
||||||
|
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(WaitStrategyEnum.FIXED.getBackOff());
|
||||||
|
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
|
||||||
|
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
|
||||||
|
waitStrategyContext.setTriggerInterval(String.valueOf(triggerInterval));
|
||||||
|
|
||||||
|
// 更新触发时间, 任务进入时间轮
|
||||||
|
return waitStrategy.computeRetryTime(waitStrategyContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TimerTask timerTask(final RetryPartitionTask partitionTask) {
|
||||||
|
RetryTimerContext retryTimerContext = new RetryTimerContext();
|
||||||
|
retryTimerContext.setGroupName(partitionTask.getGroupName());
|
||||||
|
retryTimerContext.setScene(taskActuatorScene());
|
||||||
|
retryTimerContext.setUniqueId(partitionTask.getUniqueId());
|
||||||
|
|
||||||
|
return new CallbackTimerTask(retryTimerContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,21 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.scan;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies.WaitStrategyEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@ -42,4 +51,26 @@ public class ScanRetryTaskActor extends AbstractScanGroup {
|
|||||||
LAST_AT_MAP.put(groupName, lastId);
|
LAST_AT_MAP.put(groupName, lastId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected LocalDateTime calculateNextTriggerTime(RetryPartitionTask partitionTask) {
|
||||||
|
// 更新下次触发时间
|
||||||
|
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
|
||||||
|
.getSceneConfigByGroupNameAndSceneName(partitionTask.getGroupName(), partitionTask.getSceneName());
|
||||||
|
|
||||||
|
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
|
||||||
|
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
|
||||||
|
waitStrategyContext.setTriggerInterval(sceneConfig.getTriggerInterval());
|
||||||
|
waitStrategyContext.setTriggerCount(partitionTask.getRetryCount() + 1);
|
||||||
|
// 更新触发时间, 任务进入时间轮
|
||||||
|
WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(sceneConfig.getBackOff());
|
||||||
|
return waitStrategy.computeRetryTime(waitStrategyContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TimerTask timerTask(final RetryPartitionTask partitionTask) {
|
||||||
|
RetryTimerContext retryTimerContext = new RetryTimerContext();
|
||||||
|
retryTimerContext.setGroupName(partitionTask.getGroupName());
|
||||||
|
retryTimerContext.setScene(taskActuatorScene());
|
||||||
|
retryTimerContext.setUniqueId(partitionTask.getUniqueId());
|
||||||
|
return new RetryTimerTask(retryTimerContext);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ public class CallbackTaskExecutor extends AbstractTaskExecutor {
|
|||||||
.withStopStrategy(StopStrategies.stopException())
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
.withStopStrategy(StopStrategies.stopResultStatus())
|
.withStopStrategy(StopStrategies.stopResultStatus())
|
||||||
.withWaitStrategy(getWaitWaitStrategy())
|
.withWaitStrategy(getWaitWaitStrategy())
|
||||||
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
// .withFilterStrategy(FilterStrategies.triggerAtFilter())
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
@ -48,7 +48,6 @@ public class ManualCallbackTaskExecutor extends AbstractTaskExecutor {
|
|||||||
.withStopStrategy(StopStrategies.stopException())
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
.withStopStrategy(StopStrategies.stopResultStatus())
|
.withStopStrategy(StopStrategies.stopResultStatus())
|
||||||
.withWaitStrategy(getWaitWaitStrategy())
|
.withWaitStrategy(getWaitWaitStrategy())
|
||||||
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
@ -48,7 +48,7 @@ public class RetryTaskExecutor extends AbstractTaskExecutor {
|
|||||||
.withStopStrategy(StopStrategies.stopException())
|
.withStopStrategy(StopStrategies.stopException())
|
||||||
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
.withStopStrategy(StopStrategies.stopResultStatusCode())
|
||||||
.withWaitStrategy(getWaitWaitStrategy(sceneConfig))
|
.withWaitStrategy(getWaitWaitStrategy(sceneConfig))
|
||||||
.withFilterStrategy(FilterStrategies.triggerAtFilter())
|
// .withFilterStrategy(FilterStrategies.triggerAtFilter())
|
||||||
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
|
||||||
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
|
||||||
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.idempotent;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2023-10-19 21:54:57
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
public class TimerIdempotent implements IdempotentStrategy<String, String> {
|
||||||
|
|
||||||
|
private static final Set<String> cache = new HashSet<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean set(String key, String value) {
|
||||||
|
return cache.add(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String get(String s) {
|
||||||
|
throw new UnsupportedOperationException("不支持此操作");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isExist(String key, String value) {
|
||||||
|
return cache.contains(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean clear(String key, String value) {
|
||||||
|
return cache.removeIf(s-> s.equals(key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value))));
|
||||||
|
}
|
||||||
|
}
|
@ -38,6 +38,7 @@ public class FilterStrategies {
|
|||||||
*
|
*
|
||||||
* @return {@link TriggerAtFilterStrategies} 触发时间过滤策略
|
* @return {@link TriggerAtFilterStrategies} 触发时间过滤策略
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public static FilterStrategy triggerAtFilter() {
|
public static FilterStrategy triggerAtFilter() {
|
||||||
return new TriggerAtFilterStrategies();
|
return new TriggerAtFilterStrategies();
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,10 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.strategy;
|
package com.aizuda.easy.retry.server.retry.task.support.strategy;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import com.aizuda.easy.retry.common.core.util.CronExpression;
|
import com.aizuda.easy.retry.common.core.util.CronExpression;
|
||||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
|
||||||
import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
|
import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
|
||||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
|
||||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
@ -30,12 +24,35 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2021-11-29 18:19
|
* @date : 2021-11-29 18:19
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings({"squid:S3776", "squid:S2676", "squid:S3740"})
|
|
||||||
public class WaitStrategies {
|
public class WaitStrategies {
|
||||||
|
|
||||||
private WaitStrategies() {
|
private WaitStrategies() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class WaitStrategyContext {
|
||||||
|
// /**
|
||||||
|
// * 触发类型 1.CRON 表达式 2. 固定时间
|
||||||
|
// */
|
||||||
|
// private Integer triggerType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 间隔时长
|
||||||
|
*/
|
||||||
|
private String triggerInterval;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 下次触发时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime nextTriggerAt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 触发次数
|
||||||
|
*/
|
||||||
|
private Integer triggerCount;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
public enum WaitStrategyEnum {
|
public enum WaitStrategyEnum {
|
||||||
DELAY_LEVEL(1, delayLevelWait()),
|
DELAY_LEVEL(1, delayLevelWait()),
|
||||||
@ -89,10 +106,6 @@ public class WaitStrategies {
|
|||||||
return WaitStrategyEnum.DELAY_LEVEL;
|
return WaitStrategyEnum.DELAY_LEVEL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
|
||||||
public static class StrategyParameter {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -143,13 +156,12 @@ public class WaitStrategies {
|
|||||||
/**
|
/**
|
||||||
* 延迟等级等待策略
|
* 延迟等级等待策略
|
||||||
*/
|
*/
|
||||||
private static final class DelayLevelWaitStrategy implements WaitStrategy {
|
private static final class DelayLevelWaitStrategy implements WaitStrategy {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LocalDateTime computeRetryTime(RetryContext context) {
|
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
|
||||||
RetryTask retryTask = context.getRetryTask();
|
DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(context.getTriggerCount());
|
||||||
DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(retryTask.getRetryCount());
|
return context.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit());
|
||||||
return retryTask.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,22 +171,19 @@ public class WaitStrategies {
|
|||||||
private static final class FixedWaitStrategy implements WaitStrategy {
|
private static final class FixedWaitStrategy implements WaitStrategy {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LocalDateTime computeRetryTime(RetryContext retryContext) {
|
public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) {
|
||||||
RetryTask retryTask = retryContext.getRetryTask();
|
// if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
||||||
long triggerInterval;
|
// // 回调失败的默认15分钟执行一次重试
|
||||||
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
// SystemProperties systemProperties = SpringContext.CONTEXT.getBean(SystemProperties.class);
|
||||||
// 回调失败的默认15分钟执行一次重试
|
// triggerInterval = systemProperties.getCallback().getTriggerInterval();
|
||||||
SystemProperties systemProperties = SpringContext.CONTEXT.getBean(SystemProperties.class);
|
// } else {
|
||||||
triggerInterval = systemProperties.getCallback().getTriggerInterval();
|
// AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class);
|
||||||
} else {
|
// SceneConfig sceneConfig =
|
||||||
AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class);
|
// accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
|
||||||
SceneConfig sceneConfig =
|
// triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval());
|
||||||
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
|
// }
|
||||||
triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
return retryContext.getNextTriggerAt().plusSeconds(Integer.parseInt(retryContext.getTriggerInterval()));
|
||||||
return retryTask.getNextTriggerAt().plusSeconds(triggerInterval);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -184,23 +193,16 @@ public class WaitStrategies {
|
|||||||
private static final class CronWaitStrategy implements WaitStrategy {
|
private static final class CronWaitStrategy implements WaitStrategy {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LocalDateTime computeRetryTime(RetryContext context) {
|
public LocalDateTime computeRetryTime(WaitStrategyContext context) {
|
||||||
RetryTask retryTask = context.getRetryTask();
|
|
||||||
|
|
||||||
AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class);
|
|
||||||
|
|
||||||
SceneConfig sceneConfig =
|
|
||||||
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
|
|
||||||
|
|
||||||
Date nextValidTime;
|
|
||||||
try {
|
try {
|
||||||
ZonedDateTime zdt = retryTask.getNextTriggerAt().atZone(ZoneOffset.ofHours(8));
|
ZonedDateTime zdt = context.getNextTriggerAt().atZone(ZoneOffset.ofHours(8));
|
||||||
nextValidTime = new CronExpression(sceneConfig.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
|
Date nextValidTime = new CronExpression(context.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));
|
||||||
|
return DateUtil.toLocalDateTime(nextValidTime);
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", sceneConfig.getTriggerInterval(), e);
|
throw new EasyRetryServerException("解析CRON表达式异常 [{}]", context.getTriggerInterval(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return LocalDateTime.ofEpochSecond( nextValidTime.getTime() / 1000,0, ZoneOffset.ofHours(8));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -226,19 +228,12 @@ public class WaitStrategies {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LocalDateTime computeRetryTime(RetryContext retryContext) {
|
public LocalDateTime computeRetryTime(WaitStrategyContext retryContext) {
|
||||||
|
|
||||||
if (Objects.nonNull(retryContext)) {
|
if (Objects.nonNull(retryContext)) {
|
||||||
RetryTask retryTask = retryContext.getRetryTask();
|
|
||||||
|
|
||||||
AccessTemplate accessTemplate = SpringContext.CONTEXT.getBean(AccessTemplate.class);
|
|
||||||
SceneConfig sceneConfig =
|
|
||||||
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
|
|
||||||
|
|
||||||
if (maximum == 0) {
|
if (maximum == 0) {
|
||||||
maximum = Long.parseLong(sceneConfig.getTriggerInterval());
|
maximum = Long.parseLong(retryContext.getTriggerInterval());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Preconditions.checkArgument(maximum > minimum, "maximum must be > minimum but maximum is %d and minimum is", maximum, minimum);
|
Preconditions.checkArgument(maximum > minimum, "maximum must be > minimum but maximum is %d and minimum is", maximum, minimum);
|
||||||
|
@ -2,24 +2,45 @@ package com.aizuda.easy.retry.server.retry.task.support.timer;
|
|||||||
|
|
||||||
import io.netty.util.Timeout;
|
import io.netty.util.Timeout;
|
||||||
import io.netty.util.TimerTask;
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author www.byteblogs.com
|
* @author www.byteblogs.com
|
||||||
* @date 2023-09-23 11:10:01
|
* @date 2023-09-23 11:10:01
|
||||||
* @since 2.4.0
|
* @since 2.4.0
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public abstract class AbstractTimerTask implements TimerTask {
|
public abstract class AbstractTimerTask implements TimerTask {
|
||||||
|
|
||||||
protected String groupName;
|
protected String groupName;
|
||||||
protected String uniqueId;
|
protected String uniqueId;
|
||||||
|
|
||||||
|
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 10, TimeUnit.SECONDS,
|
||||||
|
new LinkedBlockingQueue<>());
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(Timeout timeout) throws Exception {
|
public void run(Timeout timeout) throws Exception {
|
||||||
|
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}]", LocalDateTime.now(), groupName, uniqueId);
|
||||||
|
|
||||||
// 先清除时间轮的缓存
|
executor.execute(() -> {
|
||||||
TimerWheelHandler.clearCache(groupName, uniqueId);
|
|
||||||
|
try {
|
||||||
|
// 先清除时间轮的缓存
|
||||||
|
RetryTimerWheel.clearCache(groupName, uniqueId);
|
||||||
|
|
||||||
|
doRun(timeout);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("重试任务执行失败", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
doRun(timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void doRun(Timeout timeout);
|
protected abstract void doRun(Timeout timeout);
|
||||||
|
@ -1,18 +1,45 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.timer;
|
package com.aizuda.easy.retry.server.retry.task.support.timer;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskActuatorFactory;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import io.netty.util.Timeout;
|
import io.netty.util.Timeout;
|
||||||
import io.netty.util.TimerTask;
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2023-09-22 17:09
|
* @date : 2023-09-22 17:09
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class CallbackTimerTask implements TimerTask {
|
public class CallbackTimerTask extends AbstractTimerTask {
|
||||||
|
|
||||||
|
private RetryTimerContext context;
|
||||||
|
|
||||||
|
public CallbackTimerTask(RetryTimerContext context) {
|
||||||
|
this.context = context;
|
||||||
|
super.groupName = context.getGroupName();
|
||||||
|
super.uniqueId = context.getUniqueId();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(final Timeout timeout) throws Exception {
|
protected void doRun(final Timeout timeout) {
|
||||||
log.info("回调任务执行");
|
log.info("回调任务执行 {}", LocalDateTime.now());
|
||||||
|
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
|
||||||
|
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
|
||||||
|
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
|
||||||
|
.eq(RetryTask::getGroupName, context.getGroupName())
|
||||||
|
.eq(RetryTask::getUniqueId, context.getUniqueId())
|
||||||
|
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
|
||||||
|
TaskExecutor taskExecutor = TaskActuatorFactory.getTaskActuator(context.getScene());
|
||||||
|
taskExecutor.actuator(retryTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,6 @@ public class RetryTimerTask extends AbstractTimerTask {
|
|||||||
@Override
|
@Override
|
||||||
public void doRun(final Timeout timeout){
|
public void doRun(final Timeout timeout){
|
||||||
log.info("重试任务执行 {}", LocalDateTime.now());
|
log.info("重试任务执行 {}", LocalDateTime.now());
|
||||||
// todo
|
|
||||||
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
|
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
|
||||||
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
|
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
|
||||||
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
|
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), new LambdaQueryWrapper<RetryTask>()
|
||||||
|
@ -0,0 +1,61 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.timer;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.idempotent.TimerIdempotent;
|
||||||
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-22 17:03
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class RetryTimerWheel implements Lifecycle {
|
||||||
|
|
||||||
|
private static final int TICK_DURATION = 500;
|
||||||
|
private static final String THREAD_NAME_PREFIX = "retry-task-timer-wheel-";
|
||||||
|
private static HashedWheelTimer timer = null;
|
||||||
|
|
||||||
|
private static final TimerIdempotent idempotent = new TimerIdempotent();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
timer = new HashedWheelTimer(
|
||||||
|
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION, TimeUnit.MILLISECONDS);
|
||||||
|
timer.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
||||||
|
|
||||||
|
if (!isExisted(groupName, uniqueId)) {
|
||||||
|
delay = delay < 0 ? 0 : delay;
|
||||||
|
log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId);
|
||||||
|
try {
|
||||||
|
timer.newTimeout(task, delay, unit);
|
||||||
|
idempotent.set(uniqueId, uniqueId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "加入时间轮失败. uniqueId:[{}]", uniqueId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isExisted(String groupName, String uniqueId) {
|
||||||
|
return idempotent.isExist(groupName, uniqueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void clearCache(String groupName, String uniqueId) {
|
||||||
|
idempotent.clear(groupName, uniqueId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
timer.stop();
|
||||||
|
}
|
||||||
|
}
|
@ -1,107 +0,0 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.timer;
|
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
|
||||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
|
||||||
import com.google.common.cache.Cache;
|
|
||||||
import com.google.common.cache.CacheBuilder;
|
|
||||||
import io.netty.util.HashedWheelTimer;
|
|
||||||
import io.netty.util.Timeout;
|
|
||||||
import io.netty.util.TimerTask;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author: www.byteblogs.com
|
|
||||||
* @date : 2023-09-22 17:03
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
@Slf4j
|
|
||||||
public class TimerWheelHandler implements Lifecycle {
|
|
||||||
|
|
||||||
private static HashedWheelTimer timer = null;
|
|
||||||
|
|
||||||
private static Cache<String, Timeout> cache;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
|
|
||||||
// TODO 支持可配置
|
|
||||||
// tickDuration 和 timeUnit 一格的时间长度
|
|
||||||
// ticksPerWheel 一圈有多少格
|
|
||||||
timer = new HashedWheelTimer(
|
|
||||||
new CustomizableThreadFactory("retry-task-timer-wheel-"), 100,
|
|
||||||
TimeUnit.MILLISECONDS, 1024);
|
|
||||||
|
|
||||||
timer.start();
|
|
||||||
|
|
||||||
cache = CacheBuilder.newBuilder()
|
|
||||||
// 设置并发级别为cpu核心数
|
|
||||||
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
|
||||||
|
|
||||||
if (delay < 0) {
|
|
||||||
delay = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO 支持可配置
|
|
||||||
if (delay > 60 * 1000) {
|
|
||||||
LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]",
|
|
||||||
groupName, uniqueId, delay);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Timeout timeout = getTimeout(groupName, uniqueId);
|
|
||||||
if (Objects.isNull(timeout)) {
|
|
||||||
try {
|
|
||||||
timeout = timer.newTimeout(task, delay, unit);
|
|
||||||
cache.put(getKey(groupName, uniqueId), timeout);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]",
|
|
||||||
groupName, uniqueId, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String getKey(String groupName, String uniqueId) {
|
|
||||||
return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Timeout getTimeout(String groupName, String uniqueId) {
|
|
||||||
return cache.getIfPresent(getKey(groupName, uniqueId));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean isExisted(String groupName, String uniqueId) {
|
|
||||||
return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean cancel(String groupName, String uniqueId) {
|
|
||||||
String key = getKey(groupName, uniqueId);
|
|
||||||
Timeout timeout = cache.getIfPresent(key);
|
|
||||||
if (Objects.isNull(timeout)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.invalidate(key);
|
|
||||||
return timeout.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void clearCache(String groupName, String uniqueId) {
|
|
||||||
cache.invalidate(getKey(groupName, uniqueId));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
timer.stop();
|
|
||||||
cache.invalidateAll();
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user