diff --git a/easy-retry-client/easy-retry-client-common/pom.xml b/easy-retry-client/easy-retry-client-common/pom.xml index 17db16bd..836751db 100644 --- a/easy-retry-client/easy-retry-client-common/pom.xml +++ b/easy-retry-client/easy-retry-client-common/pom.xml @@ -29,6 +29,11 @@ spring-boot-starter-web provided + + org.springframework.boot + spring-boot-starter-aop + provided + org.projectlombok lombok diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/Authentication.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/Authentication.java new file mode 100644 index 00000000..b489efd7 --- /dev/null +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/annotation/Authentication.java @@ -0,0 +1,11 @@ +package com.aizuda.easy.retry.client.common.annotation; + +/** + * Easy Retry Client 认证 + * + * @author: xiaownouniu + * @date : 2024-03-30 + * @since : 3.2.0 + */ +public @interface Authentication { +} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/AuthenticationInterceptor.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/AuthenticationInterceptor.java new file mode 100644 index 00000000..5ba4f712 --- /dev/null +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/intercepter/AuthenticationInterceptor.java @@ -0,0 +1,37 @@ +package com.aizuda.easy.retry.client.common.intercepter; + +import com.aizuda.easy.retry.client.common.config.EasyRetryProperties; +import com.aizuda.easy.retry.client.common.exception.EasyRetryClientException; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; +import lombok.RequiredArgsConstructor; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.springframework.stereotype.Component; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import java.util.Optional; + +/** + * Easy Retry 认证拦截器 + * + * @author: xiaowoniu + * @date : 2022-04-18 09:19 + * @since 3.2.0 + */ +@Aspect +@Component +@RequiredArgsConstructor +public class AuthenticationInterceptor { + private final EasyRetryProperties easyRetryProperties; + @Before(value = "@annotation(com.aizuda.easy.retry.client.common.annotation.Authentication) || @within(com.aizuda.easy.retry.client.common.annotation.Authentication)") + public void easyRetryAuth() { + ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); + String easyRetryAuth = attributes.getRequest().getHeader(SystemConstants.EASY_RETRY_AUTH_TOKEN); + String configToken = Optional.ofNullable(easyRetryProperties.getToken()).orElse(SystemConstants.DEFAULT_TOKEN); + if (!configToken.equals(easyRetryAuth)) { + throw new EasyRetryClientException("认证失败.【请检查配置的Token是否正确】"); + } + } + +} diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java index 002c5a3c..a9e36663 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java @@ -182,7 +182,7 @@ public class NettyChannel { .set(HeadersEnum.HOST.getKey(), serverConfig.getHost()) .set(HeadersEnum.NAMESPACE.getKey(), Optional.ofNullable(easyRetryProperties.getNamespace()).orElse( SystemConstants.DEFAULT_NAMESPACE)) - .set(HeadersEnum.TOKEN.getKey(), Optional.ofNullable(easyRetryProperties.getNamespace()).orElse( + .set(HeadersEnum.TOKEN.getKey(), Optional.ofNullable(easyRetryProperties.getToken()).orElse( SystemConstants.DEFAULT_TOKEN)) ; diff --git a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java index 63cf681c..0c5c4a95 100644 --- a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java +++ b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/client/RetryEndPoint.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.client.core.client; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.client.common.annotation.Authentication; import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager; import com.aizuda.easy.retry.client.core.IdempotentIdGenerate; import com.aizuda.easy.retry.client.core.RetryArgSerializer; @@ -62,6 +63,7 @@ public class RetryEndPoint { * 服务端调度重试入口 */ @PostMapping("/dispatch/v1") + @Authentication public Result dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) { RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName()); @@ -135,12 +137,14 @@ public class RetryEndPoint { * 同步版本 */ @PostMapping("/sync/version/v1") + @Authentication public Result syncVersion(@RequestBody ConfigDTO configDTO) { GroupVersionCache.configDTO = configDTO; return new Result(); } @PostMapping("/callback/v1") + @Authentication public Result callback(@RequestBody @Validated RetryCallbackDTO callbackDTO) { RetryerInfo retryerInfo = null; @@ -252,6 +256,7 @@ public class RetryEndPoint { * @return idempotentId */ @PostMapping("/generate/idempotent-id/v1") + @Authentication public Result idempotentIdGenerate( @RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) { diff --git a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java index 24db79f4..b203af83 100644 --- a/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java +++ b/easy-retry-client/easy-retry-client-job-core/src/main/java/com/aizuda/easy/retry/client/job/core/client/JobEndPoint.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.client.job.core.client; +import com.aizuda.easy.retry.client.common.annotation.Authentication; import com.aizuda.easy.retry.client.common.log.support.EasyRetryLogManager; import com.aizuda.easy.retry.client.job.core.IJobExecutor; import com.aizuda.easy.retry.client.job.core.cache.JobExecutorInfoCache; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadPoolExecutor; public class JobEndPoint { @PostMapping("/dispatch/v1") + @Authentication public Result dispatchJob(@RequestBody @Validated DispatchJobRequest dispatchJob) { try { @@ -108,6 +110,7 @@ public class JobEndPoint { } @PostMapping("/stop/v1") + @Authentication public Result stopJob(@RequestBody @Validated StopJobDTO interruptJob) { ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId()); if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) { diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index e7bb4aeb..755f1584 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -133,4 +133,9 @@ public interface SystemConstants { * 仅表示定时任务类型为工作流 */ Integer WORKFLOW_TRIGGER_TYPE = 99; + + /** + * Easy Retry 认证Token + */ + String EASY_RETRY_AUTH_TOKEN= "ER-TOKEN"; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java index 1b639016..b195a8c8 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheConsumerGroup.java @@ -7,6 +7,9 @@ import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -21,14 +24,14 @@ import java.util.concurrent.TimeUnit; @Slf4j public class CacheConsumerGroup implements Lifecycle { - private static Cache CACHE; + private static Cache/*namespaceId*/> CACHE; /** * 获取所有缓存 * * @return 缓存对象 */ - public static ConcurrentMap getAllConsumerGroupName() { + public static ConcurrentMap> getAllConsumerGroupName() { return CACHE.asMap(); } @@ -39,7 +42,9 @@ public class CacheConsumerGroup implements Lifecycle { * @return 缓存对象 */ public static synchronized void addOrUpdate(String groupName, String namespaceId) { - CACHE.put(groupName, namespaceId); + Set namespaceIds = Optional.ofNullable(CACHE.getIfPresent(groupName)).orElseGet(HashSet::new); + namespaceIds.add(namespaceId); + CACHE.put(groupName, namespaceIds); } public static void remove(String groupName) { diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java index 72f2aa90..3b6c1dff 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheRegisterTable.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.RegisterNodeInfoConverter; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.register.ServerRegister; +import com.aizuda.easy.retry.server.common.triple.Pair; import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -44,7 +45,7 @@ import java.util.stream.Collectors; @Slf4j public class CacheRegisterTable implements Lifecycle { - private static Cache> CACHE; + private static Cache, ConcurrentMap> CACHE; /** * 获取所有缓存 @@ -52,7 +53,7 @@ public class CacheRegisterTable implements Lifecycle { * @return 缓存对象 */ public static Set getAllPods() { - ConcurrentMap> concurrentMap = CACHE.asMap(); + ConcurrentMap, ConcurrentMap> concurrentMap = CACHE.asMap(); if (CollectionUtils.isEmpty(concurrentMap)) { return Sets.newHashSet(); } @@ -137,8 +138,8 @@ public class CacheRegisterTable implements Lifecycle { return new TreeSet<>(concurrentMap.values()); } - private static String getKey(final String groupName, final String namespaceId) { - return groupName + StrUtil.AT + namespaceId; + private static Pair getKey(final String groupName, final String namespaceId) { + return Pair.of(groupName, namespaceId); } /** @@ -162,7 +163,7 @@ public class CacheRegisterTable implements Lifecycle { serverNode.getHostId()); // 不存在则初始化 if (Objects.isNull(registerNodeInfo)) { - EasyRetryLog.LOCAL.warn("node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(), + EasyRetryLog.LOCAL.warn("node not exists. groupName:[{}] hostId:[{}]", serverNode.getGroupName(), serverNode.getHostId()); } else { // 存在则刷新过期时间 @@ -180,7 +181,7 @@ public class CacheRegisterTable implements Lifecycle { getKey(serverNode.getGroupName(), serverNode.getNamespaceId())); RegisterNodeInfo registerNodeInfo; if (Objects.isNull(concurrentMap)) { - EasyRetryLog.LOCAL.info("Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(), + EasyRetryLog.LOCAL.info("Add cache. groupName:[{}] namespaceId:[{}] hostId:[{}]", serverNode.getGroupName(), serverNode.getNamespaceId(), serverNode.getHostId()); concurrentMap = new ConcurrentHashMap<>(); registerNodeInfo = RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(serverNode); @@ -202,6 +203,7 @@ public class CacheRegisterTable implements Lifecycle { /** * 删除过期的节点信息 + * * @param concurrentMap 并发映射的节点信息 */ private static void delExpireNode(final ConcurrentMap concurrentMap) { @@ -225,13 +227,13 @@ public class CacheRegisterTable implements Lifecycle { return; } - EasyRetryLog.LOCAL.info("Remove cache. groupName:[{}] hostId:[{}]", groupName, hostId); + EasyRetryLog.LOCAL.info("Remove cache. groupName:[{}] hostId:[{}]", groupName, hostId); concurrentMap.remove(hostId); } @Override public void start() { - EasyRetryLog.LOCAL.info("CacheRegisterTable start"); + EasyRetryLog.LOCAL.info("CacheRegisterTable start"); CACHE = CacheBuilder.newBuilder() // 设置并发级别为cpu核心数 .concurrencyLevel(Runtime.getRuntime().availableProcessors()) @@ -243,7 +245,7 @@ public class CacheRegisterTable implements Lifecycle { @Override public void close() { - EasyRetryLog.LOCAL.info("CacheRegisterTable stop"); + EasyRetryLog.LOCAL.info("CacheRegisterTable stop"); CACHE.invalidateAll(); } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheToken.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheToken.java index 011171a3..90f4957f 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheToken.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheToken.java @@ -1,18 +1,18 @@ package com.aizuda.easy.retry.server.common.cache; -import cn.hutool.core.lang.Pair; +import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.triple.Pair; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import org.springframework.stereotype.Component; import java.util.Objects; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -20,26 +20,31 @@ import java.util.concurrent.TimeUnit; * @date 2024-03-29 23:15:26 * @since 3.2.0 */ +@Component public class CacheToken implements Lifecycle { - private static Cache , String/*Token*/> CACHE; + private static Cache, String/*Token*/> CACHE; public static void add(String groupName, String namespaceId, String token) { CACHE.put(Pair.of(groupName, namespaceId), token); } - public static String get(String groupName, String namespaceId) throws ExecutionException { - return CACHE.get(Pair.of(groupName, namespaceId), () -> { - AccessTemplate template = SpringContext.getBean(AccessTemplate.class); - GroupConfig config = template.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId); - if (Objects.isNull(config)) { - return SystemConstants.DEFAULT_TOKEN; - } + public static String get(String groupName, String namespaceId) { - String token = config.getToken(); - add(groupName, namespaceId, token); - return token; - }); + String token = CACHE.getIfPresent(Pair.of(groupName, namespaceId)); + if (StrUtil.isBlank(token)) { + // 从DB获取数据 + AccessTemplate template = SpringContext.getBean(AccessTemplate.class); + GroupConfig config = template.getGroupConfigAccess().getGroupConfigByGroupName(groupName, namespaceId); + if (Objects.isNull(config)) { + return SystemConstants.DEFAULT_TOKEN; + } + + token = config.getToken(); + add(groupName, namespaceId, token); + } + + return token; } @Override diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java index 3837b99c..b397b934 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java @@ -27,7 +27,6 @@ public class RequestBuilder { private int routeKey; private String allocKey; private Integer executorTimeout; - private String namespaceId; public static RequestBuilder newBuilder() { return new RequestBuilder<>(); @@ -88,18 +87,13 @@ public class RequestBuilder { return this; } - public RequestBuilder namespaceId(String namespaceId) { - this.namespaceId = namespaceId; - return this; - } - public T build() { if (Objects.isNull(clintInterface)) { throw new EasyRetryServerException("clintInterface cannot be null"); } Assert.notNull(nodeInfo, () -> new EasyRetryServerException("nodeInfo cannot be null")); - Assert.notBlank(namespaceId, () -> new EasyRetryServerException("namespaceId cannot be null")); + Assert.notBlank(nodeInfo.getNamespaceId(), () -> new EasyRetryServerException("namespaceId cannot be null")); if (failover) { Assert.isTrue(routeKey > 0, () -> new EasyRetryServerException("routeKey cannot be null")); @@ -113,7 +107,7 @@ public class RequestBuilder { RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler( nodeInfo.getGroupName(), nodeInfo, failRetry, retryTimes, retryInterval, - retryListener, routeKey, allocKey, failover, executorTimeout, namespaceId); + retryListener, routeKey, allocKey, failover, executorTimeout, nodeInfo.getNamespaceId()); return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(), new Class[]{clintInterface}, clientInvokeHandler); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java index f4581a8e..b37959f6 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java @@ -9,6 +9,7 @@ import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.NetUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.cache.CacheToken; import com.aizuda.easy.retry.server.common.client.annotation.Body; import com.aizuda.easy.retry.server.common.client.annotation.Header; import com.aizuda.easy.retry.server.common.client.annotation.Mapping; @@ -142,6 +143,9 @@ public class RpcClientInvokeHandler implements InvocationHandler { requestHeaders.set(RequestInterceptor.TIMEOUT_TIME, String.valueOf(executorTimeout)); } + // 统一设置Token + requestHeaders.set(SystemConstants.EASY_RETRY_AUTH_TOKEN, CacheToken.get(groupName, namespaceId)); + Result result = retryer.call(() -> { ResponseEntity response = restTemplate.exchange( // 拼接 url?a=1&b=1 diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java index b0bcc12b..fb6bf6c9 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java @@ -17,6 +17,7 @@ public abstract class GetHttpRequestHandler implements HttpRequestHandler { @Override public String doHandler(String content, UrlBuilder builder, HttpHeaders headers) { UrlQuery query = builder.getQuery(); + return doHandler(content, query, headers); } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java index 4cbdce3c..84fa9e75 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java @@ -17,6 +17,7 @@ public abstract class PostHttpRequestHandler implements HttpRequestHandler { @Override public String doHandler(String content, UrlBuilder builder, HttpHeaders headers) { UrlQuery query = builder.getQuery(); + return doHandler(content, query, headers); } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java index 3f189878..29537443 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java @@ -16,7 +16,9 @@ import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.stereotype.Component; @@ -26,10 +28,12 @@ import java.time.LocalDateTime; import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * 服务端注册 @@ -96,8 +100,20 @@ public class ServerRegister extends AbstractRegister { try { // 同步当前POD消费的组的节点信息 // netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息 - ConcurrentMap allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); + ConcurrentMap/*namespaceId*/> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); if (!CollectionUtils.isEmpty(allConsumerGroupName)) { + + Set namespaceIdSets = allConsumerGroupName.values().stream().reduce((a, b) -> { + Set set = Sets.newHashSet(); + set.addAll(a); + set.addAll(b); + return set; + }).orElse(Sets.newHashSet()); + + if (CollectionUtils.isEmpty(namespaceIdSets)) { + return; + } + List serverNodes = serverNodeMapper.selectList( new LambdaQueryWrapper() .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutablePair.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutablePair.java new file mode 100644 index 00000000..ec576f6f --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutablePair.java @@ -0,0 +1,47 @@ +package com.aizuda.easy.retry.server.common.triple; + +import com.aizuda.easy.retry.common.core.exception.EasyRetryCommonException; + +import java.io.Serial; + +/** + * @author: xiaowoniu + * @date : 2024-03-30 + * @since : 3.2.0 + */ +public final class ImmutablePair extends Pair{ + + @Serial + private static final long serialVersionUID = 4954918890077093841L; + + public final L left; + public final R right; + + + public static ImmutablePair of(final L left, final R right) { + return new ImmutablePair(left, right); + } + + + public ImmutablePair(final L left, final R right) { + super(); + this.left = left; + this.right = right; + } + + @Override + public L getLeft() { + return left; + } + + + @Override + public R getRight() { + return right; + } + @Override + public R setValue(final R value) { + throw new EasyRetryCommonException("非法操作"); + + } +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java index 8e82b4cd..d1634404 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java @@ -1,5 +1,7 @@ package com.aizuda.easy.retry.server.common.triple; +import java.io.Serial; + /** * @author: xiaowoniu * @date : 2023-11-24 08:56 @@ -7,11 +9,12 @@ package com.aizuda.easy.retry.server.common.triple; */ public final class ImmutableTriple extends Triple { + @Serial private static final long serialVersionUID = 1L; - public final L left; - public final M middle; - public final R right; + private final L left; + private final M middle; + private final R right; public static ImmutableTriple of(final L left, final M middle, final R right) { return new ImmutableTriple<>(left, middle, right); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Pair.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Pair.java new file mode 100644 index 00000000..34cfbdd3 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Pair.java @@ -0,0 +1,84 @@ +package com.aizuda.easy.retry.server.common.triple; + +import cn.hutool.core.builder.CompareToBuilder; + +import java.io.Serial; +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +/** + * @author: xiaowoniu + * @date : 2024-03-30 + * @since : 3.2.0 + */ +public abstract class Pair implements Map.Entry, Comparable>, Serializable { + @Serial + private static final long serialVersionUID = 1L; + + public static Pair of(final L left, final R right) { + return new ImmutablePair<>(left, right); + } + + public abstract L getLeft(); + + + public abstract R getRight(); + + + @Override + public final L getKey() { + return getLeft(); + } + + + @Override + public R getValue() { + return getRight(); + } + + @Override + public int compareTo(final Pair other) { + return new CompareToBuilder().append(getLeft(), other.getLeft()) + .append(getRight(), other.getRight()).toComparison(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof Map.Entry) { + final Map.Entry other = (Map.Entry) obj; + return Objects.equals(getKey(), other.getKey()) + && Objects.equals(getValue(), other.getValue()); + } + return false; + } + + /** + *

Returns a suitable hash code. + * The hash code follows the definition in {@code Map.Entry}.

+ * + * @return the hash code + */ + @Override + public int hashCode() { + return (getKey() == null ? 0 : getKey().hashCode()) ^ + (getValue() == null ? 0 : getValue().hashCode()); + } + + /** + *

Returns a String representation of this pair using the format {@code ($left,$right)}.

+ * + * @return a string describing this object, not null + */ + @Override + public String toString() { + return "(" + getLeft() + ',' + getRight() + ')'; + } + + public String toString(final String format) { + return String.format(format, getLeft(), getRight()); + } +} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java index cf530c73..fb0d30cc 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java @@ -148,7 +148,6 @@ public class RequestClientActor extends AbstractActor { boolean retry = realJobExecutorDTO.isRetry(); return RequestBuilder.newBuilder() .nodeInfo(registerNodeInfo) - .namespaceId(registerNodeInfo.getNamespaceId()) .failRetry(maxRetryTimes > 0 && !retry) .retryTimes(maxRetryTimes) .retryInterval(realJobExecutorDTO.getRetryInterval()) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java index 34ee8519..02a246dc 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/stop/RealStopTaskActor.java @@ -53,7 +53,6 @@ public class RealStopTaskActor extends AbstractActor { private Result requestClient(RealStopTaskInstanceDTO realStopTaskInstanceDTO, RegisterNodeInfo registerNodeInfo) { JobRpcClient rpcClient = RequestBuilder.newBuilder() .nodeInfo(registerNodeInfo) - .namespaceId(registerNodeInfo.getNamespaceId()) .failRetry(Boolean.TRUE) .retryTimes(3) .retryInterval(1) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java index 01ff454b..512de391 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.common.client.RequestMethod; import com.aizuda.easy.retry.server.common.client.annotation.Body; import com.aizuda.easy.retry.server.common.client.annotation.Header; import com.aizuda.easy.retry.server.common.client.annotation.Mapping; +import com.aizuda.easy.retry.server.model.dto.ConfigDTO; /** * 调用客户端接口 @@ -31,4 +32,7 @@ public interface RetryRpcClient { @Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST) Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO); + @Mapping(path = "/retry/sync/version/v1") + Result syncConfig(@Body ConfigDTO configDTO); + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java index a7645ee9..f1d6cd9d 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecCallbackUnitActor.java @@ -136,7 +136,6 @@ public class ExecCallbackUnitActor extends AbstractActor { RetryRpcClient rpcClient = RequestBuilder.newBuilder() .nodeInfo(serverNode) - .namespaceId(serverNode.getNamespaceId()) .failover(Boolean.TRUE) .routeKey(sceneConfig.getRouteKey()) .allocKey(sceneConfig.getSceneName()) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java index 4cd01950..85bae2fc 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/exec/ExecUnitActor.java @@ -139,7 +139,6 @@ public class ExecUnitActor extends AbstractActor { RetryRpcClient rpcClient = RequestBuilder.newBuilder() .nodeInfo(serverNode) - .namespaceId(serverNode.getNamespaceId()) .failover(Boolean.TRUE) .allocKey(retryTask.getSceneName()) .routeKey(sceneConfig.getRouteKey()) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java index bee36ceb..5ba14d9f 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java @@ -1,21 +1,18 @@ package com.aizuda.easy.retry.server.retry.task.support.handler; -import cn.hutool.core.lang.Pair; -import com.aizuda.easy.retry.common.core.util.NetUtil; import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.model.dto.ConfigDTO; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient; import com.aizuda.easy.retry.server.retry.task.dto.ConfigSyncTask; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; -import java.text.MessageFormat; import java.util.Objects; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -29,16 +26,11 @@ import java.util.concurrent.TimeUnit; * @since 1.6.0 */ @Component -@Slf4j +@RequiredArgsConstructor public class ConfigVersionSyncHandler implements Lifecycle, Runnable { - private static final LinkedBlockingQueue QUEUE = new LinkedBlockingQueue<>(256); public Thread THREAD = null; - @Autowired - private RestTemplate restTemplate; - @Autowired - protected AccessTemplate accessTemplate; - private static final String SYNC_VERSION_V1 = "/retry/sync/version/v1"; + protected final AccessTemplate accessTemplate; /** * 添加任务 @@ -59,8 +51,8 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable { /** * 同步版本 * - * @param groupName - * @param namespaceId + * @param groupName 组 + * @param namespaceId 空间id */ public void syncVersion(String groupName, final String namespaceId) { @@ -69,11 +61,11 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable { // 同步版本到每个客户端节点 for (final RegisterNodeInfo registerNodeInfo : serverNodeSet) { ConfigDTO configDTO = accessTemplate.getGroupConfigAccess().getConfigInfo(groupName, namespaceId); - - String url = NetUtil.getUrl(registerNodeInfo.getHostIp(), registerNodeInfo.getHostPort(), - registerNodeInfo.getContextPath()); - Result result = restTemplate.postForObject(url.concat(SYNC_VERSION_V1), configDTO, Result.class); - EasyRetryLog.LOCAL.info("同步结果 [{}]", result); + RetryRpcClient rpcClient = RequestBuilder.newBuilder() + .nodeInfo(registerNodeInfo) + .client(RetryRpcClient.class) + .build(); + EasyRetryLog.LOCAL.info("同步结果 [{}]", rpcClient.syncConfig(configDTO)); } } catch (Exception e) { EasyRetryLog.LOCAL.error("version sync error. groupName:[{}]", groupName, e); @@ -111,8 +103,7 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable { try { // 防止刷的过快,休眠1s TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); + } catch (InterruptedException ignored) { } } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java index 97984913..14d43052 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java @@ -2,9 +2,13 @@ package com.aizuda.easy.retry.server.retry.task.support.retry; import akka.actor.ActorRef; import cn.hutool.core.lang.Pair; +import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.retry.task.dto.RetryLogMetaDTO; import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy; import com.aizuda.easy.retry.server.retry.task.support.RetryContext; +import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; import com.aizuda.easy.retry.server.retry.task.support.StopStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy; import lombok.extern.slf4j.Slf4j; @@ -63,13 +67,12 @@ public class RetryExecutor { call = callable.call(); retryContext.setCallResult(call); } catch (Exception e) { - log.error("客户端执行失败: [{}]", retryContext.getRetryTask()); + RetryLogMetaDTO retryLogMetaDTO = RetryTaskConverter.INSTANCE.toLogMetaDTO(retryContext.getRetryTask()); + retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli()); + EasyRetryLog.REMOTE.error("请求客户端执行失败. uniqueId:[{}] <|>{}<|>", retryContext.getRetryTask().getUniqueId(), e); retryContext.setException(e); } - // 计算下次触发时间 -// retryContext.getRetryTask().setNextTriggerAt(); - boolean isStop = Boolean.TRUE; // 触发停止策略判断 diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java index 78f19a9b..7073099a 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java @@ -11,6 +11,7 @@ import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.HttpRequestHandler; import com.aizuda.easy.retry.server.common.Register; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheToken; import com.aizuda.easy.retry.server.common.dto.NettyHttpRequest; import com.aizuda.easy.retry.server.common.register.ClientRegister; import com.aizuda.easy.retry.server.common.register.RegisterContext; @@ -88,6 +89,12 @@ public class RequestHandlerActor extends AbstractActor { String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey()); String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey()); String namespace = headers.get(HeadersEnum.NAMESPACE.getKey()); + String token = headers.get(HeadersEnum.TOKEN.getKey()); + + if (!CacheToken.get(groupName, namespace).equals(token)) { + EasyRetryLog.LOCAL.error("Token authentication failed. [{}]", token); + return JsonUtil.toJsonString(new Result<>(0, "Token authentication failed")); + } // 注册版本 RegisterContext registerContext = new RegisterContext(); @@ -103,6 +110,8 @@ public class RequestHandlerActor extends AbstractActor { EasyRetryLog.LOCAL.warn("client register error. groupName:[{}]", groupName); } + + UrlBuilder builder = UrlBuilder.ofHttp(uri); Collection httpRequestHandlers = SpringContext.getContext() .getBeansOfType(HttpRequestHandler.class).values(); diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java index e441f66b..95f944f2 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java @@ -249,7 +249,6 @@ public class RetryTaskServiceImpl implements RetryTaskService { RetryRpcClient rpcClient = RequestBuilder.newBuilder() .nodeInfo(serverNode) - .namespaceId(serverNode.getNamespaceId()) .client(RetryRpcClient.class) .build();