From 9d1c417f46fe89801a42fa8c41dcde5f7dd9a853 Mon Sep 17 00:00:00 2001 From: "www.byteblogs.com" <598092184@qq.com> Date: Wed, 7 Jun 2023 23:14:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.0.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=92=8C=E6=9C=8D=E5=8A=A1=E7=AB=AF?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../easy/retry/server/support/Register.java | 27 ++++ .../support/cache/CacheRegisterTable.java | 123 ++++++++++++++++++ .../support/register/AbstractRegister.java | 74 +++++++++++ .../support/register/ClientRegister.java | 71 ++++++++++ .../support/register/RegisterContext.java | 29 +++++ .../support/register/ServerRegister.java | 78 +++++++++++ .../resources/mapper/ServerNodeMapper.xml | 8 +- 7 files changed, 403 insertions(+), 7 deletions(-) create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Register.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/RegisterContext.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Register.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Register.java new file mode 100644 index 00000000..cdec3f92 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Register.java @@ -0,0 +1,27 @@ +package com.aizuda.easy.retry.server.support; + +import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; +import com.aizuda.easy.retry.server.support.register.RegisterContext; + +/** + * @author www.byteblogs.com + * @date 2023-06-07 + * @since 2.0 + */ +public interface Register { + + /** + * 节点类型 see: {@link NodeTypeEnum} + * + * @param type 1. 客户端 2.服务端 + * @return + */ + boolean supports(int type); + + /** + * 执行注册 + * + * @return + */ + boolean register(RegisterContext context); +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java new file mode 100644 index 00000000..9dcdb6e0 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheRegisterTable.java @@ -0,0 +1,123 @@ +package com.aizuda.easy.retry.server.support.cache; + +import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.enums.TaskTypeEnum; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.support.Lifecycle; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Comparator; +import java.util.Objects; +import java.util.Observable; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +/** + * POD注册表 + * + * @author www.byteblogs.com + * @date 2021-10-30 + * @since 2.0 + */ +@Component +@Data +@Slf4j +public class CacheRegisterTable implements Lifecycle { + + private static Cache> CACHE; + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static ConcurrentMap get(String groupName) { + return CACHE.getIfPresent(groupName); + } + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static ServerNode getServerNode(String groupName, String hostId) { + ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); + if (Objects.isNull(concurrentMap)) { + return null; + } + + return concurrentMap.get(hostId); + } + + /** + * 获取排序的ServerNode + * + * @return 缓存对象 + */ + public static TreeSet getServerNodeSet(String groupName) { + ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); + if (Objects.isNull(concurrentMap)) { + return new TreeSet<>(); + } + + return new TreeSet<>(Comparator.comparingInt(o -> o.getId().intValue())); + } + + /** + * 获取排序的ServerNode + * + * @return 缓存对象 + */ + public static Set gePodIpSet(String groupName) { + return getServerNodeSet(groupName).stream().map(ServerNode::getHostIp).collect(Collectors.toSet()); + } + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static void put(String groupName, ServerNode serverNode) { + ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); + if (Objects.isNull(concurrentMap)) { + concurrentMap = new ConcurrentHashMap<>(); + } + concurrentMap.put(serverNode.getHostId(), serverNode); + } + + public static void remove(String groupName, String hostId) { + ConcurrentMap concurrentMap = CACHE.getIfPresent(groupName); + if (Objects.isNull(concurrentMap)) { + return; + } + + concurrentMap.remove(hostId); + } + + public static void expirationElimination(String groupName, String hostId) { + + } + + @Override + public void start() { + LogUtils.info(log, "CacheRegisterTable start"); + CACHE = CacheBuilder.newBuilder() + // 设置并发级别为cpu核心数 + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .build(); + } + + @Override + public void close() { + LogUtils.info(log, "CacheRegisterTable stop"); + CACHE.invalidateAll(); + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java new file mode 100644 index 00000000..b3ef8295 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/AbstractRegister.java @@ -0,0 +1,74 @@ +package com.aizuda.easy.retry.server.support.register; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.config.SystemProperties; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.support.Lifecycle; +import com.aizuda.easy.retry.server.support.Register; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +import java.time.LocalDateTime; + +/** + * @author www.byteblogs.com + * @date 2023-06-07 + * @since 1.6.0 + */ +@Slf4j +public abstract class AbstractRegister implements Register, Lifecycle { + + @Autowired + private ServerNodeMapper serverNodeMapper; + @Autowired + private SystemProperties systemProperties; + + @Override + public boolean register(RegisterContext context) { + + beforeProcessor(context); + + ServerNode serverNode = initServerNode(context); + + return doRegister(context, serverNode); + } + + protected void refreshExpireAt(ServerNode serverNode) { + + try { + serverNodeMapper.insertOrUpdate(serverNode); + }catch (Exception e) { + LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]", + serverNode.getGroupName(), serverNode.getHostIp(), e); + } + } + + protected abstract void beforeProcessor(RegisterContext context); + + protected ServerNode initServerNode(RegisterContext context) { + + ServerNode serverNode = new ServerNode(); + serverNode.setHostId(context.getHostId()); + serverNode.setHostIp(context.getHostIp()); + serverNode.setGroupName(context.getGroupName()); + serverNode.setHostPort(systemProperties.getNettyPort()); + serverNode.setNodeType(getNodeType()); + serverNode.setCreateDt(LocalDateTime.now()); + serverNode.setContextPath(context.getContextPath()); + serverNode.setExpireAt(getExpireAt(context)); + + return serverNode; + } + + protected abstract LocalDateTime getExpireAt(RegisterContext context); + + + protected abstract boolean doRegister(RegisterContext context, ServerNode serverNode); + + + protected abstract Integer getNodeType(); + + + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java new file mode 100644 index 00000000..9d5347ca --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ClientRegister.java @@ -0,0 +1,71 @@ +package com.aizuda.easy.retry.server.support.register; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * 客户端注册 + * + * @author www.byteblogs.com + * @date 2023-06-07 + * @since 1.6.0 + */ +@Component +public class ClientRegister extends AbstractRegister { + public static final int DELAY_TIME = 30; + protected static final LinkedBlockingQueue QUEUE = new LinkedBlockingQueue<>(); + + @Override + public boolean supports(int type) { + return getNodeType().equals(type); + } + + @Override + protected void beforeProcessor(RegisterContext context) { + + } + + @Override + protected LocalDateTime getExpireAt(RegisterContext context) { + return LocalDateTime.now().plusSeconds(DELAY_TIME); + } + + @Override + protected boolean doRegister(RegisterContext context, ServerNode serverNode) { + + Assert.isTrue(QUEUE.offer(serverNode), () -> + new EasyRetryServerException("add register Queue error. groupName:[{}] size:[{}]", + serverNode.getGroupName(), QUEUE.size())); + + return Boolean.TRUE; + } + + @Override + protected Integer getNodeType() { + return NodeTypeEnum.CLIENT.getType(); + } + + @Override + public void start() { + new Thread(() -> { + try { + ServerNode serverNode = QUEUE.take(); + refreshExpireAt(serverNode); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + }).start(); + } + + @Override + public void close() { + + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/RegisterContext.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/RegisterContext.java new file mode 100644 index 00000000..511d8fd9 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/RegisterContext.java @@ -0,0 +1,29 @@ +package com.aizuda.easy.retry.server.support.register; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author www.byteblogs.com + * @date 2023-06-07 + * @since 2.0 + */ +@Data +public class RegisterContext { + + private String groupName; + + private String hostId; + + private String hostIp; + + private Integer hostPort; + + private LocalDateTime expireAt; + + private Integer nodeType; + + private String contextPath; + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java new file mode 100644 index 00000000..37661835 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/register/ServerRegister.java @@ -0,0 +1,78 @@ +package com.aizuda.easy.retry.server.support.register; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode; +import com.aizuda.easy.retry.server.support.Register; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 服务端注册 + * + * @author www.byteblogs.com + * @date 2023-06-07 + * @since 1.6.0 + */ +@Component +public class ServerRegister extends AbstractRegister { + + private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r,"ServerRegisterNode")); + public static final int DELAY_TIME = 30; + public static final String CURRENT_CID; + static { + CURRENT_CID = IdUtil.simpleUUID(); + } + + @Override + public boolean supports(int type) { + return getNodeType().equals(type); + } + + + @Override + protected void beforeProcessor(RegisterContext context) { + context.setHostId(CURRENT_CID); + context.setHostIp(HostUtils.getIp()); + context.setGroupName(StrUtil.EMPTY); + context.setContextPath(StrUtil.EMPTY); + } + + @Override + protected LocalDateTime getExpireAt(RegisterContext context) { + return LocalDateTime.now().plusSeconds(DELAY_TIME); + } + + @Override + protected boolean doRegister(RegisterContext context, ServerNode serverNode) { + refreshExpireAt(serverNode); + return Boolean.TRUE; + } + + @Override + protected Integer getNodeType() { + return NodeTypeEnum.SERVER.getType(); + } + + @Override + public void start() { + Register register = SpringContext.getBean("serverRegister", Register.class); + serverRegisterNode.scheduleAtFixedRate(()->{ + register.register(new RegisterContext()); + }, 1, DELAY_TIME / 2, TimeUnit.SECONDS); + + } + + @Override + public void close() { + + } +} diff --git a/easy-retry-server/src/main/resources/mapper/ServerNodeMapper.xml b/easy-retry-server/src/main/resources/mapper/ServerNodeMapper.xml index b2c70de7..574a480c 100644 --- a/easy-retry-server/src/main/resources/mapper/ServerNodeMapper.xml +++ b/easy-retry-server/src/main/resources/mapper/ServerNodeMapper.xml @@ -23,13 +23,7 @@ #{hostPort,jdbcType=INTEGER}, #{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP} ) ON DUPLICATE KEY UPDATE - host_id = values(`host_id`), - host_ip = values(`host_ip`), - host_port = values(`host_port`), - expire_at = values(`expire_at`), - node_type = values(`node_type`), - create_dt = values(`create_dt`), - context_path = values(`context_path`) + expire_at = values(`expire_at`) delete from server_node