diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java index 164e49963..21025810b 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/client/CommonRpcClient.java @@ -6,6 +6,7 @@ import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; import com.aizuda.snailjob.server.model.dto.ConfigDTO; +import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.SYNC_CONFIG; /** @@ -20,4 +21,6 @@ public interface CommonRpcClient { @Mapping(path = SYNC_CONFIG, method = RequestMethod.POST) Result syncConfig(@Body ConfigDTO configDTO); + @Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.POST) + Result getRegNodesAndFlush(); } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java index 8a020d02a..56bea37a1 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java @@ -1,22 +1,37 @@ package com.aizuda.snailjob.server.common.register; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.NetUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup; +import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.client.CommonRpcClient; +import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Component; +import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -28,12 +43,13 @@ import java.util.stream.Collectors; */ @Component(ClientRegister.BEAN_NAME) @Slf4j -public class ClientRegister extends AbstractRegister implements Runnable { - +@RequiredArgsConstructor +public class ClientRegister extends AbstractRegister { + ExecutorService executorService = Executors.newFixedThreadPool(5); + private final RefreshNodeSchedule refreshNodeSchedule; public static final String BEAN_NAME = "clientRegister"; public static final int DELAY_TIME = 30; - private Thread THREAD = null; protected static final LinkedBlockingDeque QUEUE = new LinkedBlockingDeque<>(1000); @Override @@ -61,10 +77,8 @@ public class ClientRegister extends AbstractRegister implements Runnable { @Override protected void afterProcessor(final ServerNode serverNode) { - } - @Override protected Integer getNodeType() { return NodeTypeEnum.CLIENT.getType(); @@ -72,18 +86,14 @@ public class ClientRegister extends AbstractRegister implements Runnable { @Override public void start() { - THREAD = new Thread(this, "client-register"); - THREAD.start(); + refreshNodeSchedule.startScheduler(); } @Override public void close() { - if (Objects.nonNull(THREAD)) { - THREAD.interrupt(); - } } - public static List getExpireNodes(){ + public static List getExpireNodes() { try { ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); if (Objects.nonNull(serverNode)) { @@ -97,30 +107,145 @@ public class ClientRegister extends AbstractRegister implements Runnable { return null; } - @Override - public void run() { -// while (!Thread.currentThread().isInterrupted()) { -// try { -// ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); -// if (Objects.nonNull(serverNode)) { -// List lists = Lists.newArrayList(serverNode); -// QUEUE.drainTo(lists, 256); -// -// // 注册或续租 -// refreshExpireAt(lists); -// } -// } catch (InterruptedException ignored) { -// Thread.currentThread().interrupt(); -// } catch (Exception e) { -// SnailJobLog.LOCAL.error("client refresh expireAt error."); -// } finally { -// // 防止刷的过快 -// try { -// TimeUnit.MILLISECONDS.sleep(2000); -// } catch (InterruptedException ignored) { -// Thread.currentThread().interrupt(); -// } -// } -// } + public static List refreshLocalCache() { + // 获取当前所有需要续签的node + List expireNodes = ClientRegister.getExpireNodes(); + if (Objects.nonNull(expireNodes)) { + // 进行本地续签 + for (final ServerNode serverNode : expireNodes) { + serverNode.setExpireAt(LocalDateTime.now().plusSeconds(DELAY_TIME)); + // 刷新全量本地缓存 + CacheRegisterTable.addOrUpdate(serverNode); + // 刷新过期时间 + CacheConsumerGroup.addOrUpdate(serverNode.getGroupName(), serverNode.getNamespaceId()); + } + } + return expireNodes; + } + + @Component + @Slf4j + @RequiredArgsConstructor + public class RefreshNodeSchedule extends AbstractSchedule { + + @Override + protected void doExecute() { + try { + // 获取在线的客户端节点并且排除当前节点 + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()); + List serverNodes = serverNodeMapper.selectList(wrapper); + + serverNodes = StreamUtils.filter(serverNodes, serverNode -> !serverNode.getHostId().equals(ServerRegister.CURRENT_CID)); + + List waitRefreshDBClientNodes = new ArrayList<>(); + + // 刷新本地缓存 + List refreshCache = refreshLocalCache(); + if (CollUtil.isNotEmpty(refreshCache)) { + // 完成本节点的刷新 + waitRefreshDBClientNodes.addAll(refreshCache); + } + + if (!serverNodes.isEmpty()) { + // 并行获取所有服务端需要注册的列表 + // 获取列表 并完成注册/本地完成续签 + List allClientList = collectAllClientQueue(serverNodes); + if (CollUtil.isNotEmpty(allClientList)) { + waitRefreshDBClientNodes.addAll(allClientList); + } + } + + if (CollUtil.isEmpty(waitRefreshDBClientNodes)) { + SnailJobLog.LOCAL.warn("clientNodes is empty"); + return; + } + + SnailJobLog.LOCAL.info("start refresh client nodes:{}", waitRefreshDBClientNodes); + + // 刷新DB + refreshExpireAt(waitRefreshDBClientNodes); + + } catch (Exception e) { + SnailJobLog.LOCAL.error("refresh 失败", e); + } + } + + private List collectAllClientQueue(List serverNodes) { + if (CollUtil.isEmpty(serverNodes)) { + return Lists.newArrayList(); + } + + int size = serverNodes.size(); + // 存储处理结果 + List> futures = new ArrayList<>(size); + for (ServerNode serverNode : serverNodes) { + Future future = executorService.submit(() -> { + try { + RegisterNodeInfo nodeInfo = new RegisterNodeInfo(); + nodeInfo.setHostId(serverNode.getHostId()); + nodeInfo.setGroupName(serverNode.getGroupName()); + nodeInfo.setNamespaceId(serverNode.getNamespaceId()); + nodeInfo.setHostPort(serverNode.getHostPort()); + nodeInfo.setHostIp(serverNode.getHostIp()); + CommonRpcClient serverRpcClient = buildRpcClient(nodeInfo); + Result regNodesAndFlush = serverRpcClient.getRegNodesAndFlush(); + return regNodesAndFlush.getData(); + } catch (Exception e) { + return StrUtil.EMPTY; + } + }); + + futures.add(future); + } + + return futures.stream() + .map(future -> { + try { + String jsonString = future.get(1, TimeUnit.SECONDS); + if (Objects.nonNull(jsonString)) { + return JsonUtil.parseList(jsonString, ServerNode.class); + } + return new ArrayList(); + } catch (Exception e) { + return new ArrayList(); + } + }) + .filter(Objects::nonNull) + .flatMap(List::stream) + .distinct() + .toList(); + + } + + private CommonRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { + + int maxRetryTimes = 3; + return RequestBuilder.newBuilder() + .nodeInfo(registerNodeInfo) + .failRetry(true) + .retryTimes(maxRetryTimes) + .client(CommonRpcClient.class) + .build(); + } + + @Override + public String lockName() { + return "registerNode"; + } + + @Override + public String lockAtMost() { + return "PT10S"; + } + + @Override + public String lockAtLeast() { + return "PT5S"; + } + + public void startScheduler() { + taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); + } } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java deleted file mode 100644 index 1d20a396a..000000000 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/server/ServerRpcClient.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.aizuda.snailjob.server.job.task.server; - -import com.aizuda.snailjob.common.core.model.Result; -import com.aizuda.snailjob.server.common.rpc.client.RequestMethod; -import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; - - -import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.GET_REG_NODES_AND_REFRESH; - -/** - * @Author:srzou - * @Package:com.aizuda.snailjob.server.job.task.server - * @Project:snail-job - * @Date:2024/12/11 9:36 - * @Filename:ServerRpcClient - */ -public interface ServerRpcClient { - @Mapping(path = GET_REG_NODES_AND_REFRESH, method = RequestMethod.POST) - Result getRegNodesAndFlush(); -} diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java index 402584969..9344e1387 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/schedule/RefreshNodeSchedule.java @@ -1,256 +1,256 @@ -package com.aizuda.snailjob.server.starter.schedule; - -import cn.hutool.core.collection.CollUtil; -import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; -import com.aizuda.snailjob.common.core.model.Result; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.NetUtil; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.Lifecycle; -import com.aizuda.snailjob.server.common.config.SystemProperties; -import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; -import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; -import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; -import com.aizuda.snailjob.server.common.triple.Pair; -import com.aizuda.snailjob.server.job.task.server.ServerRpcClient; -import com.aizuda.snailjob.server.job.task.support.request.GetRegNodesPostHttpRequestHandler; -import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.dao.DuplicateKeyException; -import org.springframework.stereotype.Component; - -import java.time.Duration; -import java.time.Instant; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME; - - -@Component -@Slf4j -@RequiredArgsConstructor -public class RefreshNodeSchedule extends AbstractSchedule implements Lifecycle { - private final ServerNodeMapper serverNodeMapper; - - private final SystemProperties systemProperties; - - ExecutorService executorService = Executors.newFixedThreadPool(5); - - @Override - protected void doExecute() { - int nettyPort = systemProperties.getNettyPort(); - String localIpStr = NetUtil.getLocalIpStr(); - try { - // 获取在线的客户端节点并且排除当前节点 - LambdaQueryWrapper wrapper = new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()) - .not(w -> w.eq(ServerNode::getHostIp, localIpStr) - .eq(ServerNode::getHostPort, nettyPort)); - List serverNodes = serverNodeMapper.selectList(wrapper); - List clientNodes = new ArrayList<>(); - if (serverNodes.size() > 0) { - // 并行获取所有服务端需要注册的列表 - // 获取列表 并完成注册/本地完成续签 - List allClientList = getAllClientList(serverNodes); - if (CollUtil.isNotEmpty(allClientList)) { - clientNodes.addAll(allClientList); - } - List refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); - if (CollUtil.isNotEmpty(refreshCache)) { - // 完成本节点的刷新 - clientNodes.addAll(refreshCache); - } - } else { - List refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); - if (CollUtil.isNotEmpty(refreshCache)) { - // 完成本节点的刷新 - clientNodes.addAll(refreshCache); - } - } - if (CollUtil.isEmpty(clientNodes)){ - SnailJobLog.LOCAL.warn("clientNodes is empty"); - return; - } - SnailJobLog.LOCAL.info("start refresh client nodes:{}", clientNodes); - refreshExpireAt(clientNodes); - - } catch (Exception e) { - SnailJobLog.LOCAL.error("refresh 失败", e); - } - } - - private List getAllClientList(List serverNodes) { - int size = serverNodes.size(); - // 创建 CountDownLatch - CountDownLatch latch = new CountDownLatch(size); - - // 存储处理结果 - List> futures = new ArrayList<>(size); - - try { - for (ServerNode serverNode : serverNodes) { - Future future = executorService.submit(() -> { - try { - RegisterNodeInfo nodeInfo = new RegisterNodeInfo(); - nodeInfo.setHostId(serverNode.getHostId()); - nodeInfo.setGroupName(serverNode.getGroupName()); - nodeInfo.setNamespaceId(serverNode.getNamespaceId()); - nodeInfo.setHostPort(serverNode.getHostPort()); - nodeInfo.setHostIp(serverNode.getHostIp()); - ServerRpcClient serverRpcClient = buildRpcClient(nodeInfo); - Result regNodesAndFlush = serverRpcClient.getRegNodesAndFlush(); - - // 模拟耗时处理 - return regNodesAndFlush.getData(); - } finally { - // 处理完成后计数减一 - latch.countDown(); - } - }); - futures.add(future); - } - // 提交任务 - - // 等待所有任务完成 - latch.await(5, TimeUnit.SECONDS); // 设置超时时间为5秒 - - return futures.stream() - .map(future -> { - try { - String jsonString = future.get(1, TimeUnit.SECONDS); - if (Objects.nonNull(jsonString)) { - return JsonUtil.parseObject(jsonString, new TypeReference>() { - }); - } - return new ArrayList(); - } catch (Exception e) { - return new ArrayList(); - } - }) - .filter(Objects::nonNull) - .flatMap(List::stream) - .distinct() - .toList(); - // 收集处理结果 - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public String lockName() { - return "registerNode"; - } - - @Override - public String lockAtMost() { - return "PT10S"; - } - - @Override - public String lockAtLeast() { - return "PT5S"; - } - - @Override - public void start() { - taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); - } - - @Override - public void close() { - - } - - private ServerRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { -// String regInfo = registerNodeInfo.getHostId() + "/" + registerNodeInfo.getHostIp() + "/" + registerNodeInfo.getHostPort(); -// log.info(regInfo + "--------------------------"); - int maxRetryTimes = 3; - boolean retry = false; - return RequestBuilder.newBuilder() - .nodeInfo(registerNodeInfo) - .failRetry(maxRetryTimes > 0 && !retry) - .retryTimes(maxRetryTimes) - .client(ServerRpcClient.class) - .build(); - } - - private void refreshExpireAt(List serverNodes) { - if (CollUtil.isEmpty(serverNodes)) { - return; - } - - Set hostIds = Sets.newHashSet(); - Set hostIps = Sets.newHashSet(); - for (final ServerNode serverNode : serverNodes) { - serverNode.setExpireAt(getExpireAt()); - hostIds.add(serverNode.getHostId()); - hostIps.add(serverNode.getHostIp()); - } - - List dbServerNodes = serverNodeMapper.selectList( - new LambdaQueryWrapper() - .select(ServerNode::getHostIp, ServerNode::getHostId) - .in(ServerNode::getHostId, hostIds) - .in(ServerNode::getHostIp, hostIps) - ); - - List insertDBs = Lists.newArrayList(); - List updateDBs = Lists.newArrayList(); - Set> pairs = dbServerNodes.stream() - .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect( - Collectors.toSet()); - - // 去重处理 - Set> existed = Sets.newHashSet(); - for (final ServerNode serverNode : serverNodes) { - Pair pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp()); - if (existed.contains(pair)) { - continue; - } - - if (pairs.contains(pair)) { - updateDBs.add(serverNode); - } else { - insertDBs.add(serverNode); - } - - existed.add(pair); - } - - try { - // 批量更新 - if (CollUtil.isNotEmpty(updateDBs)) { - serverNodeMapper.updateBatchExpireAt(updateDBs); - } - } catch (Exception e) { - SnailJobLog.LOCAL.error("续租失败", e); - } - - try { - if (CollUtil.isNotEmpty(insertDBs)) { - serverNodeMapper.insertBatch(insertDBs); - } - } catch (DuplicateKeyException ignored) { - } catch (Exception e) { - SnailJobLog.LOCAL.error("注册节点失败", e); - } - } - - private LocalDateTime getExpireAt() { - return LocalDateTime.now().plusSeconds(DELAY_TIME); - } -} +//package com.aizuda.snailjob.server.starter.schedule; +// +//import cn.hutool.core.collection.CollUtil; +//import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; +//import com.aizuda.snailjob.common.core.model.Result; +//import com.aizuda.snailjob.common.core.util.JsonUtil; +//import com.aizuda.snailjob.common.core.util.NetUtil; +//import com.aizuda.snailjob.common.log.SnailJobLog; +//import com.aizuda.snailjob.server.common.Lifecycle; +//import com.aizuda.snailjob.server.common.config.SystemProperties; +//import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo; +//import com.aizuda.snailjob.server.common.rpc.client.RequestBuilder; +//import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; +//import com.aizuda.snailjob.server.common.triple.Pair; +//import com.aizuda.snailjob.server.job.task.server.ServerRpcClient; +//import com.aizuda.snailjob.server.job.task.support.request.GetRegNodesPostHttpRequestHandler; +//import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; +//import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; +//import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +//import com.fasterxml.jackson.core.type.TypeReference; +//import com.google.common.collect.Lists; +//import com.google.common.collect.Sets; +//import lombok.RequiredArgsConstructor; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.dao.DuplicateKeyException; +//import org.springframework.stereotype.Component; +// +//import java.time.Duration; +//import java.time.Instant; +//import java.time.LocalDateTime; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.Objects; +//import java.util.Set; +//import java.util.concurrent.*; +//import java.util.stream.Collectors; +// +//import static com.aizuda.snailjob.server.common.register.ClientRegister.DELAY_TIME; +// +// +//@Component +//@Slf4j +//@RequiredArgsConstructor +//public class RefreshNodeSchedule extends AbstractSchedule implements Lifecycle { +// private final ServerNodeMapper serverNodeMapper; +// +// private final SystemProperties systemProperties; +// +// ExecutorService executorService = Executors.newFixedThreadPool(5); +// +// @Override +// protected void doExecute() { +// int nettyPort = systemProperties.getNettyPort(); +// String localIpStr = NetUtil.getLocalIpStr(); +// try { +// // 获取在线的客户端节点并且排除当前节点 +// LambdaQueryWrapper wrapper = new LambdaQueryWrapper() +// .eq(ServerNode::getNodeType, NodeTypeEnum.SERVER.getType()) +// .not(w -> w.eq(ServerNode::getHostIp, localIpStr) +// .eq(ServerNode::getHostPort, nettyPort)); +// List serverNodes = serverNodeMapper.selectList(wrapper); +// List clientNodes = new ArrayList<>(); +// if (serverNodes.size() > 0) { +// // 并行获取所有服务端需要注册的列表 +// // 获取列表 并完成注册/本地完成续签 +// List allClientList = getAllClientList(serverNodes); +// if (CollUtil.isNotEmpty(allClientList)) { +// clientNodes.addAll(allClientList); +// } +// List refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); +// if (CollUtil.isNotEmpty(refreshCache)) { +// // 完成本节点的刷新 +// clientNodes.addAll(refreshCache); +// } +// } else { +// List refreshCache = GetRegNodesPostHttpRequestHandler.getAndRefreshCache(); +// if (CollUtil.isNotEmpty(refreshCache)) { +// // 完成本节点的刷新 +// clientNodes.addAll(refreshCache); +// } +// } +// if (CollUtil.isEmpty(clientNodes)) { +// SnailJobLog.LOCAL.warn("clientNodes is empty"); +// return; +// } +// SnailJobLog.LOCAL.info("start refresh client nodes:{}", clientNodes); +// refreshExpireAt(clientNodes); +// +// } catch (Exception e) { +// SnailJobLog.LOCAL.error("refresh 失败", e); +// } +// } +// +// private List getAllClientList(List serverNodes) { +// int size = serverNodes.size(); +// // 创建 CountDownLatch +// CountDownLatch latch = new CountDownLatch(size); +// +// // 存储处理结果 +// List> futures = new ArrayList<>(size); +// +// try { +// for (ServerNode serverNode : serverNodes) { +// Future future = executorService.submit(() -> { +// try { +// RegisterNodeInfo nodeInfo = new RegisterNodeInfo(); +// nodeInfo.setHostId(serverNode.getHostId()); +// nodeInfo.setGroupName(serverNode.getGroupName()); +// nodeInfo.setNamespaceId(serverNode.getNamespaceId()); +// nodeInfo.setHostPort(serverNode.getHostPort()); +// nodeInfo.setHostIp(serverNode.getHostIp()); +// ServerRpcClient serverRpcClient = buildRpcClient(nodeInfo); +// Result regNodesAndFlush = serverRpcClient.getRegNodesAndFlush(); +// +// // 模拟耗时处理 +// return regNodesAndFlush.getData(); +// } finally { +// // 处理完成后计数减一 +// latch.countDown(); +// } +// }); +// futures.add(future); +// } +// // 提交任务 +// +// // 等待所有任务完成 +// latch.await(5, TimeUnit.SECONDS); // 设置超时时间为5秒 +// +// return futures.stream() +// .map(future -> { +// try { +// String jsonString = future.get(1, TimeUnit.SECONDS); +// if (Objects.nonNull(jsonString)) { +// return JsonUtil.parseObject(jsonString, new TypeReference>() { +// }); +// } +// return new ArrayList(); +// } catch (Exception e) { +// return new ArrayList(); +// } +// }) +// .filter(Objects::nonNull) +// .flatMap(List::stream) +// .distinct() +// .toList(); +// // 收集处理结果 +// +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// +// @Override +// public String lockName() { +// return "registerNode"; +// } +// +// @Override +// public String lockAtMost() { +// return "PT10S"; +// } +// +// @Override +// public String lockAtLeast() { +// return "PT5S"; +// } +// +// @Override +// public void start() { +// taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); +// } +// +// @Override +// public void close() { +// +// } +// +// private ServerRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) { +//// String regInfo = registerNodeInfo.getHostId() + "/" + registerNodeInfo.getHostIp() + "/" + registerNodeInfo.getHostPort(); +//// log.info(regInfo + "--------------------------"); +// int maxRetryTimes = 3; +// boolean retry = false; +// return RequestBuilder.newBuilder() +// .nodeInfo(registerNodeInfo) +// .failRetry(maxRetryTimes > 0 && !retry) +// .retryTimes(maxRetryTimes) +// .client(ServerRpcClient.class) +// .build(); +// } +// +// private void refreshExpireAt(List serverNodes) { +// if (CollUtil.isEmpty(serverNodes)) { +// return; +// } +// +// Set hostIds = Sets.newHashSet(); +// Set hostIps = Sets.newHashSet(); +// for (final ServerNode serverNode : serverNodes) { +// serverNode.setExpireAt(getExpireAt()); +// hostIds.add(serverNode.getHostId()); +// hostIps.add(serverNode.getHostIp()); +// } +// +// List dbServerNodes = serverNodeMapper.selectList( +// new LambdaQueryWrapper() +// .select(ServerNode::getHostIp, ServerNode::getHostId) +// .in(ServerNode::getHostId, hostIds) +// .in(ServerNode::getHostIp, hostIps) +// ); +// +// List insertDBs = Lists.newArrayList(); +// List updateDBs = Lists.newArrayList(); +// Set> pairs = dbServerNodes.stream() +// .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect( +// Collectors.toSet()); +// +// // 去重处理 +// Set> existed = Sets.newHashSet(); +// for (final ServerNode serverNode : serverNodes) { +// Pair pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp()); +// if (existed.contains(pair)) { +// continue; +// } +// +// if (pairs.contains(pair)) { +// updateDBs.add(serverNode); +// } else { +// insertDBs.add(serverNode); +// } +// +// existed.add(pair); +// } +// +// try { +// // 批量更新 +// if (CollUtil.isNotEmpty(updateDBs)) { +// serverNodeMapper.updateBatchExpireAt(updateDBs); +// } +// } catch (Exception e) { +// SnailJobLog.LOCAL.error("续租失败", e); +// } +// +// try { +// if (CollUtil.isNotEmpty(insertDBs)) { +// serverNodeMapper.insertBatch(insertDBs); +// } +// } catch (DuplicateKeyException ignored) { +// } catch (Exception e) { +// SnailJobLog.LOCAL.error("注册节点失败", e); +// } +// } +// +// private LocalDateTime getExpireAt() { +// return LocalDateTime.now().plusSeconds(DELAY_TIME); +// } +//}