From dff4abbc884aff8af69e7f6424d38359c286a51b Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Tue, 16 Jul 2024 17:49:02 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.1.1):=20=E4=BF=AE=E5=A4=8D=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E6=B3=A8=E5=86=8C=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/register/AbstractRegister.java | 64 ++++++++++++++++--- 1 file changed, 54 insertions(+), 10 deletions(-) diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java index 564c70e3..dbeded3e 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java @@ -1,17 +1,25 @@ package com.aizuda.snailjob.server.common.register; +import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.Lifecycle; import com.aizuda.snailjob.server.common.Register; import com.aizuda.snailjob.server.common.cache.CacheRegisterTable; +import com.aizuda.snailjob.server.common.triple.Pair; import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper; import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import java.time.LocalDateTime; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * @author opensnail @@ -41,21 +49,57 @@ public abstract class AbstractRegister implements Register, Lifecycle { protected abstract void afterProcessor(final ServerNode serverNode); protected void refreshExpireAt(List serverNodes) { - - for (final ServerNode serverNode : serverNodes) { - serverNode.setExpireAt(getExpireAt()); + if (CollUtil.isEmpty(serverNodes)) { + return; } - // 批量更新 - if (serverNodes.size() != serverNodeMapper.updateBatchExpireAt(serverNodes)) { - try { - serverNodeMapper.insertBatch(serverNodes); - } catch (DuplicateKeyException ignored) { - } catch (Exception e) { - SnailJobLog.LOCAL.error("注册节点失败", e); + Set hostIds = Sets.newHashSet(); + Set hostIps = Sets.newHashSet(); + for (final ServerNode serverNode : serverNodes) { + serverNode.setExpireAt(getExpireAt()); + hostIds.add(serverNode.getHostId()); + hostIps.add(serverNode.getHostIp()); + } + + List dbServerNodes = serverNodeMapper.selectList( + new LambdaQueryWrapper() + .select(ServerNode::getHostIp, ServerNode::getHostId) + .in(ServerNode::getHostId, hostIds) + .in(ServerNode::getHostIp, hostIps) + ); + + List insertDBs = Lists.newArrayList(); + List updateDBs = Lists.newArrayList(); + Set> pairs = dbServerNodes.stream() + .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect( + Collectors.toSet()); + + for (final ServerNode serverNode : serverNodes) { + if (pairs.contains(Pair.of(serverNode.getHostId(), serverNode.getHostIp()))) { + updateDBs.add(serverNode); + } else { + insertDBs.add(serverNode); } } + try { + // 批量更新 + if (updateDBs.size() != serverNodeMapper.updateBatchExpireAt(updateDBs)) { + SnailJobLog.LOCAL.warn("续租失败 [{}]", JsonUtil.toJsonString(updateDBs)); + } + } catch (Exception e) { + SnailJobLog.LOCAL.error("续租失败", e); + } + + try { + if (insertDBs.size() != serverNodeMapper.insertBatch(insertDBs)) { + SnailJobLog.LOCAL.warn("注册节点失败 [{}]", JsonUtil.toJsonString(insertDBs)); + } + } catch (DuplicateKeyException ignored) { + } catch (Exception e) { + SnailJobLog.LOCAL.error("注册节点失败", e); + } + for (final ServerNode serverNode : serverNodes) { // 刷新本地缓存过期时间 CacheRegisterTable.refreshExpireAt(serverNode);