diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java
index 564c70e3..dbeded3e 100644
--- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java
+++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/register/AbstractRegister.java
@@ -1,17 +1,25 @@
 package com.aizuda.snailjob.server.common.register;
 
+import cn.hutool.core.collection.CollUtil;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import com.aizuda.snailjob.server.common.Lifecycle;
 import com.aizuda.snailjob.server.common.Register;
 import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
+import com.aizuda.snailjob.server.common.triple.Pair;
 import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DuplicateKeyException;
 
 import java.time.LocalDateTime;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * @author opensnail
@@ -41,21 +49,57 @@ public abstract class AbstractRegister implements Register, Lifecycle {
 
     protected abstract void afterProcessor(final ServerNode serverNode);
 
     protected void refreshExpireAt(List<ServerNode> serverNodes) {
-
-        for (final ServerNode serverNode : serverNodes) {
-            serverNode.setExpireAt(getExpireAt());
+        if (CollUtil.isEmpty(serverNodes)) {
+            return;
         }
 
-        // 批量更新
-        if (serverNodes.size() != serverNodeMapper.updateBatchExpireAt(serverNodes)) {
-            try {
-                serverNodeMapper.insertBatch(serverNodes);
-            } catch (DuplicateKeyException ignored) {
-            } catch (Exception e) {
-                SnailJobLog.LOCAL.error("注册节点失败", e);
+        Set<String> hostIds = Sets.newHashSet();
+        Set<String> hostIps = Sets.newHashSet();
+        for (final ServerNode serverNode : serverNodes) {
+            serverNode.setExpireAt(getExpireAt());
+            hostIds.add(serverNode.getHostId());
+            hostIps.add(serverNode.getHostIp());
+        }
+
+        List<ServerNode> dbServerNodes = serverNodeMapper.selectList(
+                new LambdaQueryWrapper<ServerNode>()
+                        .select(ServerNode::getHostIp, ServerNode::getHostId)
+                        .in(ServerNode::getHostId, hostIds)
+                        .in(ServerNode::getHostIp, hostIps)
+        );
+
+        List<ServerNode> insertDBs = Lists.newArrayList();
+        List<ServerNode> updateDBs = Lists.newArrayList();
+        Set<Pair<String, String>> pairs = dbServerNodes.stream()
+                .map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect(
+                        Collectors.toSet());
+
+        for (final ServerNode serverNode : serverNodes) {
+            if (pairs.contains(Pair.of(serverNode.getHostId(), serverNode.getHostIp()))) {
+                updateDBs.add(serverNode);
+            } else {
+                insertDBs.add(serverNode);
             }
         }
 
+        try {
+            // 批量更新
+            if (updateDBs.size() != serverNodeMapper.updateBatchExpireAt(updateDBs)) {
+                SnailJobLog.LOCAL.warn("续租失败 [{}]", JsonUtil.toJsonString(updateDBs));
+            }
+        } catch (Exception e) {
+            SnailJobLog.LOCAL.error("续租失败", e);
+        }
+
+        try {
+            if (insertDBs.size() != serverNodeMapper.insertBatch(insertDBs)) {
+                SnailJobLog.LOCAL.warn("注册节点失败 [{}]", JsonUtil.toJsonString(insertDBs));
+            }
+        } catch (DuplicateKeyException ignored) {
+        } catch (Exception e) {
+            SnailJobLog.LOCAL.error("注册节点失败", e);
+        }
+
         for (final ServerNode serverNode : serverNodes) {
             // 刷新本地缓存过期时间
            CacheRegisterTable.refreshExpireAt(serverNode);
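Note on the change: the old refreshExpireAt blindly ran updateBatchExpireAt on every heartbeating node and, on a row-count mismatch, retried the whole list with insertBatch, relying on DuplicateKeyException to swallow the rows that already existed. The new version first queries server_node for the already-registered (hostId, hostIp) pairs, partitions the incoming nodes into updateDBs (renew the expireAt lease) and insertDBs (first-time registration), and runs each batch in its own try/catch so a failed renewal no longer masks a failed registration (the log messages 续租失败 and 注册节点失败 mean "lease renewal failed" and "node registration failed"; 批量更新 and 刷新本地缓存过期时间 mean "batch update" and "refresh local cache expiration time").

A minimal, self-contained sketch of just the partition step, assuming a simplified Node record and java.util Map.entry in place of the project's ServerNode and Pair types; the names below are illustrative and not part of the patch:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RegisterPartitionSketch {

    record Node(String hostId, String hostIp) {}

    // Split incoming heartbeat nodes into "already registered" (renew the lease)
    // and "not yet registered" (insert), keyed by the (hostId, hostIp) pair.
    static void partition(List<Node> incoming,
                          Set<Map.Entry<String, String>> existingPairs,
                          List<Node> updates,
                          List<Node> inserts) {
        for (Node node : incoming) {
            if (existingPairs.contains(Map.entry(node.hostId(), node.hostIp()))) {
                updates.add(node);   // row exists: only expireAt needs refreshing
            } else {
                inserts.add(node);   // first heartbeat from this node: register it
            }
        }
    }

    public static void main(String[] args) {
        List<Node> incoming = List.of(new Node("client-1", "10.0.0.1"),
                                      new Node("client-2", "10.0.0.2"));
        // Pretend the database already knows client-1@10.0.0.1.
        Set<Map.Entry<String, String>> existing = new HashSet<>();
        existing.add(Map.entry("client-1", "10.0.0.1"));

        List<Node> updates = new ArrayList<>();
        List<Node> inserts = new ArrayList<>();
        partition(incoming, existing, updates, inserts);

        System.out.println("renew: " + updates);     // [client-1]
        System.out.println("register: " + inserts);  // [client-2]
    }
}

Keeping the two batches separate also means a DuplicateKeyException raised by a concurrent registration only affects the insert batch instead of aborting the lease renewal for nodes that were already registered.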