feat: 2.0.0

1. 优化客户端和服务端注册逻辑
This commit is contained in:
www.byteblogs.com 2023-06-07 23:14:24 +08:00 committed by byteblogs168
parent 042105b084
commit 9d1c417f46
7 changed files with 403 additions and 7 deletions

View File

@ -0,0 +1,27 @@
package com.aizuda.easy.retry.server.support;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.server.support.register.RegisterContext;
/**
* @author www.byteblogs.com
* @date 2023-06-07
* @since 2.0
*/
public interface Register {
/**
* 节点类型 see: {@link NodeTypeEnum}
*
* @param type 1. 客户端 2.服务端
* @return
*/
boolean supports(int type);
/**
* 执行注册
*
* @return
*/
boolean register(RegisterContext context);
}

View File

@ -0,0 +1,123 @@
package com.aizuda.easy.retry.server.support.cache;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Comparator;
import java.util.Objects;
import java.util.Observable;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* POD注册表
*
* @author www.byteblogs.com
* @date 2021-10-30
* @since 2.0
*/
@Component
@Data
@Slf4j
public class CacheRegisterTable implements Lifecycle {
private static Cache<String, ConcurrentMap<String, ServerNode>> CACHE;
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static ConcurrentMap<String, ServerNode> get(String groupName) {
return CACHE.getIfPresent(groupName);
}
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static ServerNode getServerNode(String groupName, String hostId) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
if (Objects.isNull(concurrentMap)) {
return null;
}
return concurrentMap.get(hostId);
}
/**
* 获取排序的ServerNode
*
* @return 缓存对象
*/
public static TreeSet<ServerNode> getServerNodeSet(String groupName) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
if (Objects.isNull(concurrentMap)) {
return new TreeSet<>();
}
return new TreeSet<>(Comparator.comparingInt(o -> o.getId().intValue()));
}
/**
* 获取排序的ServerNode
*
* @return 缓存对象
*/
public static Set<String> gePodIpSet(String groupName) {
return getServerNodeSet(groupName).stream().map(ServerNode::getHostIp).collect(Collectors.toSet());
}
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static void put(String groupName, ServerNode serverNode) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
if (Objects.isNull(concurrentMap)) {
concurrentMap = new ConcurrentHashMap<>();
}
concurrentMap.put(serverNode.getHostId(), serverNode);
}
public static void remove(String groupName, String hostId) {
ConcurrentMap<String, ServerNode> concurrentMap = CACHE.getIfPresent(groupName);
if (Objects.isNull(concurrentMap)) {
return;
}
concurrentMap.remove(hostId);
}
public static void expirationElimination(String groupName, String hostId) {
}
@Override
public void start() {
LogUtils.info(log, "CacheRegisterTable start");
CACHE = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
@Override
public void close() {
LogUtils.info(log, "CacheRegisterTable stop");
CACHE.invalidateAll();
}
}

View File

@ -0,0 +1,74 @@
package com.aizuda.easy.retry.server.support.register;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.Register;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime;
/**
* @author www.byteblogs.com
* @date 2023-06-07
* @since 1.6.0
*/
@Slf4j
public abstract class AbstractRegister implements Register, Lifecycle {
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private SystemProperties systemProperties;
@Override
public boolean register(RegisterContext context) {
beforeProcessor(context);
ServerNode serverNode = initServerNode(context);
return doRegister(context, serverNode);
}
protected void refreshExpireAt(ServerNode serverNode) {
try {
serverNodeMapper.insertOrUpdate(serverNode);
}catch (Exception e) {
LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]",
serverNode.getGroupName(), serverNode.getHostIp(), e);
}
}
protected abstract void beforeProcessor(RegisterContext context);
protected ServerNode initServerNode(RegisterContext context) {
ServerNode serverNode = new ServerNode();
serverNode.setHostId(context.getHostId());
serverNode.setHostIp(context.getHostIp());
serverNode.setGroupName(context.getGroupName());
serverNode.setHostPort(systemProperties.getNettyPort());
serverNode.setNodeType(getNodeType());
serverNode.setCreateDt(LocalDateTime.now());
serverNode.setContextPath(context.getContextPath());
serverNode.setExpireAt(getExpireAt(context));
return serverNode;
}
protected abstract LocalDateTime getExpireAt(RegisterContext context);
protected abstract boolean doRegister(RegisterContext context, ServerNode serverNode);
protected abstract Integer getNodeType();
}

View File

@ -0,0 +1,71 @@
package com.aizuda.easy.retry.server.support.register;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 客户端注册
*
* @author www.byteblogs.com
* @date 2023-06-07
* @since 1.6.0
*/
@Component
public class ClientRegister extends AbstractRegister {
public static final int DELAY_TIME = 30;
protected static final LinkedBlockingQueue<ServerNode> QUEUE = new LinkedBlockingQueue<>();
@Override
public boolean supports(int type) {
return getNodeType().equals(type);
}
@Override
protected void beforeProcessor(RegisterContext context) {
}
@Override
protected LocalDateTime getExpireAt(RegisterContext context) {
return LocalDateTime.now().plusSeconds(DELAY_TIME);
}
@Override
protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
Assert.isTrue(QUEUE.offer(serverNode), () ->
new EasyRetryServerException("add register Queue error. groupName:[{}] size:[{}]",
serverNode.getGroupName(), QUEUE.size()));
return Boolean.TRUE;
}
@Override
protected Integer getNodeType() {
return NodeTypeEnum.CLIENT.getType();
}
@Override
public void start() {
new Thread(() -> {
try {
ServerNode serverNode = QUEUE.take();
refreshExpireAt(serverNode);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,29 @@
package com.aizuda.easy.retry.server.support.register;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author www.byteblogs.com
* @date 2023-06-07
* @since 2.0
*/
@Data
public class RegisterContext {
private String groupName;
private String hostId;
private String hostIp;
private Integer hostPort;
private LocalDateTime expireAt;
private Integer nodeType;
private String contextPath;
}

View File

@ -0,0 +1,78 @@
package com.aizuda.easy.retry.server.support.register;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.Register;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 服务端注册
*
* @author www.byteblogs.com
* @date 2023-06-07
* @since 1.6.0
*/
@Component
public class ServerRegister extends AbstractRegister {
private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode"));
public static final int DELAY_TIME = 30;
public static final String CURRENT_CID;
static {
CURRENT_CID = IdUtil.simpleUUID();
}
@Override
public boolean supports(int type) {
return getNodeType().equals(type);
}
@Override
protected void beforeProcessor(RegisterContext context) {
context.setHostId(CURRENT_CID);
context.setHostIp(HostUtils.getIp());
context.setGroupName(StrUtil.EMPTY);
context.setContextPath(StrUtil.EMPTY);
}
@Override
protected LocalDateTime getExpireAt(RegisterContext context) {
return LocalDateTime.now().plusSeconds(DELAY_TIME);
}
@Override
protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
refreshExpireAt(serverNode);
return Boolean.TRUE;
}
@Override
protected Integer getNodeType() {
return NodeTypeEnum.SERVER.getType();
}
@Override
public void start() {
Register register = SpringContext.getBean("serverRegister", Register.class);
serverRegisterNode.scheduleAtFixedRate(()->{
register.register(new RegisterContext());
}, 1, DELAY_TIME / 2, TimeUnit.SECONDS);
}
@Override
public void close() {
}
}

View File

@ -23,13 +23,7 @@
#{hostPort,jdbcType=INTEGER},
#{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP}
) ON DUPLICATE KEY UPDATE
host_id = values(`host_id`),
host_ip = values(`host_ip`),
host_port = values(`host_port`),
expire_at = values(`expire_at`),
node_type = values(`node_type`),
create_dt = values(`create_dt`),
context_path = values(`context_path`)
expire_at = values(`expire_at`)
</insert>
<delete id="deleteByExpireAt">
delete from server_node