feat:(1.3.0-beta1): 优化客户端注册

This commit is contained in:
opensnail 2024-12-24 23:29:17 +08:00
parent b50a2402ab
commit 03bc07fe95
4 changed files with 424 additions and 316 deletions

View File

@ -6,6 +6,7 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
import com.aizuda.snailjob.server.model.dto.ConfigDTO; import com.aizuda.snailjob.server.model.dto.ConfigDTO;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG;
/** /**
@ -20,4 +21,6 @@ public interface CommonRpcClient {
@Mapping(path = SYNC_CONFIG, method = RequestMethod.POST) @Mapping(path = SYNC_CONFIG, method = RequestMethod.POST)
Result syncConfig(@Body ConfigDTO configDTO); Result syncConfig(@Body ConfigDTO configDTO);
@Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.POST)
Result<String> getRegNodesAndFlush();
} }

View File

@ -1,22 +1,37 @@
package com.aizuda.snailjob.server.common.register; package com.aizuda.snailjob.server.common.register;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.client.CommonRpcClient;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -28,12 +43,13 @@ import java.util.stream.Collectors;
*/ */
@Component(ClientRegister.BEAN_NAME) @Component(ClientRegister.BEAN_NAME)
@Slf4j @Slf4j
public class ClientRegister extends AbstractRegister implements Runnable { @RequiredArgsConstructor
public class ClientRegister extends AbstractRegister {
ExecutorService executorService = Executors.newFixedThreadPool(5);
private final RefreshNodeSchedule refreshNodeSchedule;
public static final String BEAN_NAME = "clientRegister"; public static final String BEAN_NAME = "clientRegister";
public static final int DELAY_TIME = 30; public static final int DELAY_TIME = 30;
private Thread THREAD = null;
protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(1000); protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(1000);
@Override @Override
@ -61,10 +77,8 @@ public class ClientRegister extends AbstractRegister implements Runnable {
@Override @Override
protected void afterProcessor(final ServerNode serverNode) { protected void afterProcessor(final ServerNode serverNode) {
} }
@Override @Override
protected Integer getNodeType() { protected Integer getNodeType() {
return NodeTypeEnum.CLIENT.getType(); return NodeTypeEnum.CLIENT.getType();
@ -72,18 +86,14 @@ public class ClientRegister extends AbstractRegister implements Runnable {
@Override @Override
public void start() { public void start() {
THREAD = new Thread(this, "client-register"); refreshNodeSchedule.startScheduler();
THREAD.start();
} }
@Override @Override
public void close() { public void close() {
if (Objects.nonNull(THREAD)) {
THREAD.interrupt();
}
} }
public static List<ServerNode> getExpireNodes(){ public static List<ServerNode> getExpireNodes() {
try { try {
ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS);
if (Objects.nonNull(serverNode)) { if (Objects.nonNull(serverNode)) {
@ -97,30 +107,145 @@ public class ClientRegister extends AbstractRegister implements Runnable {
return null; return null;
} }
public static List<ServerNode> refreshLocalCache() {
// 获取当前所有需要续签的node
List<ServerNode> expireNodes = ClientRegister.getExpireNodes();
if (Objects.nonNull(expireNodes)) {
// 进行本地续签
for (final ServerNode serverNode : expireNodes) {
serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME));
// 刷新全量本地缓存
CacheRegisterTable.addOrUpdate(serverNode);
// 刷新过期时间
CacheConsumerGroup.addOrUpdate(serverNode.getGroupName(), serverNode.getNamespaceId());
}
}
return expireNodes;
}
@Component
@Slf4j
@RequiredArgsConstructor
public class RefreshNodeSchedule extends AbstractSchedule {
@Override @Override
public void run() { protected void doExecute() {
// while (!Thread.currentThread().isInterrupted()) { try {
// try { // 获取在线的客户端节点并且排除当前节点
// ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); LambdaQueryWrapper<ServerNode> wrapper = new LambdaQueryWrapper<ServerNode>()
// if (Objects.nonNull(serverNode)) { .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType());
// List<ServerNode> lists = Lists.newArrayList(serverNode); List<ServerNode> serverNodes = serverNodeMapper.selectList(wrapper);
// QUEUE.drainTo(lists, 256);
// serverNodes = StreamUtils.filter(serverNodes, serverNode -> !serverNode.getHostId().equals(ServerRegister.CURRENT_CID));
// // 注册或续租
// refreshExpireAt(lists); List<ServerNode> waitRefreshDBClientNodes = new ArrayList<>();
// }
// } catch (InterruptedException ignored) { // 刷新本地缓存
// Thread.currentThread().interrupt(); List<ServerNode> refreshCache = refreshLocalCache();
// } catch (Exception e) { if (CollUtil.isNotEmpty(refreshCache)) {
// SnailJobLog.LOCAL.error("client refresh expireAt error."); // 完成本节点的刷新
// } finally { waitRefreshDBClientNodes.addAll(refreshCache);
// // 防止刷的过快 }
// try {
// TimeUnit.MILLISECONDS.sleep(2000); if (!serverNodes.isEmpty()) {
// } catch (InterruptedException ignored) { // 并行获取所有服务端需要注册的列表
// Thread.currentThread().interrupt(); // 获取列表 并完成注册/本地完成续签
// } List<ServerNode> allClientList = collectAllClientQueue(serverNodes);
// } if (CollUtil.isNotEmpty(allClientList)) {
// } waitRefreshDBClientNodes.addAll(allClientList);
}
}
if (CollUtil.isEmpty(waitRefreshDBClientNodes)) {
SnailJobLog.LOCAL.warn("clientNodes is empty");
return;
}
SnailJobLog.LOCAL.info("start refresh client nodes{}", waitRefreshDBClientNodes);
// 刷新DB
refreshExpireAt(waitRefreshDBClientNodes);
} catch (Exception e) {
SnailJobLog.LOCAL.error("refresh 失败", e);
}
}
private List<ServerNode> collectAllClientQueue(List<ServerNode> serverNodes) {
if (CollUtil.isEmpty(serverNodes)) {
return Lists.newArrayList();
}
int size = serverNodes.size();
// 存储处理结果
List<Future<String>> futures = new ArrayList<>(size);
for (ServerNode serverNode : serverNodes) {
Future<String> future = executorService.submit(() -> {
try {
RegisterNodeInfo nodeInfo = new RegisterNodeInfo();
nodeInfo.setHostId(serverNode.getHostId());
nodeInfo.setGroupName(serverNode.getGroupName());
nodeInfo.setNamespaceId(serverNode.getNamespaceId());
nodeInfo.setHostPort(serverNode.getHostPort());
nodeInfo.setHostIp(serverNode.getHostIp());
CommonRpcClient serverRpcClient = buildRpcClient(nodeInfo);
Result<String> regNodesAndFlush = serverRpcClient.getRegNodesAndFlush();
return regNodesAndFlush.getData();
} catch (Exception e) {
return StrUtil.EMPTY;
}
});
futures.add(future);
}
return futures.stream()
.map(future -> {
try {
String jsonString = future.get(1, TimeUnit.SECONDS);
if (Objects.nonNull(jsonString)) {
return JsonUtil.parseList(jsonString, ServerNode.class);
}
return new ArrayList<ServerNode>();
} catch (Exception e) {
return new ArrayList<ServerNode>();
}
})
.filter(Objects::nonNull)
.flatMap(List::stream)
.distinct()
.toList();
}
private CommonRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) {
int maxRetryTimes = 3;
return RequestBuilder.<CommonRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.failRetry(true)
.retryTimes(maxRetryTimes)
.client(CommonRpcClient.class)
.build();
}
@Override
public String lockName() {
return "registerNode";
}
@Override
public String lockAtMost() {
return "PT10S";
}
@Override
public String lockAtLeast() {
return "PT5S";
}
public void startScheduler() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
}
} }
} }

View File

@ -1,20 +0,0 @@
package com.aizuda.snailjob.server.job.task.server;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH;
/**
* @Authorsrzou
* @Packagecom.aizuda.snailjob.server.job.task.server
* @Projectsnail-job
* @Date2024/12/11 9:36
* @FilenameServerRpcClient
*/
public interface ServerRpcClient {
@Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.POST)
Result<String> getRegNodesAndFlush();
}

View File

@ -1,256 +1,256 @@
package com.aizuda.snailjob.server.starter.schedule; //package com.aizuda.snailjob.server.starter.schedule;
//
import cn.hutool.core.collection.CollUtil; //import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; //import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.core.model.Result; //import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil; //import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil; //import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.log.SnailJobLog; //import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle; //import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties; //import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; //import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; //import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; //import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.triple.Pair; //import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.server.job.task.server.ServerRpcClient; //import com.aizuda.snailjob.server.job.task.server.ServerRpcClient;
import com.aizuda.snailjob.server.job.task.support.request.GetRegNodesPostHttpRequestHandler; //import com.aizuda.snailjob.server.job.task.support.request.GetRegNodesPostHttpRequestHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; //import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; //import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; //import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference; //import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists; //import com.google.common.collect.Lists;
import com.google.common.collect.Sets; //import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor; //import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException; //import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.time.Duration; //import java.time.Duration;
import java.time.Instant; //import java.time.Instant;
import java.time.LocalDateTime; //import java.time.LocalDateTime;
import java.util.ArrayList; //import java.util.ArrayList;
import java.util.List; //import java.util.List;
import java.util.Objects; //import java.util.Objects;
import java.util.Set; //import java.util.Set;
import java.util.concurrent.*; //import java.util.concurrent.*;
import java.util.stream.Collectors; //import java.util.stream.Collectors;
//
import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME; //import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME;
//
//
@Component //@Component
@Slf4j //@Slf4j
@RequiredArgsConstructor //@RequiredArgsConstructor
public class RefreshNodeSchedule extends AbstractSchedule implements Lifecycle { //public class RefreshNodeSchedule extends AbstractSchedule implements Lifecycle {
private final ServerNodeMapper serverNodeMapper; // private final ServerNodeMapper serverNodeMapper;
//
private final SystemProperties systemProperties; // private final SystemProperties systemProperties;
//
ExecutorService executorService = Executors.newFixedThreadPool(5); // ExecutorService executorService = Executors.newFixedThreadPool(5);
//
@Override // @Override
protected void doExecute() { // protected void doExecute() {
int nettyPort = systemProperties.getNettyPort(); // int nettyPort = systemProperties.getNettyPort();
String localIpStr = NetUtil.getLocalIpStr(); // String localIpStr = NetUtil.getLocalIpStr();
try { // try {
// 获取在线的客户端节点并且排除当前节点 // // 获取在线的客户端节点并且排除当前节点
LambdaQueryWrapper<ServerNode> wrapper = new LambdaQueryWrapper<ServerNode>() // LambdaQueryWrapper<ServerNode> wrapper = new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()) // .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType())
.not(w -> w.eq(ServerNode::getHostIp, localIpStr) // .not(w -> w.eq(ServerNode::getHostIp, localIpStr)
.eq(ServerNode::getHostPort, nettyPort)); // .eq(ServerNode::getHostPort, nettyPort));
List<ServerNode> serverNodes = serverNodeMapper.selectList(wrapper); // List<ServerNode> serverNodes = serverNodeMapper.selectList(wrapper);
List<ServerNode> clientNodes = new ArrayList<>(); // List<ServerNode> clientNodes = new ArrayList<>();
if (serverNodes.size() > 0) { // if (serverNodes.size() > 0) {
// 并行获取所有服务端需要注册的列表 // // 并行获取所有服务端需要注册的列表
// 获取列表 并完成注册/本地完成续签 // // 获取列表 并完成注册/本地完成续签
List<ServerNode> allClientList = getAllClientList(serverNodes); // List<ServerNode> allClientList = getAllClientList(serverNodes);
if (CollUtil.isNotEmpty(allClientList)) { // if (CollUtil.isNotEmpty(allClientList)) {
clientNodes.addAll(allClientList); // clientNodes.addAll(allClientList);
} // }
List<ServerNode> refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); // List<ServerNode> refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache();
if (CollUtil.isNotEmpty(refreshCache)) { // if (CollUtil.isNotEmpty(refreshCache)) {
// 完成本节点的刷新 // // 完成本节点的刷新
clientNodes.addAll(refreshCache); // clientNodes.addAll(refreshCache);
} // }
} else { // } else {
List<ServerNode> refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); // List<ServerNode> refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache();
if (CollUtil.isNotEmpty(refreshCache)) { // if (CollUtil.isNotEmpty(refreshCache)) {
// 完成本节点的刷新 // // 完成本节点的刷新
clientNodes.addAll(refreshCache); // clientNodes.addAll(refreshCache);
} // }
} // }
if (CollUtil.isEmpty(clientNodes)){ // if (CollUtil.isEmpty(clientNodes)) {
SnailJobLog.LOCAL.warn("clientNodes is empty"); // SnailJobLog.LOCAL.warn("clientNodes is empty");
return; // return;
} // }
SnailJobLog.LOCAL.info("start refresh client nodes{}", clientNodes); // SnailJobLog.LOCAL.info("start refresh client nodes{}", clientNodes);
refreshExpireAt(clientNodes); // refreshExpireAt(clientNodes);
//
} catch (Exception e) { // } catch (Exception e) {
SnailJobLog.LOCAL.error("refresh 失败", e); // SnailJobLog.LOCAL.error("refresh 失败", e);
} // }
} // }
//
private List<ServerNode> getAllClientList(List<ServerNode> serverNodes) { // private List<ServerNode> getAllClientList(List<ServerNode> serverNodes) {
int size = serverNodes.size(); // int size = serverNodes.size();
// 创建 CountDownLatch // // 创建 CountDownLatch
CountDownLatch latch = new CountDownLatch(size); // CountDownLatch latch = new CountDownLatch(size);
//
// 存储处理结果 // // 存储处理结果
List<Future<String>> futures = new ArrayList<>(size); // List<Future<String>> futures = new ArrayList<>(size);
//
try { // try {
for (ServerNode serverNode : serverNodes) { // for (ServerNode serverNode : serverNodes) {
Future<String> future = executorService.submit(() -> { // Future<String> future = executorService.submit(() -> {
try { // try {
RegisterNodeInfo nodeInfo = new RegisterNodeInfo(); // RegisterNodeInfo nodeInfo = new RegisterNodeInfo();
nodeInfo.setHostId(serverNode.getHostId()); // nodeInfo.setHostId(serverNode.getHostId());
nodeInfo.setGroupName(serverNode.getGroupName()); // nodeInfo.setGroupName(serverNode.getGroupName());
nodeInfo.setNamespaceId(serverNode.getNamespaceId()); // nodeInfo.setNamespaceId(serverNode.getNamespaceId());
nodeInfo.setHostPort(serverNode.getHostPort()); // nodeInfo.setHostPort(serverNode.getHostPort());
nodeInfo.setHostIp(serverNode.getHostIp()); // nodeInfo.setHostIp(serverNode.getHostIp());
ServerRpcClient serverRpcClient = buildRpcClient(nodeInfo); // ServerRpcClient serverRpcClient = buildRpcClient(nodeInfo);
Result<String> regNodesAndFlush = serverRpcClient.getRegNodesAndFlush(); // Result<String> regNodesAndFlush = serverRpcClient.getRegNodesAndFlush();
//
// 模拟耗时处理 // // 模拟耗时处理
return regNodesAndFlush.getData(); // return regNodesAndFlush.getData();
} finally { // } finally {
// 处理完成后计数减一 // // 处理完成后计数减一
latch.countDown(); // latch.countDown();
} // }
}); // });
futures.add(future); // futures.add(future);
} // }
// 提交任务 // // 提交任务
//
// 等待所有任务完成 // // 等待所有任务完成
latch.await(5, TimeUnit.SECONDS); // 设置超时时间为5秒 // latch.await(5, TimeUnit.SECONDS); // 设置超时时间为5秒
//
return futures.stream() // return futures.stream()
.map(future -> { // .map(future -> {
try { // try {
String jsonString = future.get(1, TimeUnit.SECONDS); // String jsonString = future.get(1, TimeUnit.SECONDS);
if (Objects.nonNull(jsonString)) { // if (Objects.nonNull(jsonString)) {
return JsonUtil.parseObject(jsonString, new TypeReference<List<ServerNode>>() { // return JsonUtil.parseObject(jsonString, new TypeReference<List<ServerNode>>() {
}); // });
} // }
return new ArrayList<ServerNode>(); // return new ArrayList<ServerNode>();
} catch (Exception e) { // } catch (Exception e) {
return new ArrayList<ServerNode>(); // return new ArrayList<ServerNode>();
} // }
}) // })
.filter(Objects::nonNull) // .filter(Objects::nonNull)
.flatMap(List::stream) // .flatMap(List::stream)
.distinct() // .distinct()
.toList(); // .toList();
// 收集处理结果 // // 收集处理结果
//
} catch (InterruptedException e) { // } catch (InterruptedException e) {
throw new RuntimeException(e); // throw new RuntimeException(e);
} // }
} // }
//
@Override // @Override
public String lockName() { // public String lockName() {
return "registerNode"; // return "registerNode";
} // }
//
@Override // @Override
public String lockAtMost() { // public String lockAtMost() {
return "PT10S"; // return "PT10S";
} // }
//
@Override // @Override
public String lockAtLeast() { // public String lockAtLeast() {
return "PT5S"; // return "PT5S";
} // }
//
@Override // @Override
public void start() { // public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); // taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
} // }
//
@Override // @Override
public void close() { // public void close() {
//
} // }
//
private ServerRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { // private ServerRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) {
// String regInfo = registerNodeInfo.getHostId() + "/" + registerNodeInfo.getHostIp() + "/" + registerNodeInfo.getHostPort(); //// String regInfo = registerNodeInfo.getHostId() + "/" + registerNodeInfo.getHostIp() + "/" + registerNodeInfo.getHostPort();
// log.info(regInfo + "--------------------------"); //// log.info(regInfo + "--------------------------");
int maxRetryTimes = 3; // int maxRetryTimes = 3;
boolean retry = false; // boolean retry = false;
return RequestBuilder.<ServerRpcClient, Result>newBuilder() // return RequestBuilder.<ServerRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo) // .nodeInfo(registerNodeInfo)
.failRetry(maxRetryTimes > 0 && !retry) // .failRetry(maxRetryTimes > 0 && !retry)
.retryTimes(maxRetryTimes) // .retryTimes(maxRetryTimes)
.client(ServerRpcClient.class) // .client(ServerRpcClient.class)
.build(); // .build();
} // }
//
private void refreshExpireAt(List<ServerNode> serverNodes) { // private void refreshExpireAt(List<ServerNode> serverNodes) {
if (CollUtil.isEmpty(serverNodes)) { // if (CollUtil.isEmpty(serverNodes)) {
return; // return;
} // }
//
Set<String> hostIds = Sets.newHashSet(); // Set<String> hostIds = Sets.newHashSet();
Set<String> hostIps = Sets.newHashSet(); // Set<String> hostIps = Sets.newHashSet();
for (final ServerNode serverNode : serverNodes) { // for (final ServerNode serverNode : serverNodes) {
serverNode.setExpireAt(getExpireAt()); // serverNode.setExpireAt(getExpireAt());
hostIds.add(serverNode.getHostId()); // hostIds.add(serverNode.getHostId());
hostIps.add(serverNode.getHostIp()); // hostIps.add(serverNode.getHostIp());
} // }
//
List<ServerNode> dbServerNodes = serverNodeMapper.selectList( // List<ServerNode> dbServerNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>() // new LambdaQueryWrapper<ServerNode>()
.select(ServerNode::getHostIp, ServerNode::getHostId) // .select(ServerNode::getHostIp, ServerNode::getHostId)
.in(ServerNode::getHostId, hostIds) // .in(ServerNode::getHostId, hostIds)
.in(ServerNode::getHostIp, hostIps) // .in(ServerNode::getHostIp, hostIps)
); // );
//
List<ServerNode> insertDBs = Lists.newArrayList(); // List<ServerNode> insertDBs = Lists.newArrayList();
List<ServerNode> updateDBs = Lists.newArrayList(); // List<ServerNode> updateDBs = Lists.newArrayList();
Set<Pair<String, String>> pairs = dbServerNodes.stream() // Set<Pair<String, String>> pairs = dbServerNodes.stream()
.map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect( // .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect(
Collectors.toSet()); // Collectors.toSet());
//
// 去重处理 // // 去重处理
Set<Pair<String, String>> existed = Sets.newHashSet(); // Set<Pair<String, String>> existed = Sets.newHashSet();
for (final ServerNode serverNode : serverNodes) { // for (final ServerNode serverNode : serverNodes) {
Pair<String, String> pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp()); // Pair<String, String> pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp());
if (existed.contains(pair)) { // if (existed.contains(pair)) {
continue; // continue;
} // }
//
if (pairs.contains(pair)) { // if (pairs.contains(pair)) {
updateDBs.add(serverNode); // updateDBs.add(serverNode);
} else { // } else {
insertDBs.add(serverNode); // insertDBs.add(serverNode);
} // }
//
existed.add(pair); // existed.add(pair);
} // }
//
try { // try {
// 批量更新 // // 批量更新
if (CollUtil.isNotEmpty(updateDBs)) { // if (CollUtil.isNotEmpty(updateDBs)) {
serverNodeMapper.updateBatchExpireAt(updateDBs); // serverNodeMapper.updateBatchExpireAt(updateDBs);
} // }
} catch (Exception e) { // } catch (Exception e) {
SnailJobLog.LOCAL.error("续租失败", e); // SnailJobLog.LOCAL.error("续租失败", e);
} // }
//
try { // try {
if (CollUtil.isNotEmpty(insertDBs)) { // if (CollUtil.isNotEmpty(insertDBs)) {
serverNodeMapper.insertBatch(insertDBs); // serverNodeMapper.insertBatch(insertDBs);
} // }
} catch (DuplicateKeyException ignored) { // } catch (DuplicateKeyException ignored) {
} catch (Exception e) { // } catch (Exception e) {
SnailJobLog.LOCAL.error("注册节点失败", e); // SnailJobLog.LOCAL.error("注册节点失败", e);
} // }
} // }
//
private LocalDateTime getExpireAt() { // private LocalDateTime getExpireAt() {
return LocalDateTime.now().plusSeconds(DELAY_TIME); // return LocalDateTime.now().plusSeconds(DELAY_TIME);
} // }
} //}