From 5367009a780eb63de2f6fdda8d5e6eb73bc3125c Mon Sep 17 00:00:00 2001 From: srzou Date: Wed, 11 Dec 2024 21:37:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.3.0-beta1):=20=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=B3=A8=E5=86=8C=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=9B=E4=BD=BF=E7=94=A8=E4=B8=BB=E8=8A=82=E7=82=B9=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E5=AF=B9=E5=AE=A2=E6=88=B7=E7=AB=AF=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E7=BB=AD=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/core/constant/SystemConstants.java | 2 + .../server/common/cache/CacheToken.java | 11 +- .../common/config/SystemProperties.java | 4 + .../common/register/ClientRegister.java | 60 ++-- .../server/common/rpc/client/GrpcChannel.java | 26 +- .../common/rpc/client/NettyChannel.java | 25 ++ .../rpc/server/GrpcRequestHandlerActor.java | 19 -- .../rpc/server/RequestHandlerActor.java | 18 -- .../handler/BeatHttpRequestHandler.java | 17 ++ .../job/task/server/ServerRpcClient.java | 20 ++ .../GetRegNodesPostHttpRequestHandler.java | 74 +++++ .../starter/schedule/RefreshNodeSchedule.java | 256 ++++++++++++++++++ 12 files changed, 468 insertions(+), 64 deletions(-) create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java create mode 100644 snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/GetRegNodesPostHttpRequestHandler.java create mode 100644 snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index e055c15d..f118f27c 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -120,6 +120,8 @@ public interface SystemConstants { */ String RETRY_CALLBACK = "/retry/callback/v1"; + String GET_REG_NODES_AND_REFRESH = "/server/regAndRefresh/v1"; + /** * 获取重试幂等id */ diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheToken.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheToken.java index be0560c4..25516fba 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheToken.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/cache/CacheToken.java @@ -4,6 +4,8 @@ import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.config.SystemProperties; +import com.aizuda.snailjob.server.common.register.ServerRegister; import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig; @@ -29,7 +31,9 @@ public class CacheToken implements Lifecycle { } public static String get(String groupName, String namespaceId) { - + if (groupName.equals(ServerRegister.GROUP_NAME)){ + return getServerToken(); + } String token = CACHE.getIfPresent(Pair.of(groupName, namespaceId)); if (StrUtil.isBlank(token)) { // 从DB获取数据 @@ -46,6 +50,11 @@ public class CacheToken implements Lifecycle { return token; } + private static String getServerToken() { + SystemProperties properties = SnailSpringContext.getBean(SystemProperties.class); + return properties.getServerToken(); + } + @Override public void start() { SnailJobLog.LOCAL.info("CacheToken start"); diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java index 9f83329a..6f015a2e 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/config/SystemProperties.java @@ -44,6 +44,10 @@ public class SystemProperties { */ private int nettyPort = 1788; + /** + * server token + */ + private String serverToken = "SJ_H9HGGmrX3QBVTfsAAG2mcKH3SR7bCLsK"; /** * 一个客户端每秒最多接收的重试数量指令 */ diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java index 13d85cdb..8a020d02 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java @@ -83,30 +83,44 @@ public class ClientRegister extends AbstractRegister implements Runnable { } } + public static List getExpireNodes(){ + try { + ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); + if (Objects.nonNull(serverNode)) { + List lists = Lists.newArrayList(serverNode); + QUEUE.drainTo(lists, 256); + return lists; + } + } catch (InterruptedException e) { + SnailJobLog.LOCAL.error("client get expireNodes error."); + } + return null; + } + @Override public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); - if (Objects.nonNull(serverNode)) { - List lists = Lists.newArrayList(serverNode); - QUEUE.drainTo(lists, 256); - - // 注册或续租 - refreshExpireAt(lists); - } - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - SnailJobLog.LOCAL.error("client refresh expireAt error."); - } finally { - // 防止刷的过快 - try { - TimeUnit.MILLISECONDS.sleep(2000); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } - } +// while (!Thread.currentThread().isInterrupted()) { +// try { +// ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); +// if (Objects.nonNull(serverNode)) { +// List lists = Lists.newArrayList(serverNode); +// QUEUE.drainTo(lists, 256); +// +// // 注册或续租 +// refreshExpireAt(lists); +// } +// } catch (InterruptedException ignored) { +// Thread.currentThread().interrupt(); +// } catch (Exception e) { +// SnailJobLog.LOCAL.error("client refresh expireAt error."); +// } finally { +// // 防止刷的过快 +// try { +// TimeUnit.MILLISECONDS.sleep(2000); +// } catch (InterruptedException ignored) { +// Thread.currentThread().interrupt(); +// } +// } +// } } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcChannel.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcChannel.java index 0ef2ae7e..c8523480 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcChannel.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/GrpcChannel.java @@ -1,18 +1,21 @@ package com.aizuda.snailjob.server.common.rpc.client; +import cn.hutool.core.util.IdUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.core.enums.HeadersEnum; import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult; import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest; import com.aizuda.snailjob.common.core.grpc.auto.Metadata; +import com.aizuda.snailjob.common.core.util.NetUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.config.SystemProperties.RpcClientProperties; import com.aizuda.snailjob.server.common.config.SystemProperties.ThreadPoolConfig; +import com.aizuda.snailjob.server.common.register.ServerRegister; import com.aizuda.snailjob.server.common.triple.Pair; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.Any; -import com.google.protobuf.UnsafeByteOperations; import io.grpc.DecompressorRegistry; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -29,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + /** * @author opensnail * @date 2023-05-13 @@ -38,7 +42,7 @@ import java.util.concurrent.TimeUnit; public class GrpcChannel { private GrpcChannel() { } - + private static final String HOST_ID = IdUtil.getSnowflake().nextIdStr(); private static final ThreadPoolExecutor grpcExecutor = createGrpcExecutor(); private static ConcurrentHashMap, ManagedChannel> CHANNEL_MAP = new ConcurrentHashMap<>(16); @@ -75,6 +79,12 @@ public class GrpcChannel { return null; } } + headersMap.put(HeadersEnum.HOST_ID.getKey(), HOST_ID); + headersMap.put(HeadersEnum.HOST_IP.getKey(), NetUtil.getLocalIpStr()); + headersMap.put(HeadersEnum.GROUP_NAME.getKey(), ServerRegister.GROUP_NAME); + headersMap.put(HeadersEnum.HOST_PORT.getKey(), getServerPort()); + headersMap.put(HeadersEnum.NAMESPACE.getKey(), SystemConstants.DEFAULT_NAMESPACE); + headersMap.put(HeadersEnum.TOKEN.getKey(), getServerToken()); Metadata metadata = Metadata .newBuilder() @@ -103,6 +113,16 @@ public class GrpcChannel { } + private static String getServerToken() { + SystemProperties properties = SnailSpringContext.getBean(SystemProperties.class); + return properties.getServerToken(); + } + + private static String getServerPort() { + SystemProperties properties = SnailSpringContext.getBean(SystemProperties.class); + return String.valueOf(properties.getNettyPort()); + } + /** * 连接客户端 * diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java index 63618704..eaecfc12 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java @@ -1,6 +1,13 @@ package com.aizuda.snailjob.server.common.rpc.client; +import cn.hutool.core.util.IdUtil; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.core.enums.HeadersEnum; +import com.aizuda.snailjob.common.core.util.NetUtil; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.config.SystemProperties; +import com.aizuda.snailjob.server.common.register.ServerRegister; import com.aizuda.snailjob.server.common.triple.Pair; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; @@ -13,6 +20,7 @@ import java.net.ConnectException; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -24,6 +32,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public class NettyChannel { private static Bootstrap bootstrap; + private static final String HOST_ID = IdUtil.getSnowflake().nextIdStr(); private static ConcurrentHashMap, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(16); private NettyChannel() { } @@ -76,6 +85,12 @@ public class NettyChannel { .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) // 设置传递请求内容的长度 .set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()) + .set(HeadersEnum.HOST_ID.getKey(), HOST_ID) + .set(HeadersEnum.HOST_IP.getKey(), NetUtil.getLocalIpStr()) + .set(HeadersEnum.GROUP_NAME.getKey(), ServerRegister.GROUP_NAME) + .set(HeadersEnum.HOST_PORT.getKey(), getServerPort()) + .set(HeadersEnum.NAMESPACE.getKey(), SystemConstants.DEFAULT_NAMESPACE) + .set(HeadersEnum.TOKEN.getKey(), getServerToken()) ; request.headers().setAll(requestHeaders); @@ -83,6 +98,16 @@ public class NettyChannel { channel.writeAndFlush(request).sync(); } + private static String getServerToken() { + SystemProperties properties = SnailSpringContext.getBean(SystemProperties.class); + return properties.getServerToken(); + } + + private static String getServerPort() { + SystemProperties properties = SnailSpringContext.getBean(SystemProperties.class); + return String.valueOf(properties.getNettyPort()); + } + /** * 连接客户端 * diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/GrpcRequestHandlerActor.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/GrpcRequestHandlerActor.java index 698f7be2..6de39ae9 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/GrpcRequestHandlerActor.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/GrpcRequestHandlerActor.java @@ -105,12 +105,6 @@ public class GrpcRequestHandlerActor extends AbstractActor { } private SnailJobRpcResult doProcess(String uri, String content, Map headersMap) { - - Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class); - - String hostId = headersMap.get(HeadersEnum.HOST_ID.getKey()); - String hostIp = headersMap.get(HeadersEnum.HOST_IP.getKey()); - Integer hostPort = Integer.valueOf(headersMap.get(HeadersEnum.HOST_PORT.getKey())); String groupName = headersMap.get(HeadersEnum.GROUP_NAME.getKey()); String namespace = headersMap.get(HeadersEnum.NAMESPACE.getKey()); String token = headersMap.get(HeadersEnum.TOKEN.getKey()); @@ -120,19 +114,6 @@ public class GrpcRequestHandlerActor extends AbstractActor { namespace, groupName, token); } - // 注册版本 此后后续版本将迁移至BeatHttpRequestHandler 只处理beat的心态注册 - RegisterContext registerContext = new RegisterContext(); - registerContext.setGroupName(groupName); - registerContext.setHostPort(hostPort); - registerContext.setHostIp(hostIp); - registerContext.setHostId(hostId); - registerContext.setUri(uri); - registerContext.setNamespaceId(namespace); - boolean result = register.register(registerContext); - if (!result) { - SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName); - } - DefaultHttpHeaders headers = new DefaultHttpHeaders(); headersMap.forEach(headers::add); diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java index 0a12b3ce..4c35a3f8 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/RequestHandlerActor.java @@ -76,12 +76,6 @@ public class RequestHandlerActor extends AbstractActor { private SnailJobRpcResult doProcess(String uri, String content, HttpMethod method, HttpHeaders headers) { - - Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class); - - String hostId = headers.get(HeadersEnum.HOST_ID.getKey()); - String hostIp = headers.get(HeadersEnum.HOST_IP.getKey()); - Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey()); String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey()); String namespace = headers.get(HeadersEnum.NAMESPACE.getKey()); String token = headers.get(HeadersEnum.TOKEN.getKey()); @@ -90,18 +84,6 @@ public class RequestHandlerActor extends AbstractActor { throw new SnailJobServerException("Token authentication failed. [namespace:{} groupName:{} token:{}]", namespace, groupName, token); } - // 注册版本 此后后续版本将迁移至BeatHttpRequestHandler 只处理beat的心态注册 - RegisterContext registerContext = new RegisterContext(); - registerContext.setGroupName(groupName); - registerContext.setHostPort(hostPort); - registerContext.setHostIp(hostIp); - registerContext.setHostId(hostId); - registerContext.setUri(uri); - registerContext.setNamespaceId(namespace); - boolean result = register.register(registerContext); - if (!result) { - SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName); - } UrlBuilder builder = UrlBuilder.ofHttp(uri); Collection httpRequestHandlers = SnailSpringContext.getContext() diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/BeatHttpRequestHandler.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/BeatHttpRequestHandler.java index 69f6aed9..c5ae30d1 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/BeatHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/server/handler/BeatHttpRequestHandler.java @@ -2,11 +2,16 @@ package com.aizuda.snailjob.server.common.rpc.server.handler; import cn.hutool.core.net.url.UrlQuery; import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; +import com.aizuda.snailjob.common.core.enums.HeadersEnum; import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.Register; import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler; +import com.aizuda.snailjob.server.common.register.ClientRegister; +import com.aizuda.snailjob.server.common.register.RegisterContext; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import org.springframework.stereotype.Component; @@ -36,6 +41,18 @@ public class BeatHttpRequestHandler extends GetHttpRequestHandler { @Override public SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers) { SnailJobLog.LOCAL.debug("Beat check content:[{}]", content); + Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class); + RegisterContext registerContext = new RegisterContext(); + registerContext.setGroupName(headers.get(HeadersEnum.GROUP_NAME.getKey())); + registerContext.setHostPort(Integer.valueOf(headers.get(HeadersEnum.HOST_PORT.getKey()))); + registerContext.setHostIp(headers.get(HeadersEnum.HOST_IP.getKey())); + registerContext.setHostId(headers.get(HeadersEnum.HOST_ID.getKey())); + registerContext.setUri(HTTP_PATH.BEAT); + registerContext.setNamespaceId(headers.get(HeadersEnum.NAMESPACE.getKey())); + boolean result = register.register(registerContext); + if (!result) { + SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", headers.get(HeadersEnum.GROUP_NAME.getKey())); + } SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); return new SnailJobRpcResult(PONG, retryRequest.getReqId()); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java new file mode 100644 index 00000000..914c7b1e --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java @@ -0,0 +1,20 @@ +package com.aizuda.snailjob.server.job.task.server; + +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.server.common.rpc.client.RequestMethod; +import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; + + +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH; + +/** + * @Author:srzou + * @Package:com.aizuda.snailjob.server.job.task.server + * @Project:snail-job + * @Date:2024/12/11 9:36 + * @Filename:ServerRpcClient + */ +public interface ServerRpcClient { + @Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.GET) + Result getRegNodesAndFlush(); +} diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/GetRegNodesPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/GetRegNodesPostHttpRequestHandler.java new file mode 100644 index 00000000..1ec4e5fe --- /dev/null +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/GetRegNodesPostHttpRequestHandler.java @@ -0,0 +1,74 @@ +package com.aizuda.snailjob.server.job.task.support.request; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.net.url.UrlQuery; +import com.aizuda.snailjob.common.core.model.SnailJobRequest; +import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup; +import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler; +import com.aizuda.snailjob.server.common.register.ClientRegister; +import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Objects; + +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH; +import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME; + +/** + * 获取服务端缓存的客户端节点 并刷新本地时间 + * + */ +@Component +public class GetRegNodesPostHttpRequestHandler extends GetHttpRequestHandler { + + @Override + public boolean supports(String path) { + return GET_REG_NODES_AND_REFRESH.equals(path); + } + + @Override + public HttpMethod method() { + return HttpMethod.GET; + } + + @Override + public SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers) { + SnailJobLog.LOCAL.debug("Client Callback Request. content:[{}]", content); + + SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class); + + + List refreshCache = getAndRefreshCache(); + String json = null; + if (CollUtil.isNotEmpty(refreshCache)){ + json = JsonUtil.toJsonString(refreshCache); + } + return new SnailJobRpcResult(json, retryRequest.getReqId()); + } + + public static List getAndRefreshCache() { + // 获取当前所有需要续签的node + List expireNodes = ClientRegister.getExpireNodes(); + if (Objects.nonNull(expireNodes)) { + // 进行本地续签 + for (final ServerNode serverNode : expireNodes) { + serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME)); + // 刷新全量本地缓存 + CacheRegisterTable.addOrUpdate(serverNode); + // 刷新过期时间 + CacheConsumerGroup.addOrUpdate(serverNode.getGroupName(), serverNode.getNamespaceId()); + } + } + return expireNodes; + } + + +} diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java new file mode 100644 index 00000000..40258496 --- /dev/null +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java @@ -0,0 +1,256 @@ +package com.aizuda.snailjob.server.starter.schedule; + +import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.NetUtil; +import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.config.SystemProperties; +import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; +import com.aizuda.snailjob.server.common.triple.Pair; +import com.aizuda.snailjob.server.job.task.server.ServerRpcClient; +import com.aizuda.snailjob.server.job.task.support.request.GetRegNodesPostHttpRequestHandler; +import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME; + + +@Component +@Slf4j +@RequiredArgsConstructor +public class RefreshNodeSchedule extends AbstractSchedule implements Lifecycle { + private final ServerNodeMapper serverNodeMapper; + + private final SystemProperties systemProperties; + + ExecutorService executorService = Executors.newFixedThreadPool(5); + + @Override + protected void doExecute() { + int nettyPort = systemProperties.getNettyPort(); + String localIpStr = NetUtil.getLocalIpStr(); + try { + // 获取在线的客户端节点并且排除当前节点 + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()) + .not(w -> w.eq(ServerNode::getHostIp, localIpStr) + .eq(ServerNode::getHostPort, nettyPort)); + List serverNodes = serverNodeMapper.selectList(wrapper); + List clientNodes = new ArrayList<>(); + if (serverNodes.size() > 0) { + // 并行获取所有服务端需要注册的列表 + // 获取列表 并完成注册/本地完成续签 + List allClientList = getAllClientList(serverNodes); + if (CollUtil.isNotEmpty(allClientList)) { + clientNodes.addAll(allClientList); + } + List refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); + if (CollUtil.isNotEmpty(refreshCache)) { + // 完成本节点的刷新 + clientNodes.addAll(refreshCache); + } + } else { + List refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); + if (CollUtil.isNotEmpty(refreshCache)) { + // 完成本节点的刷新 + clientNodes.addAll(refreshCache); + } + } + if (CollUtil.isEmpty(clientNodes)){ + SnailJobLog.LOCAL.warn("clientNodes is empty"); + return; + } + SnailJobLog.LOCAL.info("start refresh client nodes:{}", clientNodes); + refreshExpireAt(clientNodes); + + } catch (Exception e) { + SnailJobLog.LOCAL.error("refresh 失败", e); + } + } + + private List getAllClientList(List serverNodes) { + int size = serverNodes.size(); + // 创建 CountDownLatch + CountDownLatch latch = new CountDownLatch(size); + + // 存储处理结果 + List> futures = new ArrayList<>(size); + + try { + for (ServerNode serverNode : serverNodes) { + Future future = executorService.submit(() -> { + try { + RegisterNodeInfo nodeInfo = new RegisterNodeInfo(); + nodeInfo.setHostId(serverNode.getHostId()); + nodeInfo.setGroupName(serverNode.getGroupName()); + nodeInfo.setNamespaceId(serverNode.getNamespaceId()); + nodeInfo.setHostPort(serverNode.getHostPort()); + nodeInfo.setHostIp(serverNode.getHostIp()); + ServerRpcClient serverRpcClient = buildRpcClient(nodeInfo); + Result regNodesAndFlush = serverRpcClient.getRegNodesAndFlush(); + + // 模拟耗时处理 + return regNodesAndFlush.getData(); + } finally { + // 处理完成后计数减一 + latch.countDown(); + } + }); + futures.add(future); + } + // 提交任务 + + // 等待所有任务完成 + latch.await(5, TimeUnit.SECONDS); // 设置超时时间为5秒 + + return futures.stream() + .map(future -> { + try { + String jsonString = future.get(1, TimeUnit.SECONDS); + if (Objects.nonNull(jsonString)) { + return JsonUtil.parseObject(jsonString, new TypeReference>() { + }); + } + return new ArrayList(); + } catch (Exception e) { + return new ArrayList(); + } + }) + .filter(Objects::nonNull) + .flatMap(List::stream) + .distinct() + .toList(); + // 收集处理结果 + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public String lockName() { + return "registerNode"; + } + + @Override + public String lockAtMost() { + return "PT10S"; + } + + @Override + public String lockAtLeast() { + return "PT5S"; + } + + @Override + public void start() { + taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); + } + + @Override + public void close() { + + } + + private ServerRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { +// String regInfo = registerNodeInfo.getHostId() + "/" + registerNodeInfo.getHostIp() + "/" + registerNodeInfo.getHostPort(); +// log.info(regInfo + "--------------------------"); + int maxRetryTimes = 3; + boolean retry = false; + return RequestBuilder.newBuilder() + .nodeInfo(registerNodeInfo) + .failRetry(maxRetryTimes > 0 && !retry) + .retryTimes(maxRetryTimes) + .client(ServerRpcClient.class) + .build(); + } + + private void refreshExpireAt(List serverNodes) { + if (CollUtil.isEmpty(serverNodes)) { + return; + } + + Set hostIds = Sets.newHashSet(); + Set hostIps = Sets.newHashSet(); + for (final ServerNode serverNode : serverNodes) { + serverNode.setExpireAt(getExpireAt()); + hostIds.add(serverNode.getHostId()); + hostIps.add(serverNode.getHostIp()); + } + + List dbServerNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper() + .select(ServerNode::getHostIp, ServerNode::getHostId) + .in(ServerNode::getHostId, hostIds) + .in(ServerNode::getHostIp, hostIps) + ); + + List insertDBs = Lists.newArrayList(); + List updateDBs = Lists.newArrayList(); + Set> pairs = dbServerNodes.stream() + .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect( + Collectors.toSet()); + + // 去重处理 + Set> existed = Sets.newHashSet(); + for (final ServerNode serverNode : serverNodes) { + Pair pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp()); + if (existed.contains(pair)) { + continue; + } + + if (pairs.contains(pair)) { + updateDBs.add(serverNode); + } else { + insertDBs.add(serverNode); + } + + existed.add(pair); + } + + try { + // 批量更新 + if (CollUtil.isNotEmpty(updateDBs)) { + serverNodeMapper.updateBatchExpireAt(updateDBs); + } + } catch (Exception e) { + SnailJobLog.LOCAL.error("续租失败", e); + } + + try { + if (CollUtil.isNotEmpty(insertDBs)) { + serverNodeMapper.insertBatch(insertDBs); + } + } catch (DuplicateKeyException ignored) { + } catch (Exception e) { + SnailJobLog.LOCAL.error("注册节点失败", e); + } + } + + private LocalDateTime getExpireAt() { + return LocalDateTime.now().plusSeconds(DELAY_TIME); + } +}