diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java index dbeded3e..8c9f899f 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java @@ -74,12 +74,21 @@ public abstract class AbstractRegister implements Register, Lifecycle { .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect( Collectors.toSet()); + // 去重处理 + Set> existed = Sets.newHashSet(); for (final ServerNode serverNode : serverNodes) { - if (pairs.contains(Pair.of(serverNode.getHostId(), serverNode.getHostIp()))) { + Pair pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp()); + if (existed.contains(pair)) { + continue; + } + + if (pairs.contains(pair)) { updateDBs.add(serverNode); } else { insertDBs.add(serverNode); } + + existed.add(pair); } try { diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java index 94f8ca1b..29eee112 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/ClientRegister.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.common.register; import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.core.enums.NodeTypeEnum; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; @@ -12,6 +13,8 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -87,14 +90,9 @@ public class ClientRegister extends AbstractRegister implements Runnable { ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); if (Objects.nonNull(serverNode)) { List lists = Lists.newArrayList(serverNode); - QUEUE.drainTo(lists, 100); - - // 去重 - lists = new ArrayList<>(lists.stream() - .collect( - Collectors.toMap(ServerNode::getHostId, node -> node, (existing, replacement) -> existing)) - .values()); + QUEUE.drainTo(lists, 256); + // 注册或续租 refreshExpireAt(lists); } } catch (InterruptedException ignored) {