feat(3.2.0): 新增token 校验

1. 新增group_config 新增token配置
2. 组配置新增配置token的字段且可以自动生成
3. 客户端与服务端都校验token的合法性
4. 优化了group缓存的bug
This commit is contained in:
byteblogs168 2024-03-30 15:24:21 +08:00
parent aa0ad7737c
commit 6cad4b81cc
27 changed files with 301 additions and 71 deletions

View File

@ -29,6 +29,11 @@
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

View File

@ -0,0 +1,11 @@
package com.aizuda.easy.retry.client.common.annotation;
/**
* Easy Retry Client 认证
*
* @author: xiaownouniu
* @date : 2024-03-30
* @since : 3.2.0
*/
public @interface Authentication {
}

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.client.common.intercepter;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import lombok.RequiredArgsConstructor;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.Optional;
/**
* Easy Retry 认证拦截器
*
* @author: xiaowoniu
* @date : 2022-04-18 09:19
* @since 3.2.0
*/
@Aspect
@Component
@RequiredArgsConstructor
public class AuthenticationInterceptor {
private final EasyRetryProperties easyRetryProperties;
@Before(value = "@annotation(com.aizuda.easy.retry.client.common.annotation.Authentication) || @within(com.aizuda.easy.retry.client.common.annotation.Authentication)")
public void easyRetryAuth() {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String easyRetryAuth = attributes.getRequest().getHeader(SystemConstants.EASY_RETRY_AUTH_TOKEN);
String configToken = Optional.ofNullable(easyRetryProperties.getToken()).orElse(SystemConstants.DEFAULT_TOKEN);
if (!configToken.equals(easyRetryAuth)) {
throw new EasyRetryClientException("认证失败.【请检查配置的Token是否正确】");
}
}
}

View File

@ -182,7 +182,7 @@ public class NettyChannel {
.set(HeadersEnum.HOST.getKey(), serverConfig.getHost())
.set(HeadersEnum.NAMESPACE.getKey(), Optional.ofNullable(easyRetryProperties.getNamespace()).orElse(
SystemConstants.DEFAULT_NAMESPACE))
.set(HeadersEnum.TOKEN.getKey(), Optional.ofNullable(easyRetryProperties.getNamespace()).orElse(
.set(HeadersEnum.TOKEN.getKey(), Optional.ofNullable(easyRetryProperties.getToken()).orElse(
SystemConstants.DEFAULT_TOKEN))
;

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.client.core.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.common.annotation.Authentication;
import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
@ -62,6 +63,7 @@ public class RetryEndPoint {
* 服务端调度重试入口
*/
@PostMapping("/dispatch/v1")
@Authentication
public Result<DispatchRetryResultDTO> dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) {
RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName());
@ -135,12 +137,14 @@ public class RetryEndPoint {
* 同步版本
*/
@PostMapping("/sync/version/v1")
@Authentication
public Result syncVersion(@RequestBody ConfigDTO configDTO) {
GroupVersionCache.configDTO = configDTO;
return new Result();
}
@PostMapping("/callback/v1")
@Authentication
public Result callback(@RequestBody @Validated RetryCallbackDTO callbackDTO) {
RetryerInfo retryerInfo = null;
@ -252,6 +256,7 @@ public class RetryEndPoint {
* @return idempotentId
*/
@PostMapping("/generate/idempotent-id/v1")
@Authentication
public Result<String> idempotentIdGenerate(
@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.client.job.core.client;
import com.aizuda.easy.retry.client.common.annotation.Authentication;
import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager;
import com.aizuda.easy.retry.client.job.core.IJobExecutor;
import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache;
@ -35,6 +36,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class JobEndPoint {
@PostMapping("/dispatch/v1")
@Authentication
public Result<Boolean> dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) {
try {
@ -108,6 +110,7 @@ public class JobEndPoint {
}
@PostMapping("/stop/v1")
@Authentication
public Result<Boolean> stopJob(@RequestBody @Validated StopJobDTO interruptJob) {
ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId());
if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {

View File

@ -133,4 +133,9 @@ public interface SystemConstants {
* 仅表示定时任务类型为工作流
*/
Integer WORKFLOW_TRIGGER_TYPE = 99;
/**
* Easy Retry 认证Token
*/
String EASY_RETRY_AUTH_TOKEN= "ER-TOKEN";
}

View File

@ -7,6 +7,9 @@ import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -21,14 +24,14 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class CacheConsumerGroup implements Lifecycle {
private static Cache<String /*groupName*/, String/*namespaceId*/> CACHE;
private static Cache<String /*groupName*/, Set<String>/*namespaceId*/> CACHE;
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static ConcurrentMap<String, String> getAllConsumerGroupName() {
public static ConcurrentMap<String, Set<String>> getAllConsumerGroupName() {
return CACHE.asMap();
}
@ -39,7 +42,9 @@ public class CacheConsumerGroup implements Lifecycle {
* @return 缓存对象
*/
public static synchronized void addOrUpdate(String groupName, String namespaceId) {
CACHE.put(groupName, namespaceId);
Set<String> namespaceIds = Optional.ofNullable(CACHE.getIfPresent(groupName)).orElseGet(HashSet::new);
namespaceIds.add(namespaceId);
CACHE.put(groupName, namespaceIds);
}
public static void remove(String groupName) {

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.RegisterNodeInfoConverter;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.register.ServerRegister;
import com.aizuda.easy.retry.server.common.triple.Pair;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -44,7 +45,7 @@ import java.util.stream.Collectors;
@Slf4j
public class CacheRegisterTable implements Lifecycle {
private static Cache<String, ConcurrentMap<String, RegisterNodeInfo>> CACHE;
private static Cache<Pair<String/*groupName*/, String/*namespaceId*/>, ConcurrentMap<String, RegisterNodeInfo>> CACHE;
/**
* 获取所有缓存
@ -52,7 +53,7 @@ public class CacheRegisterTable implements Lifecycle {
* @return 缓存对象
*/
public static Set<RegisterNodeInfo> getAllPods() {
ConcurrentMap<String, ConcurrentMap<String, RegisterNodeInfo>> concurrentMap = CACHE.asMap();
ConcurrentMap<Pair<String, String>, ConcurrentMap<String, RegisterNodeInfo>> concurrentMap = CACHE.asMap();
if (CollectionUtils.isEmpty(concurrentMap)) {
return Sets.newHashSet();
}
@ -137,8 +138,8 @@ public class CacheRegisterTable implements Lifecycle {
return new TreeSet<>(concurrentMap.values());
}
private static String getKey(final String groupName, final String namespaceId) {
return groupName + StrUtil.AT + namespaceId;
private static Pair<String, String> getKey(final String groupName, final String namespaceId) {
return Pair.of(groupName, namespaceId);
}
/**
@ -162,7 +163,7 @@ public class CacheRegisterTable implements Lifecycle {
serverNode.getHostId());
// 不存在则初始化
if (Objects.isNull(registerNodeInfo)) {
EasyRetryLog.LOCAL.warn("node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(),
EasyRetryLog.LOCAL.warn("node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(),
serverNode.getHostId());
} else {
// 存在则刷新过期时间
@ -180,7 +181,7 @@ public class CacheRegisterTable implements Lifecycle {
getKey(serverNode.getGroupName(), serverNode.getNamespaceId()));
RegisterNodeInfo registerNodeInfo;
if (Objects.isNull(concurrentMap)) {
EasyRetryLog.LOCAL.info("Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(),
EasyRetryLog.LOCAL.info("Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(),
serverNode.getNamespaceId(), serverNode.getHostId());
concurrentMap = new ConcurrentHashMap<>();
registerNodeInfo = RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode);
@ -202,6 +203,7 @@ public class CacheRegisterTable implements Lifecycle {
/**
* 删除过期的节点信息
*
* @param concurrentMap 并发映射的节点信息
*/
private static void delExpireNode(final ConcurrentMap<String, RegisterNodeInfo> concurrentMap) {
@ -225,13 +227,13 @@ public class CacheRegisterTable implements Lifecycle {
return;
}
EasyRetryLog.LOCAL.info("Remove cache. groupName:[{}] hostId:[{}]", groupName, hostId);
EasyRetryLog.LOCAL.info("Remove cache. groupName:[{}] hostId:[{}]", groupName, hostId);
concurrentMap.remove(hostId);
}
@Override
public void start() {
EasyRetryLog.LOCAL.info("CacheRegisterTable start");
EasyRetryLog.LOCAL.info("CacheRegisterTable start");
CACHE = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
@ -243,7 +245,7 @@ public class CacheRegisterTable implements Lifecycle {
@Override
public void close() {
EasyRetryLog.LOCAL.info("CacheRegisterTable stop");
EasyRetryLog.LOCAL.info("CacheRegisterTable stop");
CACHE.invalidateAll();
}
}

View File

@ -1,18 +1,18 @@
package com.aizuda.easy.retry.server.common.cache;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.triple.Pair;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@ -20,26 +20,31 @@ import java.util.concurrent.TimeUnit;
* @date 2024-03-29 23:15:26
* @since 3.2.0
*/
@Component
public class CacheToken implements Lifecycle {
private static Cache<Pair<String/*groupName*/, String/*namespaceId*/> , String/*Token*/> CACHE;
private static Cache<Pair<String/*groupName*/, String/*namespaceId*/>, String/*Token*/> CACHE;
public static void add(String groupName, String namespaceId, String token) {
CACHE.put(Pair.of(groupName, namespaceId), token);
}
public static String get(String groupName, String namespaceId) throws ExecutionException {
return CACHE.get(Pair.of(groupName, namespaceId), () -> {
AccessTemplate template = SpringContext.getBean(AccessTemplate.class);
GroupConfig config = template.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId);
if (Objects.isNull(config)) {
return SystemConstants.DEFAULT_TOKEN;
}
public static String get(String groupName, String namespaceId) {
String token = config.getToken();
add(groupName, namespaceId, token);
return token;
});
String token = CACHE.getIfPresent(Pair.of(groupName, namespaceId));
if (StrUtil.isBlank(token)) {
// 从DB获取数据
AccessTemplate template = SpringContext.getBean(AccessTemplate.class);
GroupConfig config = template.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId);
if (Objects.isNull(config)) {
return SystemConstants.DEFAULT_TOKEN;
}
token = config.getToken();
add(groupName, namespaceId, token);
}
return token;
}
@Override

View File

@ -27,7 +27,6 @@ public class RequestBuilder<T, R> {
private int routeKey;
private String allocKey;
private Integer executorTimeout;
private String namespaceId;
public static <T, R> RequestBuilder<T, R> newBuilder() {
return new RequestBuilder<>();
@ -88,18 +87,13 @@ public class RequestBuilder<T, R> {
return this;
}
public RequestBuilder<T, R> namespaceId(String namespaceId) {
this.namespaceId = namespaceId;
return this;
}
public T build() {
if (Objects.isNull(clintInterface)) {
throw new EasyRetryServerException("clintInterface cannot be null");
}
Assert.notNull(nodeInfo, () -> new EasyRetryServerException("nodeInfo cannot be null"));
Assert.notBlank(namespaceId, () -> new EasyRetryServerException("namespaceId cannot be null"));
Assert.notBlank(nodeInfo.getNamespaceId(), () -> new EasyRetryServerException("namespaceId cannot be null"));
if (failover) {
Assert.isTrue(routeKey > 0, () -> new EasyRetryServerException("routeKey cannot be null"));
@ -113,7 +107,7 @@ public class RequestBuilder<T, R> {
RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(
nodeInfo.getGroupName(), nodeInfo, failRetry, retryTimes, retryInterval,
retryListener, routeKey, allocKey, failover, executorTimeout, namespaceId);
retryListener, routeKey, allocKey, failover, executorTimeout, nodeInfo.getNamespaceId());
return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(),
new Class[]{clintInterface}, clientInvokeHandler);

View File

@ -9,6 +9,7 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.NetUtil;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.cache.CacheToken;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
import com.aizuda.easy.retry.server.common.client.annotation.Header;
import com.aizuda.easy.retry.server.common.client.annotation.Mapping;
@ -142,6 +143,9 @@ public class RpcClientInvokeHandler implements InvocationHandler {
requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, String.valueOf(executorTimeout));
}
// 统一设置Token
requestHeaders.set(SystemConstants.EASY_RETRY_AUTH_TOKEN, CacheToken.get(groupName, namespaceId));
Result result = retryer.call(() -> {
ResponseEntity<Result> response = restTemplate.exchange(
// 拼接 url?a=1&b=1

View File

@ -17,6 +17,7 @@ public abstract class GetHttpRequestHandler implements HttpRequestHandler {
@Override
public String doHandler(String content, UrlBuilder builder, HttpHeaders headers) {
UrlQuery query = builder.getQuery();
return doHandler(content, query, headers);
}

View File

@ -17,6 +17,7 @@ public abstract class PostHttpRequestHandler implements HttpRequestHandler {
@Override
public String doHandler(String content, UrlBuilder builder, HttpHeaders headers) {
UrlQuery query = builder.getQuery();
return doHandler(content, query, headers);
}

View File

@ -16,7 +16,9 @@ import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Component;
@ -26,10 +28,12 @@ import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 服务端注册
@ -96,8 +100,20 @@ public class ServerRegister extends AbstractRegister {
try {
// 同步当前POD消费的组的节点信息
// netty的client只会注册到一个服务端若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息
ConcurrentMap<String /*groupName*/, String/*namespaceId*/> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
ConcurrentMap<String /*groupName*/, Set<String>/*namespaceId*/> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
Set<String> namespaceIdSets = allConsumerGroupName.values().stream().reduce((a, b) -> {
Set<String> set = Sets.newHashSet();
set.addAll(a);
set.addAll(b);
return set;
}).orElse(Sets.newHashSet());
if (CollectionUtils.isEmpty(namespaceIdSets)) {
return;
}
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())

View File

@ -0,0 +1,47 @@
package com.aizuda.easy.retry.server.common.triple;
import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException;
import java.io.Serial;
/**
* @author: xiaowoniu
* @date : 2024-03-30
* @since : 3.2.0
*/
public final class ImmutablePair<L, R> extends Pair<L, R>{
@Serial
private static final long serialVersionUID = 4954918890077093841L;
public final L left;
public final R right;
public static <L, R> ImmutablePair<L, R> of(final L left, final R right) {
return new ImmutablePair<L, R>(left, right);
}
public ImmutablePair(final L left, final R right) {
super();
this.left = left;
this.right = right;
}
@Override
public L getLeft() {
return left;
}
@Override
public R getRight() {
return right;
}
@Override
public R setValue(final R value) {
throw new EasyRetryCommonException("非法操作");
}
}

View File

@ -1,5 +1,7 @@
package com.aizuda.easy.retry.server.common.triple;
import java.io.Serial;
/**
* @author: xiaowoniu
* @date : 2023-11-24 08:56
@ -7,11 +9,12 @@ package com.aizuda.easy.retry.server.common.triple;
*/
public final class ImmutableTriple<L, M, R> extends Triple<L, M, R> {
@Serial
private static final long serialVersionUID = 1L;
public final L left;
public final M middle;
public final R right;
private final L left;
private final M middle;
private final R right;
public static <L, M, R> ImmutableTriple<L, M, R> of(final L left, final M middle, final R right) {
return new ImmutableTriple<>(left, middle, right);

View File

@ -0,0 +1,84 @@
package com.aizuda.easy.retry.server.common.triple;
import cn.hutool.core.builder.CompareToBuilder;
import java.io.Serial;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
/**
* @author: xiaowoniu
* @date : 2024-03-30
* @since : 3.2.0
*/
public abstract class Pair<L, R> implements Map.Entry<L, R>, Comparable<Pair<L, R>>, Serializable {
@Serial
private static final long serialVersionUID = 1L;
public static <L, R> Pair<L, R> of(final L left, final R right) {
return new ImmutablePair<>(left, right);
}
public abstract L getLeft();
public abstract R getRight();
@Override
public final L getKey() {
return getLeft();
}
@Override
public R getValue() {
return getRight();
}
@Override
public int compareTo(final Pair<L, R> other) {
return new CompareToBuilder().append(getLeft(), other.getLeft())
.append(getRight(), other.getRight()).toComparison();
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Map.Entry<?, ?>) {
final Map.Entry<?, ?> other = (Map.Entry<?, ?>) obj;
return Objects.equals(getKey(), other.getKey())
&& Objects.equals(getValue(), other.getValue());
}
return false;
}
/**
* <p>Returns a suitable hash code.
* The hash code follows the definition in {@code Map.Entry}.</p>
*
* @return the hash code
*/
@Override
public int hashCode() {
return (getKey() == null ? 0 : getKey().hashCode()) ^
(getValue() == null ? 0 : getValue().hashCode());
}
/**
* <p>Returns a String representation of this pair using the format {@code ($left,$right)}.</p>
*
* @return a string describing this object, not null
*/
@Override
public String toString() {
return "(" + getLeft() + ',' + getRight() + ')';
}
public String toString(final String format) {
return String.format(format, getLeft(), getRight());
}
}

View File

@ -148,7 +148,6 @@ public class RequestClientActor extends AbstractActor {
boolean retry = realJobExecutorDTO.isRetry();
return RequestBuilder.<JobRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.namespaceId(registerNodeInfo.getNamespaceId())
.failRetry(maxRetryTimes > 0 && !retry)
.retryTimes(maxRetryTimes)
.retryInterval(realJobExecutorDTO.getRetryInterval())

View File

@ -53,7 +53,6 @@ public class RealStopTaskActor extends AbstractActor {
private Result<Boolean> requestClient(RealStopTaskInstanceDTO realStopTaskInstanceDTO, RegisterNodeInfo registerNodeInfo) {
JobRpcClient rpcClient = RequestBuilder.<JobRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.namespaceId(registerNodeInfo.getNamespaceId())
.failRetry(Boolean.TRUE)
.retryTimes(3)
.retryInterval(1)

View File

@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.common.client.RequestMethod;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
import com.aizuda.easy.retry.server.common.client.annotation.Header;
import com.aizuda.easy.retry.server.common.client.annotation.Mapping;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
/**
* 调用客户端接口
@ -31,4 +32,7 @@ public interface RetryRpcClient {
@Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST)
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);
@Mapping(path = "/retry/sync/version/v1")
Result syncConfig(@Body ConfigDTO configDTO);
}

View File

@ -136,7 +136,6 @@ public class ExecCallbackUnitActor extends AbstractActor {
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)
.namespaceId(serverNode.getNamespaceId())
.failover(Boolean.TRUE)
.routeKey(sceneConfig.getRouteKey())
.allocKey(sceneConfig.getSceneName())

View File

@ -139,7 +139,6 @@ public class ExecUnitActor extends AbstractActor {
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)
.namespaceId(serverNode.getNamespaceId())
.failover(Boolean.TRUE)
.allocKey(retryTask.getSceneName())
.routeKey(sceneConfig.getRouteKey())

View File

@ -1,21 +1,18 @@
package com.aizuda.easy.retry.server.retry.task.support.handler;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.util.NetUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.dto.ConfigSyncTask;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
@ -29,16 +26,11 @@ import java.util.concurrent.TimeUnit;
* @since 1.6.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
private static final LinkedBlockingQueue<ConfigSyncTask> QUEUE = new LinkedBlockingQueue<>(256);
public Thread THREAD = null;
@Autowired
private RestTemplate restTemplate;
@Autowired
protected AccessTemplate accessTemplate;
private static final String SYNC_VERSION_V1 = "/retry/sync/version/v1";
protected final AccessTemplate accessTemplate;
/**
* 添加任务
@ -59,8 +51,8 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
/**
* 同步版本
*
* @param groupName
* @param namespaceId
* @param groupName
* @param namespaceId 空间id
*/
public void syncVersion(String groupName, final String namespaceId) {
@ -69,11 +61,11 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
// 同步版本到每个客户端节点
for (final RegisterNodeInfo registerNodeInfo : serverNodeSet) {
ConfigDTO configDTO = accessTemplate.getGroupConfigAccess().getConfigInfo(groupName, namespaceId);
String url = NetUtil.getUrl(registerNodeInfo.getHostIp(), registerNodeInfo.getHostPort(),
registerNodeInfo.getContextPath());
Result result = restTemplate.postForObject(url.concat(SYNC_VERSION_V1), configDTO, Result.class);
EasyRetryLog.LOCAL.info("同步结果 [{}]", result);
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.client(RetryRpcClient.class)
.build();
EasyRetryLog.LOCAL.info("同步结果 [{}]", rpcClient.syncConfig(configDTO));
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("version sync error. groupName:[{}]", groupName, e);
@ -111,8 +103,7 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
try {
// 防止刷的过快休眠1s
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InterruptedException ignored) {
}
}
}

View File

@ -2,9 +2,13 @@ package com.aizuda.easy.retry.server.retry.task.support.retry;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.dto.RetryLogMetaDTO;
import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.StopStrategy;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import lombok.extern.slf4j.Slf4j;
@ -63,13 +67,12 @@ public class RetryExecutor<V> {
call = callable.call();
retryContext.setCallResult(call);
} catch (Exception e) {
log.error("客户端执行失败: [{}]", retryContext.getRetryTask());
RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryContext.getRetryTask());
retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("请求客户端执行失败. uniqueId:[{}] <|>{}<|>", retryContext.getRetryTask().getUniqueId(), e);
retryContext.setException(e);
}
// 计算下次触发时间
// retryContext.getRetryTask().setNextTriggerAt();
boolean isStop = Boolean.TRUE;
// 触发停止策略判断

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.HttpRequestHandler;
import com.aizuda.easy.retry.server.common.Register;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheToken;
import com.aizuda.easy.retry.server.common.dto.NettyHttpRequest;
import com.aizuda.easy.retry.server.common.register.ClientRegister;
import com.aizuda.easy.retry.server.common.register.RegisterContext;
@ -88,6 +89,12 @@ public class RequestHandlerActor extends AbstractActor {
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey());
String namespace = headers.get(HeadersEnum.NAMESPACE.getKey());
String token = headers.get(HeadersEnum.TOKEN.getKey());
if (!CacheToken.get(groupName, namespace).equals(token)) {
EasyRetryLog.LOCAL.error("Token authentication failed. [{}]", token);
return JsonUtil.toJsonString(new Result<>(0, "Token authentication failed"));
}
// 注册版本
RegisterContext registerContext = new RegisterContext();
@ -103,6 +110,8 @@ public class RequestHandlerActor extends AbstractActor {
EasyRetryLog.LOCAL.warn("client register error. groupName:[{}]", groupName);
}
UrlBuilder builder = UrlBuilder.ofHttp(uri);
Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.getContext()
.getBeansOfType(HttpRequestHandler.class).values();

View File

@ -249,7 +249,6 @@ public class RetryTaskServiceImpl implements RetryTaskService {
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.nodeInfo(serverNode)
.namespaceId(serverNode.getNamespaceId())
.client(RetryRpcClient.class)
.build();