feat(sj_1.3.0-beta1): 调整客户端注册逻辑;使用主节点模式对客户端进行续签

This commit is contained in:
srzou 2024-12-11 21:37:34 +08:00
parent 11c57a9686
commit 5367009a78
12 changed files with 468 additions and 64 deletions

View File

@ -120,6 +120,8 @@ public interface SystemConstants {
*/
String RETRY_CALLBACK = "/retry/callback/v1";
String GET_REG_NODES_AND_REFRESH = "/server/regAndRefresh/v1";
/**
* 获取重试幂等id
*/

View File

@ -4,6 +4,8 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.register.ServerRegister;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
@ -29,7 +31,9 @@ public class CacheToken implements Lifecycle {
}
public static String get(String groupName, String namespaceId) {
if (groupName.equals(ServerRegister.GROUP_NAME)){
return getServerToken();
}
String token = CACHE.getIfPresent(Pair.of(groupName, namespaceId));
if (StrUtil.isBlank(token)) {
// 从DB获取数据
@ -46,6 +50,11 @@ public class CacheToken implements Lifecycle {
return token;
}
/**
 * Token used to authenticate server-originated requests, read from
 * {@link SystemProperties} (configured {@code serverToken}).
 */
private static String getServerToken() {
    return SnailSpringContext.getBean(SystemProperties.class).getServerToken();
}
@Override
public void start() {
SnailJobLog.LOCAL.info("CacheToken start");

View File

@ -44,6 +44,10 @@ public class SystemProperties {
*/
private int nettyPort = 1788;
/**
 * Server token used to authenticate server-to-server requests.
 * NOTE: the default value below ships with the source and is publicly known —
 * override it in production configuration.
 */
private String serverToken = "SJ_H9HGGmrX3QBVTfsAAG2mcKH3SR7bCLsK";
/**
* 一个客户端每秒最多接收的重试数量指令
*/

View File

@ -83,30 +83,44 @@ public class ClientRegister extends AbstractRegister implements Runnable {
}
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
/**
 * Drains the registration queue of client nodes whose lease is due for renewal.
 * Blocks up to 5 seconds for the first node, then drains up to 256 more in one go.
 *
 * @return the pending nodes, or {@code null} when none arrived within the wait window
 */
public static List<ServerNode> getExpireNodes(){
    try {
        ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS);
        if (Objects.nonNull(serverNode)) {
            List<ServerNode> lists = Lists.newArrayList(serverNode);
            QUEUE.drainTo(lists, 256);
            return lists;
        }
    } catch (InterruptedException e) {
        // Fix: restore the interrupt flag (it was silently swallowed before)
        // and keep the cause in the log.
        Thread.currentThread().interrupt();
        SnailJobLog.LOCAL.error("client get expireNodes error.", e);
    }
    return null;
}
// 注册或续租
refreshExpireAt(lists);
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} catch (Exception e) {
SnailJobLog.LOCAL.error("client refresh expireAt error.");
} finally {
// 防止刷的过快
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
/**
 * Legacy renewal loop, intentionally a no-op.
 *
 * Client lease renewal is no longer driven by this runnable: expired nodes
 * are queued and drained via {@code getExpireNodes()} by the master-node
 * refresh schedule. The previous polling loop was left here commented out;
 * dead code is removed in favor of this documented empty body.
 */
@Override
public void run() {
    // Intentionally empty — renewal moved to the server-side refresh schedule.
}
}

View File

@ -1,18 +1,21 @@
package com.aizuda.snailjob.server.common.rpc.client;
import cn.hutool.core.util.IdUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.config.SystemProperties.RpcClientProperties;
import com.aizuda.snailjob.server.common.config.SystemProperties.ThreadPoolConfig;
import com.aizuda.snailjob.server.common.register.ServerRegister;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@ -29,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author opensnail
* @date 2023-05-13
@ -38,7 +42,7 @@ import java.util.concurrent.TimeUnit;
public class GrpcChannel {
private GrpcChannel() {
}
private static final String HOST_ID = IdUtil.getSnowflake().nextIdStr();
private static final ThreadPoolExecutor grpcExecutor = createGrpcExecutor();
private static ConcurrentHashMap<Pair<String, String>, ManagedChannel> CHANNEL_MAP = new ConcurrentHashMap<>(16);
@ -75,6 +79,12 @@ public class GrpcChannel {
return null;
}
}
headersMap.put(HeadersEnum.HOST_ID.getKey(), HOST_ID);
headersMap.put(HeadersEnum.HOST_IP.getKey(), NetUtil.getLocalIpStr());
headersMap.put(HeadersEnum.GROUP_NAME.getKey(), ServerRegister.GROUP_NAME);
headersMap.put(HeadersEnum.HOST_PORT.getKey(), getServerPort());
headersMap.put(HeadersEnum.NAMESPACE.getKey(), SystemConstants.DEFAULT_NAMESPACE);
headersMap.put(HeadersEnum.TOKEN.getKey(), getServerToken());
Metadata metadata = Metadata
.newBuilder()
@ -103,6 +113,16 @@ public class GrpcChannel {
}
/**
 * Token attached to outgoing server-to-server gRPC headers, read from
 * {@link SystemProperties}.
 */
private static String getServerToken() {
    return SnailSpringContext.getBean(SystemProperties.class).getServerToken();
}
/**
 * This server's advertised port (the configured netty port), rendered as a
 * string for use as a header value.
 */
private static String getServerPort() {
    return String.valueOf(SnailSpringContext.getBean(SystemProperties.class).getNettyPort());
}
/**
* 连接客户端
*

View File

@ -1,6 +1,13 @@
package com.aizuda.snailjob.server.common.rpc.client;
import cn.hutool.core.util.IdUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.register.ServerRegister;
import com.aizuda.snailjob.server.common.triple.Pair;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
@ -13,6 +20,7 @@ import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -24,6 +32,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class NettyChannel {
private static Bootstrap bootstrap;
private static final String HOST_ID = IdUtil.getSnowflake().nextIdStr();
private static ConcurrentHashMap<Pair<String, String>, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(16);
private NettyChannel() {
}
@ -76,6 +85,12 @@ public class NettyChannel {
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
// 设置传递请求内容的长度
.set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes())
.set(HeadersEnum.HOST_ID.getKey(), HOST_ID)
.set(HeadersEnum.HOST_IP.getKey(), NetUtil.getLocalIpStr())
.set(HeadersEnum.GROUP_NAME.getKey(), ServerRegister.GROUP_NAME)
.set(HeadersEnum.HOST_PORT.getKey(), getServerPort())
.set(HeadersEnum.NAMESPACE.getKey(), SystemConstants.DEFAULT_NAMESPACE)
.set(HeadersEnum.TOKEN.getKey(), getServerToken())
;
request.headers().setAll(requestHeaders);
@ -83,6 +98,16 @@ public class NettyChannel {
channel.writeAndFlush(request).sync();
}
/**
 * Token attached to outgoing server-to-server netty headers, read from
 * {@link SystemProperties}.
 */
private static String getServerToken() {
    return SnailSpringContext.getBean(SystemProperties.class).getServerToken();
}
/**
 * This server's advertised port (the configured netty port), rendered as a
 * string for use as a header value.
 */
private static String getServerPort() {
    return String.valueOf(SnailSpringContext.getBean(SystemProperties.class).getNettyPort());
}
/**
* 连接客户端
*

View File

@ -105,12 +105,6 @@ public class GrpcRequestHandlerActor extends AbstractActor {
}
private SnailJobRpcResult doProcess(String uri, String content, Map<String, String> headersMap) {
Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);
String hostId = headersMap.get(HeadersEnum.HOST_ID.getKey());
String hostIp = headersMap.get(HeadersEnum.HOST_IP.getKey());
Integer hostPort = Integer.valueOf(headersMap.get(HeadersEnum.HOST_PORT.getKey()));
String groupName = headersMap.get(HeadersEnum.GROUP_NAME.getKey());
String namespace = headersMap.get(HeadersEnum.NAMESPACE.getKey());
String token = headersMap.get(HeadersEnum.TOKEN.getKey());
@ -120,19 +114,6 @@ public class GrpcRequestHandlerActor extends AbstractActor {
namespace, groupName, token);
}
// 注册版本 后续版本将迁移至BeatHttpRequestHandler 只处理beat的心跳注册
RegisterContext registerContext = new RegisterContext();
registerContext.setGroupName(groupName);
registerContext.setHostPort(hostPort);
registerContext.setHostIp(hostIp);
registerContext.setHostId(hostId);
registerContext.setUri(uri);
registerContext.setNamespaceId(namespace);
boolean result = register.register(registerContext);
if (!result) {
SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName);
}
DefaultHttpHeaders headers = new DefaultHttpHeaders();
headersMap.forEach(headers::add);

View File

@ -76,12 +76,6 @@ public class RequestHandlerActor extends AbstractActor {
private SnailJobRpcResult doProcess(String uri, String content, HttpMethod method,
HttpHeaders headers) {
Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
String namespace = headers.get(HeadersEnum.NAMESPACE.getKey());
String token = headers.get(HeadersEnum.TOKEN.getKey());
@ -90,18 +84,6 @@ public class RequestHandlerActor extends AbstractActor {
throw new SnailJobServerException("Token authentication failed. [namespace:{} groupName:{} token:{}]", namespace, groupName, token);
}
// 注册版本 后续版本将迁移至BeatHttpRequestHandler 只处理beat的心跳注册
RegisterContext registerContext = new RegisterContext();
registerContext.setGroupName(groupName);
registerContext.setHostPort(hostPort);
registerContext.setHostIp(hostIp);
registerContext.setHostId(hostId);
registerContext.setUri(uri);
registerContext.setNamespaceId(namespace);
boolean result = register.register(registerContext);
if (!result) {
SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName);
}
UrlBuilder builder = UrlBuilder.ofHttp(uri);
Collection<HttpRequestHandler> httpRequestHandlers = SnailSpringContext.getContext()

View File

@ -2,11 +2,16 @@ package com.aizuda.snailjob.server.common.rpc.server.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Register;
import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler;
import com.aizuda.snailjob.server.common.register.ClientRegister;
import com.aizuda.snailjob.server.common.register.RegisterContext;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.stereotype.Component;
@ -36,6 +41,18 @@ public class BeatHttpRequestHandler extends GetHttpRequestHandler {
/**
 * Handles a client heartbeat (beat): renews the client's registration using
 * the identity carried in the request headers, then replies with PONG.
 * A failed renewal is logged but does not fail the heartbeat response.
 */
@Override
public SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers) {
    SnailJobLog.LOCAL.debug("Beat check content:[{}]", content);
    Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);

    // Build the registration context from the caller-supplied headers.
    String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
    RegisterContext registerContext = new RegisterContext();
    registerContext.setGroupName(groupName);
    registerContext.setHostPort(Integer.valueOf(headers.get(HeadersEnum.HOST_PORT.getKey())));
    registerContext.setHostIp(headers.get(HeadersEnum.HOST_IP.getKey()));
    registerContext.setHostId(headers.get(HeadersEnum.HOST_ID.getKey()));
    registerContext.setUri(HTTP_PATH.BEAT);
    registerContext.setNamespaceId(headers.get(HeadersEnum.NAMESPACE.getKey()));

    if (!register.register(registerContext)) {
        SnailJobLog.LOCAL.warn("client register error. groupName:[{}]", groupName);
    }

    SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
    return new SnailJobRpcResult(PONG, retryRequest.getReqId());
}

View File

@ -0,0 +1,20 @@
package com.aizuda.snailjob.server.job.task.server;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH;
/**
 * Server-to-server RPC client: asks a peer server for the client registration
 * nodes it has cached locally, triggering their lease refresh on that peer.
 *
 * @author srzou
 * @date 2024/12/11 9:36
 */
public interface ServerRpcClient {

    /**
     * Calls the peer's GET_REG_NODES_AND_REFRESH endpoint. The payload is the
     * peer's JSON-encoded list of nodes pending renewal (may be null when the
     * peer has none) — see the corresponding GET handler for the producer side.
     */
    @Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.GET)
    Result<String> getRegNodesAndFlush();
}

View File

@ -0,0 +1,74 @@
package com.aizuda.snailjob.server.job.task.support.request;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler;
import com.aizuda.snailjob.server.common.register.ClientRegister;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH;
import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME;
/**
 * Serves peer-server refresh requests: returns the client nodes cached on
 * this server whose lease is due for renewal, after refreshing their local
 * expiration. The response payload is a JSON array of {@code ServerNode},
 * or {@code null} when there is nothing to renew.
 */
@Component
public class GetRegNodesPostHttpRequestHandler extends GetHttpRequestHandler {

    @Override
    public boolean supports(String path) {
        return GET_REG_NODES_AND_REFRESH.equals(path);
    }

    @Override
    public HttpMethod method() {
        return HttpMethod.GET;
    }

    @Override
    public SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers) {
        // Fix: previous log text ("Client Callback Request") was copied from
        // another handler and mislabeled this endpoint.
        SnailJobLog.LOCAL.debug("Get register nodes and refresh. content:[{}]", content);
        SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
        List<ServerNode> refreshCache = getAndRefreshCache();
        String json = null;
        if (CollUtil.isNotEmpty(refreshCache)) {
            json = JsonUtil.toJsonString(refreshCache);
        }
        return new SnailJobRpcResult(json, retryRequest.getReqId());
    }

    /**
     * Drains the nodes pending renewal from {@code ClientRegister} and renews
     * them locally: pushes expireAt forward by {@code DELAY_TIME} seconds and
     * refreshes the register-table and consumer-group caches.
     *
     * @return the renewed nodes, or {@code null} when none were pending
     */
    public static List<ServerNode> getAndRefreshCache() {
        List<ServerNode> expireNodes = ClientRegister.getExpireNodes();
        if (Objects.nonNull(expireNodes)) {
            for (final ServerNode serverNode : expireNodes) {
                serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME));
                // Refresh the full local register cache.
                CacheRegisterTable.addOrUpdate(serverNode);
                // Refresh the consumer-group expiration.
                CacheConsumerGroup.addOrUpdate(serverNode.getGroupName(), serverNode.getNamespaceId());
            }
        }
        return expireNodes;
    }
}

View File

@ -0,0 +1,256 @@
package com.aizuda.snailjob.server.starter.schedule;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.server.job.task.server.ServerRpcClient;
import com.aizuda.snailjob.server.job.task.support.request.GetRegNodesPostHttpRequestHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME;
/**
 * Master-node schedule that renews client registrations cluster-wide.
 * Every 5 seconds it collects, from this node and from every peer server,
 * the client nodes whose lease is due, then extends their expireAt in the
 * DB (batch update for known nodes, insert for new ones).
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class RefreshNodeSchedule extends AbstractSchedule implements Lifecycle {

    private final ServerNodeMapper serverNodeMapper;
    private final SystemProperties systemProperties;

    // Worker pool for querying peer servers in parallel; released in close().
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    @Override
    protected void doExecute() {
        int nettyPort = systemProperties.getNettyPort();
        String localIpStr = NetUtil.getLocalIpStr();
        try {
            // Online server nodes, excluding this node itself.
            LambdaQueryWrapper<ServerNode> wrapper = new LambdaQueryWrapper<ServerNode>()
                    .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())
                    .not(w -> w.eq(ServerNode::getHostIp, localIpStr)
                            .eq(ServerNode::getHostPort, nettyPort));
            List<ServerNode> serverNodes = serverNodeMapper.selectList(wrapper);

            List<ServerNode> clientNodes = new ArrayList<>();
            if (CollUtil.isNotEmpty(serverNodes)) {
                // Ask every peer server, in parallel, for the nodes it needs renewed.
                List<ServerNode> allClientList = getAllClientList(serverNodes);
                if (CollUtil.isNotEmpty(allClientList)) {
                    clientNodes.addAll(allClientList);
                }
            }
            // Renew the nodes cached on this server (this step was previously
            // duplicated in both branches of the if/else).
            List<ServerNode> refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache();
            if (CollUtil.isNotEmpty(refreshCache)) {
                clientNodes.addAll(refreshCache);
            }

            if (CollUtil.isEmpty(clientNodes)) {
                SnailJobLog.LOCAL.warn("clientNodes is empty");
                return;
            }
            SnailJobLog.LOCAL.info("start refresh client nodes{}", clientNodes);
            refreshExpireAt(clientNodes);
        } catch (Exception e) {
            SnailJobLog.LOCAL.error("refresh 失败", e);
        }
    }

    /**
     * Queries every peer server in parallel for the client nodes it holds
     * that need renewal. A peer that fails or times out contributes nothing;
     * one slow node must not stall the schedule.
     */
    private List<ServerNode> getAllClientList(List<ServerNode> serverNodes) {
        int size = serverNodes.size();
        CountDownLatch latch = new CountDownLatch(size);
        List<Future<String>> futures = new ArrayList<>(size);
        try {
            for (ServerNode serverNode : serverNodes) {
                futures.add(executorService.submit(() -> {
                    try {
                        RegisterNodeInfo nodeInfo = new RegisterNodeInfo();
                        nodeInfo.setHostId(serverNode.getHostId());
                        nodeInfo.setGroupName(serverNode.getGroupName());
                        nodeInfo.setNamespaceId(serverNode.getNamespaceId());
                        nodeInfo.setHostPort(serverNode.getHostPort());
                        nodeInfo.setHostIp(serverNode.getHostIp());
                        ServerRpcClient serverRpcClient = buildRpcClient(nodeInfo);
                        Result<String> regNodesAndFlush = serverRpcClient.getRegNodesAndFlush();
                        return regNodesAndFlush.getData();
                    } finally {
                        // Count down even on failure so await() can never hang.
                        latch.countDown();
                    }
                }));
            }
            // Bounded wait (5s) for all peers, then collect whatever finished.
            latch.await(5, TimeUnit.SECONDS);
            return futures.stream()
                    .map(this::readNodes)
                    .flatMap(List::stream)
                    .distinct()
                    .toList();
        } catch (InterruptedException e) {
            // Fix: restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Extracts one peer's JSON payload; best effort — empty list on any failure. */
    private List<ServerNode> readNodes(Future<String> future) {
        try {
            String jsonString = future.get(1, TimeUnit.SECONDS);
            if (Objects.nonNull(jsonString)) {
                return JsonUtil.parseObject(jsonString, new TypeReference<List<ServerNode>>() {
                });
            }
        } catch (InterruptedException e) {
            // Fix: previously swallowed by the broad catch without re-interrupt.
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // A failed/slow peer simply contributes no nodes this round.
        }
        return new ArrayList<>();
    }

    @Override
    public String lockName() {
        return "registerNode";
    }

    @Override
    public String lockAtMost() {
        return "PT10S";
    }

    @Override
    public String lockAtLeast() {
        return "PT5S";
    }

    @Override
    public void start() {
        taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
    }

    @Override
    public void close() {
        // Fix: the worker pool was previously leaked on shutdown.
        executorService.shutdown();
    }

    /**
     * Builds an RPC client bound to the given peer node with fail-retry
     * enabled (up to 3 attempts). Dead commented-out logging removed.
     */
    private ServerRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) {
        int maxRetryTimes = 3;
        return RequestBuilder.<ServerRpcClient, Result>newBuilder()
                .nodeInfo(registerNodeInfo)
                // Original expression (maxRetryTimes > 0 && !retry) was constant true.
                .failRetry(true)
                .retryTimes(maxRetryTimes)
                .client(ServerRpcClient.class)
                .build();
    }

    /**
     * Persists the renewed expireAt for the given nodes: nodes already in the
     * DB get a batch expireAt update, unseen nodes get inserted.
     */
    private void refreshExpireAt(List<ServerNode> serverNodes) {
        if (CollUtil.isEmpty(serverNodes)) {
            return;
        }
        Set<String> hostIds = Sets.newHashSet();
        Set<String> hostIps = Sets.newHashSet();
        for (final ServerNode serverNode : serverNodes) {
            serverNode.setExpireAt(getExpireAt());
            hostIds.add(serverNode.getHostId());
            hostIps.add(serverNode.getHostIp());
        }
        List<ServerNode> dbServerNodes = serverNodeMapper.selectList(
                new LambdaQueryWrapper<ServerNode>()
                        .select(ServerNode::getHostIp, ServerNode::getHostId)
                        .in(ServerNode::getHostId, hostIds)
                        .in(ServerNode::getHostIp, hostIps)
        );

        List<ServerNode> insertDBs = Lists.newArrayList();
        List<ServerNode> updateDBs = Lists.newArrayList();
        Set<Pair<String, String>> pairs = dbServerNodes.stream()
                .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp()))
                .collect(Collectors.toSet());

        // De-duplicate by (hostId, hostIp) and split into update/insert sets.
        Set<Pair<String, String>> existed = Sets.newHashSet();
        for (final ServerNode serverNode : serverNodes) {
            Pair<String, String> pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp());
            if (!existed.add(pair)) {
                continue;
            }
            if (pairs.contains(pair)) {
                updateDBs.add(serverNode);
            } else {
                insertDBs.add(serverNode);
            }
        }
        try {
            // Batch lease renewal for known nodes.
            if (CollUtil.isNotEmpty(updateDBs)) {
                serverNodeMapper.updateBatchExpireAt(updateDBs);
            }
        } catch (Exception e) {
            SnailJobLog.LOCAL.error("续租失败", e);
        }
        try {
            if (CollUtil.isNotEmpty(insertDBs)) {
                serverNodeMapper.insertBatch(insertDBs);
            }
        } catch (DuplicateKeyException ignored) {
            // Another server registered the node concurrently; safe to ignore.
        } catch (Exception e) {
            SnailJobLog.LOCAL.error("注册节点失败", e);
        }
    }

    /** Lease horizon: now + DELAY_TIME seconds. */
    private LocalDateTime getExpireAt() {
        return LocalDateTime.now().plusSeconds(DELAY_TIME);
    }
}