From ac1e67940908295622249bdcfd32dea18951adbe Mon Sep 17 00:00:00 2001 From: srzou Date: Thu, 5 Dec 2024 14:39:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(sj=5F1.3.0-beta1):=20=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AFclient=E4=B8=BA-1=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=9A=8F=E6=9C=BA=E7=AB=AF=E5=8F=A3=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/common/rpc/client/GrpcChannel.java | 61 ++++++++++++++++++- .../common/rpc/client/NettyChannel.java | 58 ++++++++++++++++++ 2 files changed, 117 insertions(+), 2 deletions(-) diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/GrpcChannel.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/GrpcChannel.java index e591a4b9e..c749ac14a 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/GrpcChannel.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/GrpcChannel.java @@ -15,17 +15,19 @@ import com.aizuda.snailjob.common.core.grpc.auto.Metadata; import com.aizuda.snailjob.common.core.util.NetUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.Any; -import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.protobuf.ProtoUtils; import org.springframework.boot.autoconfigure.web.ServerProperties; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; /** * @author: opensnail @@ -57,6 +59,11 @@ public final class GrpcChannel { */ private static final String SNAIL_JOB_CLIENT_HOST = "snail-job.host"; + private static final Integer MIN_PORT = 15000; + private static final Integer MAX_PORT = 50000; + private static final ReentrantLock PORT_LOCK = new ReentrantLock(); + private static final Integer RANDOM_CLIENT_PORT = -1; + private static final String HOST_ID = IdUtil.getSnowflake().nextIdStr(); private static final int PORT; private static final String HOST; @@ -130,12 +137,62 @@ public final class GrpcChannel { // 获取客户端指定的端口 if (Objects.isNull(port)) { port = Optional.ofNullable(serverProperties.getPort()).orElse(PORT); + snailJobProperties.setPort(port); + SnailJobLog.LOCAL.info("snail job client port :{}", port); + } else if (port.equals(RANDOM_CLIENT_PORT)) { + // 使用随机算法获取端口 + PORT_LOCK.lock(); + try { + // 双重检查,避免重复获取端口 + if (snailJobProperties.getPort().equals(RANDOM_CLIENT_PORT)) { + port = getAvailablePort(); + snailJobProperties.setPort(port); + SnailJobLog.LOCAL.info("snail job client port :{}", port); + } else { + port = snailJobProperties.getPort(); + } + } finally { + PORT_LOCK.unlock(); + } } return port; } + /** + * 获取随机可用的端口 + * + * @return 可用端口号 + */ + private static Integer getAvailablePort() { + int port; + do { + port = MIN_PORT + (int) (Math.random()*(MAX_PORT - MIN_PORT)); + }while (!isPortAvailable(port)); + + return port; + } + + /** + * 检查端口是否可以使用 + * + * @param port 端口号 + * @return 是否可用 + */ + private static boolean isPortAvailable(int port) { + try (ServerSocket serverSocket = new ServerSocket()) { + // 设置端口重用 + serverSocket.setReuseAddress(true); + // 绑定端口 + serverSocket.bind(new InetSocketAddress(port)); + return true; + } catch (IOException e) { + return false; + } + } + + public static ListenableFuture sendOfUnary(String path, String body, final long reqId) { if (channel == null) { return null; diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java index f3c4c58f5..d0ea26bc0 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/NettyChannel.java @@ -17,9 +17,13 @@ import io.netty.handler.codec.http.*; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.web.ServerProperties; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; /** * @author opensnail @@ -51,6 +55,11 @@ public class NettyChannel { */ private static final String SNAIL_JOB_CLIENT_HOST = "snail-job.host"; + private static final Integer MIN_PORT = 15000; + private static final Integer MAX_PORT = 50000; + private static final ReentrantLock PORT_LOCK = new ReentrantLock(); + private static final Integer RANDOM_CLIENT_PORT = -1; + private static final String HOST_ID = IdUtil.getSnowflake().nextIdStr(); private static final int PORT; private static final String HOST; @@ -124,11 +133,60 @@ public class NettyChannel { // 获取客户端指定的端口 if (Objects.isNull(port)) { port = Optional.ofNullable(serverProperties.getPort()).orElse(PORT); + snailJobProperties.setPort(port); + SnailJobLog.LOCAL.info("snail job client port :{}", port); + } else if (port.equals(RANDOM_CLIENT_PORT)) { + // 使用随机算法获取端口 + PORT_LOCK.lock(); + try { + // 双重检查,避免重复获取端口 + if (snailJobProperties.getPort().equals(RANDOM_CLIENT_PORT)) { + port = getAvailablePort(); + snailJobProperties.setPort(port); + SnailJobLog.LOCAL.info("snail job client port :{}", port); + } else { + port = snailJobProperties.getPort(); + } + } finally { + PORT_LOCK.unlock(); + } } return port; } + /** + * 获取随机可用的端口 + * + * @return 可用端口号 + */ + private static Integer getAvailablePort() { + int port; + do { + port = MIN_PORT + (int) (Math.random() * (MAX_PORT - MIN_PORT)); + } while (!isPortAvailable(port)); + + return port; + } + + /** + * 检查端口是否可以使用 + * + * @param port 端口号 + * @return 是否可用 + */ + private static boolean isPortAvailable(int port) { + try (ServerSocket serverSocket = new ServerSocket()) { + // 设置端口重用 + serverSocket.setReuseAddress(true); + // 绑定端口 + serverSocket.bind(new InetSocketAddress(port)); + return true; + } catch (IOException e) { + return false; + } + } + public static void setChannel(Channel channel) { NettyChannel.CHANNEL = channel;