fix(sj_1.1.1): 优化去重处理

This commit is contained in:
opensnail 2024-07-16 18:21:46 +08:00
parent dff4abbc88
commit aca65eb1f5
2 changed files with 15 additions and 8 deletions

View File

@ -74,12 +74,21 @@ public abstract class AbstractRegister implements Register, Lifecycle {
.map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect(
Collectors.toSet());
// 去重处理
Set<Pair<String, String>> existed = Sets.newHashSet();
for (final ServerNode serverNode : serverNodes) {
if (pairs.contains(Pair.of(serverNode.getHostId(), serverNode.getHostIp()))) {
Pair<String, String> pair = Pair.of(serverNode.getHostId(), serverNode.getHostIp());
if (existed.contains(pair)) {
continue;
}
if (pairs.contains(pair)) {
updateDBs.add(serverNode);
} else {
insertDBs.add(serverNode);
}
existed.add(pair);
}
try {

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.common.register;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
@ -12,6 +13,8 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -87,14 +90,9 @@ public class ClientRegister extends AbstractRegister implements Runnable {
ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS);
if (Objects.nonNull(serverNode)) {
List<ServerNode> lists = Lists.newArrayList(serverNode);
QUEUE.drainTo(lists, 100);
// 去重
lists = new ArrayList<>(lists.stream()
.collect(
Collectors.toMap(ServerNode::getHostId, node -> node, (existing, replacement) -> existing))
.values());
QUEUE.drainTo(lists, 256);
// 注册或续租
refreshExpireAt(lists);
}
} catch (InterruptedException ignored) {