feat:2.4.0

1. 优化重试幂等
This commit is contained in:
byteblogs168 2023-11-13 23:41:59 +08:00
parent 8c332fb0cd
commit bc3cef13c0
14 changed files with 101 additions and 119 deletions

View File

@ -111,12 +111,10 @@ public abstract class AbstractConfigAccess<T> implements ConfigAccess<T> {
if (Objects.isNull(groupConfig)) {
return Collections.EMPTY_SET;
}
LambdaQueryWrapper<SceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<SceneConfig>()
.eq(SceneConfig::getSceneName, groupName);
if (Objects.isNull(groupConfig)) {
return Collections.EMPTY_SET;
}
LambdaQueryWrapper<SceneConfig> sceneConfigLambdaQueryWrapper = new LambdaQueryWrapper<SceneConfig>()
.select(SceneConfig::getSceneName)
.eq(SceneConfig::getGroupName, groupName);
if (StatusEnum.YES.getStatus().equals(groupConfig.getGroupStatus())) {
sceneConfigLambdaQueryWrapper.eq(SceneConfig::getSceneStatus, StatusEnum.NO.getStatus());

View File

@ -90,7 +90,9 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef execCallbackUnitActor() {
return getRetryActorSystem().actorOf(getSpringExtension().props(EXEC_CALLBACK_UNIT_ACTOR).withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
return getRetryActorSystem().actorOf(getSpringExtension()
.props(EXEC_CALLBACK_UNIT_ACTOR)
.withDispatcher(RETRY_TASK_EXECUTOR_DISPATCHER));
}
/**

View File

@ -27,7 +27,6 @@ public class RequestInterceptor implements Interceptor {
String timeoutTime = request.header(TIMEOUT_TIME);
if (StrUtil.isNotBlank(timeoutTime)) {
int timeout = Integer.parseInt(timeoutTime);
log.info("url:[{}] timeout:[{}]", request.url(), timeout);
if (timeout <= 0) {
return chain.proceed(request);
}

View File

@ -15,7 +15,6 @@ import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
@ -28,7 +27,6 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -49,9 +47,6 @@ import java.util.concurrent.Callable;
@Slf4j
public class ExecCallbackUnitActor extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private AccessTemplate accessTemplate;
@Autowired
@ -102,9 +97,6 @@ public class ExecCallbackUnitActor extends AbstractActor {
retryTaskLog.setMessage(StringUtils.isBlank(e.getMessage()) ? StrUtil.EMPTY : e.getMessage());
} finally {
// 清除幂等标识位
idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId().intValue());
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLog, actorRef);
@ -127,14 +119,17 @@ public class ExecCallbackUnitActor extends AbstractActor {
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
RetryTask retryTask = retryTaskAccess.one(callbackTask.getGroupName(),
new LambdaQueryWrapper<RetryTask>().eq(RetryTask::getUniqueId, retryTaskUniqueId));
new LambdaQueryWrapper<RetryTask>()
.select(RetryTask::getRetryStatus)
.eq(RetryTask::getGroupName, callbackTask.getGroupName())
.eq(RetryTask::getUniqueId, retryTaskUniqueId));
Assert.notNull(retryTask, () -> new EasyRetryServerException("未查询回调任务对应的重试任务. callbackUniqueId:[{}] uniqueId:[{}]",
callbackTask.getUniqueId(), retryTaskUniqueId));
// 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();
retryCallbackDTO.setIdempotentId(callbackTask.getIdempotentId());
// 重试任务的状态
// 重试任务的状态, 客户端根据重试状态判断最大次数或者成功成功
retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus());
retryCallbackDTO.setArgsStr(callbackTask.getArgsStr());
retryCallbackDTO.setScene(callbackTask.getSceneName());

View File

@ -16,15 +16,12 @@ import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -45,10 +42,6 @@ import java.util.concurrent.Callable;
@Slf4j
public class ExecUnitActor extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {
@ -115,8 +108,6 @@ public class ExecUnitActor extends AbstractActor {
retryTaskLog.setMessage(e.getMessage());
} finally {
// 清除幂等标识位
idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId().intValue());
ActorRef actorRef = ActorGenerator.logActor();
actorRef.tell(retryTaskLog, actorRef);
getContext().stop(getSelf());

View File

@ -5,23 +5,19 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.timer.CallbackTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.timer.RetryTimerWheel;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -30,8 +26,6 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
/**
* 重试完成执行器 1更新重试任务 2记录重试日志
@ -53,17 +47,21 @@ public class FailureActor extends AbstractActor {
private TransactionTemplate transactionTemplate;
@Autowired
private SystemProperties systemProperties;
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<String, Long> idempotentStrategy;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTask.class, retryTask -> {
LogUtils.info(log, "FailureActor params:[{}]", retryTask);
try {
// 超过最大等级
SceneConfig sceneConfig =
accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
try {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
@ -91,6 +89,8 @@ public class FailureActor extends AbstractActor {
} catch (Exception e) {
LogUtils.error(log, "更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId());
if (RetryStatusEnum.MAX_COUNT.getStatus().equals(retryTask.getRetryStatus())) {
RetryTaskLogDTO retryTaskLogDTO = new RetryTaskLogDTO();

View File

@ -5,6 +5,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
@ -13,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -42,6 +44,9 @@ public class FinishActor extends AbstractActor {
private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<String, Long> idempotentStrategy;
@Override
public Receive createReceive() {
@ -69,6 +74,8 @@ public class FinishActor extends AbstractActor {
}catch (Exception e) {
LogUtils.error(log, "更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId());
RetryTaskLogDTO retryTaskLogDTO = new RetryTaskLogDTO();
retryTaskLogDTO.setGroupName(retryTask.getGroupName());

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
@ -11,6 +12,7 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -29,10 +31,11 @@ import java.time.LocalDateTime;
@Slf4j
public class NoRetryActor extends AbstractActor {
public static final String BEAN_NAME = "NoRetryActor";
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
@Qualifier("retryIdempotentStrategyHandler")
private IdempotentStrategy<String, Long> idempotentStrategy;
@Override
public Receive createReceive() {
@ -52,6 +55,9 @@ public class NoRetryActor extends AbstractActor {
}catch (Exception e) {
LogUtils.error(log,"更新重试任务失败", e);
} finally {
// 清除幂等标识位
idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId());
// 更新DB状态
getContext().stop(getSelf());
}

View File

@ -4,7 +4,6 @@ import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
@ -20,22 +19,15 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -51,9 +43,6 @@ import java.util.stream.Collectors;
@Slf4j
public abstract class AbstractScanGroup extends AbstractActor {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
protected IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
protected SystemProperties systemProperties;
@Autowired

View File

@ -30,8 +30,8 @@ import java.time.LocalDateTime;
public abstract class AbstractTaskExecutor implements TaskExecutor, InitializingBean {
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
protected IdempotentStrategy<String, Integer> idempotentStrategy;
@Qualifier("retryIdempotentStrategyHandler")
protected IdempotentStrategy<String, Long> idempotentStrategy;
@Autowired
protected SystemProperties systemProperties;
@Autowired
@ -86,9 +86,9 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
}
protected void productExecUnitActor(RetryExecutor retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
String groupName = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
idempotentStrategy.set(groupName, retryId);
ActorRef actorRef = getActorRef();
actorRef.tell(retryExecutor, actorRef);

View File

@ -1,58 +0,0 @@
package com.aizuda.easy.retry.server.retry.task.support.idempotent;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import org.springframework.stereotype.Component;
import java.util.BitSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* BitSet幂等校验器
*
* @author: www.byteblogs.com
* @date : 2021-11-23 09:26
*/
@Component
public class BitSetIdempotentStrategyHandler implements IdempotentStrategy<String, Integer> {
/**
* BIT_SET_MAP[key] : group
* BIT_SET_MAP[value] : BitSet
*/
public static final ConcurrentMap<String, BitSet> BIT_SET_MAP = new ConcurrentHashMap<>();
@Override
public boolean set(String groupId, Integer key) {
BitSet bitSet = BIT_SET_MAP.get(groupId);
if (Objects.isNull(bitSet)) {
bitSet = new BitSet(16);
bitSet.set(key, Boolean.TRUE);
BIT_SET_MAP.put(groupId, bitSet);
}
return Boolean.TRUE;
}
@Override
public Integer get(String s) {
throw new EasyRetryServerException("不支持的操作");
}
@Override
public boolean isExist(String groupId, Integer value) {
BitSet bitSet = BIT_SET_MAP.getOrDefault(groupId, new BitSet(16));
return bitSet.get(value);
}
@Override
public boolean clear(String groupId, Integer value) {
BitSet bitSet = BIT_SET_MAP.get(groupId);
bitSet.clear(value);
return Boolean.TRUE;
}
}

View File

@ -0,0 +1,54 @@
package com.aizuda.easy.retry.server.retry.task.support.idempotent;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 重试任务幂等校验器
*
* @author: www.byteblogs.com
* @date : 2021-11-23 09:26
*/
@Component
public class RetryIdempotentStrategyHandler implements IdempotentStrategy<String, Long> {
private static final Cache<String, Long> cache;
static {
cache = CacheBuilder.newBuilder()
.concurrencyLevel(16) // 并发级别
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
@Override
public boolean set(String groupId, Long value) {
cache.put(getKey(groupId, value), value);
return Boolean.TRUE;
}
@Override
public Long get(String s) {
throw new EasyRetryServerException("不支持的操作");
}
@Override
public boolean isExist(String groupId, Long value) {
return cache.asMap().containsKey(getKey(groupId, value));
}
@Override
public boolean clear(String groupId, Long value) {
cache.invalidate(getKey(groupId, value));
return Boolean.TRUE;
}
private static String getKey(final String key, final Long value) {
return key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value));
}
}

View File

@ -16,7 +16,6 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class TimerIdempotent implements IdempotentStrategy<String, String> {
private static final Cache<String, String> cache;
static {
@ -33,7 +32,7 @@ public class TimerIdempotent implements IdempotentStrategy<String, String> {
}
private static String getKey(final String key, final String value) {
return key.concat(StrUtil.UNDERLINE).concat(String.valueOf(value));
return key.concat(StrUtil.UNDERLINE).concat(value);
}
@Override

View File

@ -48,7 +48,7 @@ public class FilterStrategies {
*
* @return {@link BitSetIdempotentFilterStrategies} BitSet幂等的过滤策略
*/
public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy<String, Integer> idempotentStrategy) {
public static FilterStrategy bitSetIdempotentFilter(IdempotentStrategy<String, Long> idempotentStrategy) {
return new BitSetIdempotentFilterStrategies(idempotentStrategy);
}
@ -122,9 +122,9 @@ public class FilterStrategies {
*/
private static final class BitSetIdempotentFilterStrategies implements FilterStrategy {
private IdempotentStrategy<String, Integer> idempotentStrategy;
private IdempotentStrategy<String, Long> idempotentStrategy;
public BitSetIdempotentFilterStrategies(IdempotentStrategy<String, Integer> idempotentStrategy) {
public BitSetIdempotentFilterStrategies(IdempotentStrategy<String, Long> idempotentStrategy) {
this.idempotentStrategy = idempotentStrategy;
}
@ -132,7 +132,7 @@ public class FilterStrategies {
public Pair<Boolean /*是否符合条件*/, StringBuilder/*描述信息*/> filter(RetryContext retryContext) {
RetryTask retryTask = retryContext.getRetryTask();
boolean result = !idempotentStrategy.isExist(retryTask.getGroupName(), retryTask.getId().intValue());
boolean result = !idempotentStrategy.isExist(retryTask.getGroupName(), retryTask.getId());
StringBuilder description = new StringBuilder();
if (!result) {
description.append(MessageFormat.format("存在执行中的任务.uniqueId:[{0}]", retryTask.getUniqueId()));