From 88ae12540ac01449b69360fcc57ab275d71657bf Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 13 Oct 2023 21:40:00 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.4.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/sql/easy_retry_mysql.sql | 4 +- .../persistence/po/GroupConfig.java | 1 + .../datasource/persistence/po/Job.java | 2 +- .../persistence/po/SceneConfig.java | 2 + .../server/common/ClientLoadBalance.java | 2 +- .../{handler => }/HttpRequestHandler.java | 2 +- .../ClientLoadBalanceConsistentHash.java | 6 +-- .../allocate/client/ClientLoadBalanceLRU.java | 10 ++-- .../client/ClientLoadBalanceManager.java | 5 +- .../client/ClientLoadBalanceRandom.java | 6 +-- .../client/ClientLoadBalanceRound.java | 37 +++++++++++++ .../server/common/client/RequestBuilder.java | 54 ++++++++++--------- .../common/client/RpcClientInvokeHandler.java | 41 ++++++++++---- .../common/client/annotation/Mapping.java | 6 --- .../server/common/dto/RegisterNodeInfo.java | 4 ++ .../common/enums/AllocationAlgorithmEnum.java | 21 -------- .../handler/ClientNodeAllocateHandler.java | 17 ++++-- .../common/handler/GetHttpRequestHandler.java | 1 + .../handler/PostHttpRequestHandler.java | 1 + .../support/dispatch/JobExecutorActor.java | 1 + .../support/executor/AbstractJobExecutor.java | 6 --- .../support/executor/JobExecutorContext.java | 12 ++++- .../executor/RealJobExecutorActor.java | 5 +- .../task/BroadcastTaskGenerator.java | 7 +-- .../generator/task/ClusterTaskGenerator.java | 10 ++-- .../task/JobTaskGenerateContext.java | 10 ++++ .../generator/task/ShardingTaskGenerator.java | 8 ++- .../retry/task/client/RetryRpcClient.java | 4 ++ .../dispatch/task/AbstractTaskExecutor.java | 13 +++-- .../dispatch/task/CallbackTaskExecutor.java | 10 ++-- .../task/ManualCallbackTaskExecutor.java | 32 ++++++----- .../task/ManualRetryTaskExecutor.java | 11 ++-- .../dispatch/task/RetryTaskExecutor.java | 37 ++++++------- .../server/server/RequestHandlerActor.java | 2 +- .../service/impl/RetryTaskServiceImpl.java | 32 ++++++++--- 35 files changed, 258 insertions(+), 164 deletions(-) rename easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/{handler => }/HttpRequestHandler.java (89%) create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java delete mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/AllocationAlgorithmEnum.java diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index c88a328f8..5c5858a18 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -12,7 +12,6 @@ CREATE TABLE `group_config` `group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用', `version` int(11) NOT NULL COMMENT '版本号', `group_partition` int(11) NOT NULL COMMENT '分区', - `route_key` tinyint(4) NOT NULL COMMENT '路由策略', `id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT '唯一id生成模式 默认号段模式', `init_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否初始化场景 0:否 1:是', `bucket_index` int(11) DEFAULT NULL COMMENT 'bucket', @@ -135,6 +134,7 @@ CREATE TABLE `scene_config` `back_off` tinyint(4) NOT NULL DEFAULT '1' COMMENT '1、默认等级 2、固定间隔时间 3、CRON 表达式', `trigger_interval` varchar(16) NOT NULL DEFAULT '' COMMENT '间隔时长', `deadline_request` bigint(20) unsigned NOT NULL DEFAULT '60000' COMMENT 'Deadline Request 调用链超时 单位毫秒', + `route_key` tinyint(4) NOT NULL DEFAULT '4' COMMENT '路由策略', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', @@ -225,7 +225,7 @@ CREATE TABLE `job` ( `next_trigger_at` datetime NOT NULL COMMENT '下次触发时间', `job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启', `task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片', - `route_key` varchar(50) DEFAULT NULL COMMENT '执行器路由策略', + `route_key` tinyint(4) NOT NULL DEFAULT '4' COMMENT '路由策略', `executor_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '执行器类型', `executor_name` varchar(255) DEFAULT NULL COMMENT '执行器名称', `trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间', diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/GroupConfig.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/GroupConfig.java index 5ed1d5024..b64f2b76d 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/GroupConfig.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/GroupConfig.java @@ -19,6 +19,7 @@ public class GroupConfig implements Serializable { private Integer groupPartition; + @Deprecated private Integer routeKey; private Integer idGeneratorMode; diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java index 4d8b12225..1314550fc 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/Job.java @@ -67,7 +67,7 @@ public class Job implements Serializable { /** * 执行器路由策略 */ - private String routeKey; + private Integer routeKey; /** * 执行器类型 1、Java diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/SceneConfig.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/SceneConfig.java index ddd94c3c8..8381ef2b6 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/SceneConfig.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/SceneConfig.java @@ -29,6 +29,8 @@ public class SceneConfig implements Serializable { private Long deadlineRequest; + private Integer routeKey; + private LocalDateTime createDt; private LocalDateTime updateDt; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/ClientLoadBalance.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/ClientLoadBalance.java index 6f2922c5f..a53bba019 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/ClientLoadBalance.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/ClientLoadBalance.java @@ -8,7 +8,7 @@ import java.util.TreeSet; */ public interface ClientLoadBalance { - String route(String currentGroupName, TreeSet clientAllAddressSet); + String route(String key, TreeSet clientAllAddressSet); int routeType(); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/HttpRequestHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/HttpRequestHandler.java similarity index 89% rename from easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/HttpRequestHandler.java rename to easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/HttpRequestHandler.java index a066c42b4..8a822dd1c 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/HttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/HttpRequestHandler.java @@ -1,4 +1,4 @@ -package com.aizuda.easy.retry.server.common.handler; +package com.aizuda.easy.retry.server.common; import cn.hutool.core.net.url.UrlBuilder; import io.netty.handler.codec.http.HttpHeaders; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceConsistentHash.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceConsistentHash.java index 468775070..c0d377bad 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceConsistentHash.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceConsistentHash.java @@ -1,7 +1,7 @@ package com.aizuda.easy.retry.server.common.allocate.client; import com.aizuda.easy.retry.server.common.ClientLoadBalance; -import com.aizuda.easy.retry.server.common.enums.AllocationAlgorithmEnum; +import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import com.aizuda.easy.retry.server.common.allocate.common.ConsistentHashRouter; import com.aizuda.easy.retry.server.common.allocate.common.Node; @@ -22,7 +22,7 @@ public class ClientLoadBalanceConsistentHash implements ClientLoadBalance { } @Override - public String route(String currentGroupName, TreeSet clientAllAddressSet) { + public String route(String allocKey, TreeSet clientAllAddressSet) { Collection cidNodes = new ArrayList(); for (String clientAddress : clientAllAddressSet) { @@ -30,7 +30,7 @@ public class ClientLoadBalanceConsistentHash implements ClientLoadBalance { } final ConsistentHashRouter consistentHashRouter = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt); - ClientNode clientNode = consistentHashRouter.routeNode(currentGroupName); + ClientNode clientNode = consistentHashRouter.routeNode(allocKey); return clientNode.clientAddress; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java index 7d0722eeb..209a807bf 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceLRU.java @@ -23,8 +23,8 @@ public class ClientLoadBalanceLRU implements ClientLoadBalance { private ConcurrentHashMap> LRU_CACHE = new ConcurrentHashMap<>(); @Override - public String route(String currentGroupName, TreeSet clientAllAddressSet) { - LinkedHashMap lruItem = LRU_CACHE.get(currentGroupName); + public String route(String allocKey, TreeSet clientAllAddressSet) { + LinkedHashMap lruItem = LRU_CACHE.get(allocKey); if (Objects.isNull(lruItem)) { lruItem = new LinkedHashMap(16, 0.75f, true) { @Override @@ -32,13 +32,13 @@ public class ClientLoadBalanceLRU implements ClientLoadBalance { return super.size() > size; } }; + + LRU_CACHE.put(allocKey, lruItem); } // 添加新数据 for (String address: clientAllAddressSet) { - if (!lruItem.containsKey(address)) { - lruItem.put(address, address); - } + lruItem.computeIfAbsent(address, key -> address); } // 删除已经下线的节点 diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceManager.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceManager.java index ccd384065..18a1754a9 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceManager.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceManager.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.common.allocate.client; import com.aizuda.easy.retry.server.common.ClientLoadBalance; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import lombok.Getter; import org.springframework.stereotype.Component; @@ -8,7 +9,6 @@ import org.springframework.stereotype.Component; * @author: www.byteblogs.com * @date : 2022-03-12 10:20 */ -@Component public class ClientLoadBalanceManager { public static ClientLoadBalance getClientLoadBalance(int routeType) { @@ -19,7 +19,7 @@ public class ClientLoadBalanceManager { } } - return null; + throw new EasyRetryServerException("routeType is not existed. routeType:[{}]", routeType); } @Getter @@ -28,6 +28,7 @@ public class ClientLoadBalanceManager { CONSISTENT_HASH(1, new ClientLoadBalanceConsistentHash(100)), RANDOM(2, new ClientLoadBalanceRandom()), LRU(3, new ClientLoadBalanceLRU(100)), + ROUND(4, new ClientLoadBalanceRound()) ; private final int type; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRandom.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRandom.java index 892bbb8e7..e43a28b43 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRandom.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRandom.java @@ -1,8 +1,7 @@ package com.aizuda.easy.retry.server.common.allocate.client; import com.aizuda.easy.retry.server.common.ClientLoadBalance; -import com.aizuda.easy.retry.server.common.enums.AllocationAlgorithmEnum; -import org.springframework.stereotype.Component; +import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import java.util.Random; import java.util.TreeSet; @@ -11,13 +10,12 @@ import java.util.TreeSet; * @author: www.byteblogs.com * @date : 2022-03-11 22:00 */ -@Component public class ClientLoadBalanceRandom implements ClientLoadBalance { private Random random = new Random(); @Override - public String route(String currentGroupName, TreeSet clientAllAddressSet) { + public String route(String allocKey, TreeSet clientAllAddressSet) { String[] addressArr = clientAllAddressSet.toArray(new String[clientAllAddressSet.size()]); return addressArr[random.nextInt(clientAllAddressSet.size())]; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java new file mode 100644 index 000000000..169f22097 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/allocate/client/ClientLoadBalanceRound.java @@ -0,0 +1,37 @@ +package com.aizuda.easy.retry.server.common.allocate.client; + +import com.aizuda.easy.retry.server.common.ClientLoadBalance; + +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author: www.byteblogs.com + * @date : 2023-10-13 15:43 + * @since : 2.4.0 + */ +public class ClientLoadBalanceRound implements ClientLoadBalance { + + private static final ConcurrentHashMap COUNTER = new ConcurrentHashMap<>(); + private static final int THRESHOLD = Integer.MAX_VALUE - 10000; + + @Override + public String route(final String allocKey, final TreeSet clientAllAddressSet) { + String[] addressArr = clientAllAddressSet.toArray(new String[0]); + AtomicInteger next = COUNTER.getOrDefault(allocKey, new AtomicInteger(1)); + String nextClientId = addressArr[ next.get() % clientAllAddressSet.size()]; + int nextIndex = next.incrementAndGet(); + if (nextIndex > THRESHOLD) { + next = new AtomicInteger(1); + } + + COUNTER.put(allocKey, next); + return nextClientId; + } + + @Override + public int routeType() { + return 0; + } +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java index 5a8b9bba2..b1108e883 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RequestBuilder.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.common.client; import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.github.rholder.retry.RetryListener; @@ -18,14 +19,14 @@ public class RequestBuilder { private Class clintInterface; private String groupName; - private String hostId; - private String hostIp; - private Integer hostPort; - private String contextPath; + private RegisterNodeInfo nodeInfo; private boolean failRetry; private int retryTimes = 3; private int retryInterval = 1; private RetryListener retryListener = new SimpleRetryListener(); + private boolean failover; + private int routeKey; + private String allocKey; public static RequestBuilder newBuilder() { @@ -37,23 +38,8 @@ public class RequestBuilder { return this; } - public RequestBuilder hostPort(Integer hostPort) { - this.hostPort = hostPort; - return this; - } - - public RequestBuilder contextPath(String contextPath) { - this.contextPath = contextPath; - return this; - } - - public RequestBuilder hostId(String hostId) { - this.hostId = hostId; - return this; - } - - public RequestBuilder hostIp(String hostIp) { - this.hostIp = hostIp; + public RequestBuilder nodeInfo(RegisterNodeInfo nodeInfo) { + this.nodeInfo = nodeInfo; return this; } @@ -82,6 +68,21 @@ public class RequestBuilder { return this; } + public RequestBuilder allocKey(boolean failover) { + this.failover = failover; + return this; + } + + public RequestBuilder routeKey(int routeKey) { + this.routeKey = routeKey; + return this; + } + + + public RequestBuilder allocKey(String allocKey) { + this.allocKey = allocKey; + return this; + } public T build() { if (Objects.isNull(clintInterface)) { @@ -89,11 +90,12 @@ public class RequestBuilder { } Assert.notBlank(groupName, () -> new EasyRetryServerException("groupName cannot be null")); - Assert.notBlank(hostId, () -> new EasyRetryServerException("hostId cannot be null")); - Assert.notBlank(hostIp, () -> new EasyRetryServerException("hostIp cannot be null")); - Assert.notNull(hostPort, () -> new EasyRetryServerException("hostPort cannot be null")); - Assert.notBlank(contextPath, () -> new EasyRetryServerException("contextPath cannot be null")); + Assert.notNull(nodeInfo, () -> new EasyRetryServerException("nodeInfo cannot be null")); + if (failover) { + Assert.isTrue(routeKey > 0, () -> new EasyRetryServerException("routeKey cannot be null")); + Assert.notBlank(allocKey, () -> new EasyRetryServerException("allocKey cannot be null")); + } try { clintInterface = (Class) Class.forName(clintInterface.getName()); } catch (Exception e) { @@ -101,7 +103,7 @@ public class RequestBuilder { } RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler( - groupName, hostId, hostIp, hostPort, contextPath, failRetry, retryTimes, retryInterval, retryListener); + groupName, nodeInfo, failRetry, retryTimes, retryInterval, retryListener, routeKey, allocKey, failover); return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(), new Class[]{clintInterface}, clientInvokeHandler); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java index 85d4383ff..87316ed55 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/RpcClientInvokeHandler.java @@ -7,6 +7,7 @@ import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.HostUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.client.annotation.Body; import com.aizuda.easy.retry.server.common.client.annotation.Header; @@ -29,6 +30,7 @@ import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; +import org.springframework.http.client.OkHttp3ClientHttpRequestFactory; import org.springframework.util.CollectionUtils; import org.springframework.web.client.ResourceAccessException; import org.springframework.web.client.RestClientException; @@ -52,27 +54,47 @@ import java.util.concurrent.TimeUnit; * @since 2.0.0 */ @Slf4j -@AllArgsConstructor public class RpcClientInvokeHandler implements InvocationHandler { public static final String URL = "http://{0}:{1}/{2}"; - private String groupName; + private final String groupName; private String hostId; private String hostIp; private Integer hostPort; private String contextPath; - private boolean failRetry; - private int retryTimes; - private int retryInterval; - private RetryListener retryListener; + private final boolean failRetry; + private final int retryTimes; + private final int retryInterval; + private final RetryListener retryListener; + private final boolean failover; + private final Integer routeKey; + private final String allocKey; + + public RpcClientInvokeHandler(final String groupName, final RegisterNodeInfo registerNodeInfo, + final boolean failRetry, final int retryTimes, + final int retryInterval, final RetryListener retryListener, final Integer routeKey, final String allocKey, + final boolean failover) { + this.groupName = groupName; + this.hostId = registerNodeInfo.getHostId(); + this.hostPort = registerNodeInfo.getHostPort(); + this.hostIp = registerNodeInfo.getHostIp(); + this.contextPath = registerNodeInfo.getContextPath(); + this.failRetry = failRetry; + this.retryTimes = retryTimes; + this.retryInterval = retryInterval; + this.retryListener = retryListener; + this.failover = failover; + this.routeKey = routeKey; + this.allocKey = allocKey; + } @Override public Result invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { Mapping annotation = method.getAnnotation(Mapping.class); Assert.notNull(annotation, () -> new EasyRetryServerException("@Mapping cannot be null")); - if (annotation.failover()) { + if (failover) { return doFailoverHandler(method, args, annotation); } @@ -134,7 +156,7 @@ public class RpcClientInvokeHandler implements InvocationHandler { return result; } catch (RestClientException ex) { // 网络异常 - if (ex instanceof ResourceAccessException && mapping.failover()) { + if (ex instanceof ResourceAccessException && failover) { log.error("request client I/O error, count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId, hostIp, hostPort, HostUtils.getIp(), ex); @@ -143,7 +165,8 @@ public class RpcClientInvokeHandler implements InvocationHandler { // 重新选一个可用的客户端节点 ClientNodeAllocateHandler clientNodeAllocateHandler = SpringContext.CONTEXT.getBean( ClientNodeAllocateHandler.class); - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(groupName); + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(allocKey, groupName, + routeKey); // 这里表示无可用节点 if (Objects.isNull(serverNode)) { throw ex; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java index ed18eb19d..432aeb594 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/client/annotation/Mapping.java @@ -24,10 +24,4 @@ public @interface Mapping { String path() default ""; - /** - * 是否支持失败转移 - * @return false or trur - */ - boolean failover() default false; - } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RegisterNodeInfo.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RegisterNodeInfo.java index 8f7733c25..57f41a2e9 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RegisterNodeInfo.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RegisterNodeInfo.java @@ -34,6 +34,10 @@ public class RegisterNodeInfo implements Comparable { return MessageFormat.format(URL, hostIp, hostPort.toString(), contextPath); } + public String address() { + return MessageFormat.format("{0}:{1}", hostIp, hostPort.toString()); + } + @Override public int compareTo(RegisterNodeInfo info) { return hostId.compareTo(info.hostId); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/AllocationAlgorithmEnum.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/AllocationAlgorithmEnum.java deleted file mode 100644 index adf35c2b3..000000000 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/enums/AllocationAlgorithmEnum.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.aizuda.easy.retry.server.common.enums; - -import lombok.Getter; - -/** - * @author: www.byteblogs.com - * @date : 2022-03-11 21:55 - */ -@Getter -public enum AllocationAlgorithmEnum { - - CONSISTENT_HASH(1), - RANDOM(2), - LRU(3), - ; - - private final int type; - AllocationAlgorithmEnum(int type) { - this.type = type; - } -} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java index dbf211afd..440f954a5 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/ClientNodeAllocateHandler.java @@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.common.handler; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.ClientLoadBalance; import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager; +import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; @@ -15,6 +16,7 @@ import org.springframework.util.CollectionUtils; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * @author: www.byteblogs.com @@ -29,20 +31,25 @@ public class ClientNodeAllocateHandler { /** * 获取分配的节点 + * @param allocKey 分配的key + * @param groupName 组名称 + * @param routeKey {@link AllocationAlgorithmEnum} 路由类型 */ - public RegisterNodeInfo getServerNode(String groupName) { + public RegisterNodeInfo getServerNode(String allocKey, String groupName, Integer routeKey) { - GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName); Set serverNodes = CacheRegisterTable.getServerNodeSet(groupName); if (CollectionUtils.isEmpty(serverNodes)) { LogUtils.warn(log, "client node is null. groupName:[{}]", groupName); return null; } - ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey()); + ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(routeKey); - String hostIp = clientLoadBalanceRandom.route(groupName, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostIp).collect(Collectors.toSet()))); - return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get(); + String hostIp = clientLoadBalanceRandom.route(allocKey, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostIp).collect(Collectors.toSet()))); + + Stream registerNodeInfoStream = serverNodes.stream() + .filter(s -> s.getHostIp().equals(hostIp)); + return registerNodeInfoStream.findFirst().orElse(null); } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java index add782179..b0bcc12bb 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/GetHttpRequestHandler.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.handler; import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.easy.retry.server.common.HttpRequestHandler; import io.netty.handler.codec.http.HttpHeaders; /** diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java index 97bb0f7e7..4cbdce3cb 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/handler/PostHttpRequestHandler.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.handler; import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.easy.retry.server.common.HttpRequestHandler; import io.netty.handler.codec.http.HttpHeaders; /** diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 0e2e31418..a41f1ea5d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -49,6 +49,7 @@ public class JobExecutorActor extends AbstractActor { context.setGroupName(taskExecute.getGroupName()); context.setJobId(job.getId()); context.setTaskType(job.getTaskType()); + context.setJob(job); jobExecutor.execute(context); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java index b620113ea..e7b84b500 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java @@ -21,23 +21,17 @@ import java.util.List; */ public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean { - @Autowired - private JobMapper jobMapper; - @Override public void execute(JobExecutorContext context) { // 生成任务 JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(getTaskInstanceType().getType()); JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(context); - instanceGenerateContext.setTaskBatchId(context.getTaskBatchId()); List taskList = taskInstance.generate(instanceGenerateContext); if (CollectionUtils.isEmpty(taskList)) { return; } - Job job = jobMapper.selectById(context.getJobId()); - context.setJob(job); context.setTaskList(taskList); doExecute(context); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/JobExecutorContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/JobExecutorContext.java index 8d861deef..3001e200d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/JobExecutorContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/JobExecutorContext.java @@ -36,6 +36,16 @@ public class JobExecutorContext { private List taskList; - private Job job; + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private Integer argsType; + + private Integer routeKey; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java index 0d2110467..9cda79d8e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RealJobExecutorActor.java @@ -115,11 +115,8 @@ public class RealJobExecutorActor extends AbstractActor { private JobRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealJobExecutorDTO realJobExecutorDTO) { return RequestBuilder.newBuilder() - .hostPort(registerNodeInfo.getHostPort()) .groupName(registerNodeInfo.getGroupName()) - .hostId(registerNodeInfo.getHostId()) - .hostIp(registerNodeInfo.getHostIp()) - .contextPath(registerNodeInfo.getContextPath()) + .nodeInfo(registerNodeInfo) .failRetry(Boolean.TRUE) .retryTimes(realJobExecutorDTO.getMaxRetryTimes()) .retryInterval(realJobExecutorDTO.getRetryInterval()) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/BroadcastTaskGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/BroadcastTaskGenerator.java index 1cdc5dc0e..ef8adb2a9 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/BroadcastTaskGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/BroadcastTaskGenerator.java @@ -52,15 +52,12 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator { return Lists.newArrayList(); } - Job job = jobMapper.selectById(context.getJobId()); - List jobTasks = new ArrayList<>(serverNodes.size()); for (RegisterNodeInfo serverNode : serverNodes) { JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientId(serverNode.getHostId()); - jobTask.setArgsType(job.getArgsType()); - jobTask.setArgsStr(job.getArgsStr()); - jobTask.setExtAttrs(job.getExtAttrs()); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(context.getArgsStr()); jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java index 51d87efae..7cd60dbcf 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ClusterTaskGenerator.java @@ -45,20 +45,18 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator { @Override public List doGenerate(JobTaskGenerateContext context) { // 生成可执行任务 - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getGroupName()); + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(), + context.getGroupName(), context.getRouteKey()); if (Objects.isNull(serverNode)) { log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return Lists.newArrayList(); } - Job job = jobMapper.selectById(context.getJobId()); - // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientId(serverNode.getHostId()); - jobTask.setArgsType(job.getArgsType()); - jobTask.setArgsStr(job.getArgsStr()); - jobTask.setExtAttrs(job.getExtAttrs()); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(context.getArgsStr()); jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/JobTaskGenerateContext.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/JobTaskGenerateContext.java index b778eb037..720463093 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/JobTaskGenerateContext.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/JobTaskGenerateContext.java @@ -12,4 +12,14 @@ public class JobTaskGenerateContext { private Long taskBatchId; private String groupName; private Long jobId; + private Integer routeKey; + /** + * 执行方法参数 + */ + private String argsStr; + + /** + * 参数类型 text/json + */ + private Integer argsType; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java index 64768e6b7..23d7b4b04 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/generator/task/ShardingTaskGenerator.java @@ -55,8 +55,7 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { return Lists.newArrayList(); } - Job job = jobMapper.selectById(context.getJobId()); - String argsStr = job.getArgsStr(); + String argsStr = context.getArgsStr(); Map split = Splitter.on(";").omitEmptyStrings().withKeyValueSeparator('=').split(argsStr); List nodeInfoList = new ArrayList<>(serverNodes); @@ -66,9 +65,8 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator { // 新增任务实例 JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); jobTask.setClientId(registerNodeInfo.getHostId()); - jobTask.setArgsType(job.getArgsType()); - jobTask.setArgsStr(job.getArgsStr()); - jobTask.setExtAttrs(job.getExtAttrs()); + jobTask.setArgsType(context.getArgsType()); + jobTask.setArgsStr(value); jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java index 74d7f7df8..8fde7564f 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/client/RetryRpcClient.java @@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.client; import com.aizuda.easy.retry.client.model.DispatchRetryDTO; import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO; +import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.easy.retry.client.model.RetryCallbackDTO; import com.aizuda.easy.retry.client.model.StopJobDTO; import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; @@ -27,4 +28,7 @@ public interface RetryRpcClient { @Mapping(path = "/retry/callback/v1", method = RequestMethod.POST, failover = true) Result callback(@Body RetryCallbackDTO retryCallbackDTO); + @Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST) + Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO); + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java index 74020275a..935a20dfd 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/AbstractTaskExecutor.java @@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.retry.task.support.RetryContext; import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -39,8 +40,10 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing // 重试次数累加 retryCountIncrement(retryTask); - RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask); - RetryExecutor executor = builderResultRetryExecutor(retryContext); + SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName()); + + RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask, sceneConfig); + RetryExecutor executor = builderResultRetryExecutor(retryContext, sceneConfig); if (!preCheck(retryContext, executor)) { return; @@ -75,9 +78,11 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing actorRef.tell(retryExecutor, actorRef); } - protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask); + protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask, + final SceneConfig sceneConfig); - protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext); + protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext, + final SceneConfig sceneConfig); protected abstract ActorRef getActorRef(); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java index 665f3f5dc..e7c0c80d9 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/CallbackTaskExecutor.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import org.springframework.stereotype.Component; /** @@ -25,17 +26,20 @@ import org.springframework.stereotype.Component; public class CallbackTaskExecutor extends AbstractTaskExecutor { @Override - protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { + protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask, + final SceneConfig sceneConfig) { CallbackRetryContext retryContext = new CallbackRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + retryContext.setServerNode( + clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), + sceneConfig.getRouteKey())); return retryContext; } @Override - protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) { + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) { return RetryBuilder.newBuilder() .withStopStrategy(StopStrategies.stopException()) .withStopStrategy(StopStrategies.stopResultStatus()) diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java index cc2a733c1..1d922c8b8 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualCallbackTaskExecutor.java @@ -15,6 +15,7 @@ import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies; import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import org.springframework.stereotype.Component; /** @@ -28,29 +29,32 @@ import org.springframework.stereotype.Component; public class ManualCallbackTaskExecutor extends AbstractTaskExecutor { @Override - protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) { + protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask, + final SceneConfig sceneConfig) { CallbackRetryContext retryContext = new CallbackRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + retryContext.setServerNode( + clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), + sceneConfig.getRouteKey())); return retryContext; } @Override - protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) { + protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) { return RetryBuilder.newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatus()) - .withWaitStrategy(getWaitWaitStrategy()) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatus()) + .withWaitStrategy(getWaitWaitStrategy()) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); } @Override diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java index 8ab906e60..784ed9caf 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/ManualRetryTaskExecutor.java @@ -31,16 +31,19 @@ public class ManualRetryTaskExecutor extends AbstractTaskExecutor { @Override protected RetryContext> builderRetryContext(final String groupName, - final RetryTask retryTask) { + final RetryTask retryTask, + final SceneConfig sceneConfig) { MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); - return retryContext; + retryContext.setServerNode( + clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), + sceneConfig.getRouteKey())); return retryContext; } @Override - protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext) { + protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext, + final SceneConfig sceneConfig) { RetryTask retryTask = retryContext.getRetryTask(); return RetryBuilder.newBuilder() diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java index 9d362af71..bef00d128 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/task/RetryTaskExecutor.java @@ -28,30 +28,34 @@ public class RetryTaskExecutor extends AbstractTaskExecutor { @Override protected RetryContext> builderRetryContext(final String groupName, - final RetryTask retryTask) { + final RetryTask retryTask, + final SceneConfig sceneConfig) { MaxAttemptsPersistenceRetryContext> retryContext = new MaxAttemptsPersistenceRetryContext<>(); retryContext.setRetryTask(retryTask); retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName)); - retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName())); + retryContext.setServerNode( + clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(), + sceneConfig.getRouteKey())); return retryContext; } @Override - protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext) { + protected RetryExecutor> builderResultRetryExecutor(RetryContext retryContext, + final SceneConfig sceneConfig) { RetryTask retryTask = retryContext.getRetryTask(); return RetryBuilder.>newBuilder() - .withStopStrategy(StopStrategies.stopException()) - .withStopStrategy(StopStrategies.stopResultStatusCode()) - .withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName())) - .withFilterStrategy(FilterStrategies.triggerAtFilter()) - .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) - .withFilterStrategy(FilterStrategies.sceneBlackFilter()) - .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) - .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) - .withFilterStrategy(FilterStrategies.rateLimiterFilter()) - .withRetryContext(retryContext) - .build(); + .withStopStrategy(StopStrategies.stopException()) + .withStopStrategy(StopStrategies.stopResultStatusCode()) + .withWaitStrategy(getWaitWaitStrategy(sceneConfig)) + .withFilterStrategy(FilterStrategies.triggerAtFilter()) + .withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy)) + .withFilterStrategy(FilterStrategies.sceneBlackFilter()) + .withFilterStrategy(FilterStrategies.checkAliveClientPodFilter()) + .withFilterStrategy(FilterStrategies.rebalanceFilterStrategies()) + .withFilterStrategy(FilterStrategies.rateLimiterFilter()) + .withRetryContext(retryContext) + .build(); } @Override @@ -59,11 +63,8 @@ public class RetryTaskExecutor extends AbstractTaskExecutor { return TaskExecutorSceneEnum.AUTO_RETRY; } - private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) { - - SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName); + private WaitStrategy getWaitWaitStrategy(SceneConfig sceneConfig) { Integer backOff = sceneConfig.getBackOff(); - return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff); } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java index 8feeea978..c463365e0 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java @@ -14,7 +14,7 @@ import com.aizuda.easy.retry.server.common.Register; import com.aizuda.easy.retry.server.retry.task.support.handler.ConfigVersionSyncHandler; import com.aizuda.easy.retry.server.common.register.ClientRegister; import com.aizuda.easy.retry.server.common.register.RegisterContext; -import com.aizuda.easy.retry.server.common.handler.HttpRequestHandler; +import com.aizuda.easy.retry.server.common.HttpRequestHandler; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java index ea4ff1f17..15ef74de5 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java @@ -6,11 +6,14 @@ import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; +import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient; import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext; import com.aizuda.easy.retry.server.retry.task.generator.task.TaskGenerator; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; @@ -37,6 +40,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLog import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; +import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig; import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; @@ -55,6 +59,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -202,21 +207,34 @@ public class RetryTaskServiceImpl implements RetryTaskService { @Override public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) { - RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(generateRetryIdempotentIdVO.getGroupName()); - Assert.notNull(serverNode, () -> new EasyRetryServerException("生成idempotentId失败: 不存在活跃的客户端节点")); + + Set serverNodes = CacheRegisterTable.getServerNodeSet(generateRetryIdempotentIdVO.getGroupName()); + Assert.notEmpty(serverNodes, () -> new EasyRetryServerException("生成idempotentId失败: 不存在活跃的客户端节点")); + + SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess() + .getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(), + generateRetryIdempotentIdVO.getSceneName()); + + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(sceneConfig.getSceneName(), + sceneConfig.getGroupName(), sceneConfig.getRouteKey()); // 委托客户端生成idempotentId - String url = MessageFormat - .format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath()); - GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO(); generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName()); generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName()); generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr()); generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName()); - HttpEntity requestEntity = new HttpEntity<>(generateRetryIdempotentIdDTO); - Result result = restTemplate.postForObject(url, requestEntity, Result.class); + RetryRpcClient rpcClient = RequestBuilder.newBuilder() + .hostPort(serverNode.getHostPort()) + .groupName(serverNode.getGroupName()) + .hostId(serverNode.getHostId()) + .hostIp(serverNode.getHostIp()) + .contextPath(serverNode.getContextPath()) + .client(RetryRpcClient.class) + .build(); + + Result result = rpcClient.generateIdempotentId(generateRetryIdempotentIdDTO); Assert.notNull(result, () -> new EasyRetryServerException("idempotentId生成失败")); Assert.isTrue(1 == result.getStatus(), () -> new EasyRetryServerException("idempotentId生成失败:请确保参数与执行器名称正确"));