fix(sj_1.1.1): 修复客户端注册问题

This commit is contained in:
opensnail 2024-07-16 17:49:02 +08:00
parent bbd0d66dbd
commit dff4abbc88

View File

@ -1,17 +1,25 @@
package com.aizuda.snailjob.server.common.register;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.Register;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author opensnail
@ -41,21 +49,57 @@ public abstract class AbstractRegister implements Register, Lifecycle {
protected abstract void afterProcessor(final ServerNode serverNode);
protected void refreshExpireAt(List<ServerNode> serverNodes) {
for (final ServerNode serverNode : serverNodes) {
serverNode.setExpireAt(getExpireAt());
if (CollUtil.isEmpty(serverNodes)) {
return;
}
// 批量更新
if (serverNodes.size() != serverNodeMapper.updateBatchExpireAt(serverNodes)) {
try {
serverNodeMapper.insertBatch(serverNodes);
} catch (DuplicateKeyException ignored) {
} catch (Exception e) {
SnailJobLog.LOCAL.error("注册节点失败", e);
Set<String> hostIds = Sets.newHashSet();
Set<String> hostIps = Sets.newHashSet();
for (final ServerNode serverNode : serverNodes) {
serverNode.setExpireAt(getExpireAt());
hostIds.add(serverNode.getHostId());
hostIps.add(serverNode.getHostIp());
}
List<ServerNode> dbServerNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>()
.select(ServerNode::getHostIp, ServerNode::getHostId)
.in(ServerNode::getHostId, hostIds)
.in(ServerNode::getHostIp, hostIps)
);
List<ServerNode> insertDBs = Lists.newArrayList();
List<ServerNode> updateDBs = Lists.newArrayList();
Set<Pair<String, String>> pairs = dbServerNodes.stream()
.map(serverNode -> Pair.of(serverNode.getHostId(), serverNode.getHostIp())).collect(
Collectors.toSet());
for (final ServerNode serverNode : serverNodes) {
if (pairs.contains(Pair.of(serverNode.getHostId(), serverNode.getHostIp()))) {
updateDBs.add(serverNode);
} else {
insertDBs.add(serverNode);
}
}
try {
// 批量更新
if (updateDBs.size() != serverNodeMapper.updateBatchExpireAt(updateDBs)) {
SnailJobLog.LOCAL.warn("续租失败 [{}]", JsonUtil.toJsonString(updateDBs));
}
} catch (Exception e) {
SnailJobLog.LOCAL.error("续租失败", e);
}
try {
if (insertDBs.size() != serverNodeMapper.insertBatch(insertDBs)) {
SnailJobLog.LOCAL.warn("注册节点失败 [{}]", JsonUtil.toJsonString(insertDBs));
}
} catch (DuplicateKeyException ignored) {
} catch (Exception e) {
SnailJobLog.LOCAL.error("注册节点失败", e);
}
for (final ServerNode serverNode : serverNodes) {
// 刷新本地缓存过期时间
CacheRegisterTable.refreshExpireAt(serverNode);