diff --git a/pom.xml b/pom.xml index 79ad6b8cf..232fc108e 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ 17 17 17 - 1.3.0-beta1 + 1.3.0-beta1.1 4.1.114.Final 5.8.32 3.5.9 diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/SnailJobGrpcClient.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/SnailJobGrpcClient.java index 287dc8590..1a0894a8e 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/SnailJobGrpcClient.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/rpc/client/SnailJobGrpcClient.java @@ -4,20 +4,20 @@ import com.aizuda.snailjob.client.common.Lifecycle; import com.aizuda.snailjob.client.common.config.SnailJobProperties; import com.aizuda.snailjob.client.common.config.SnailJobProperties.RpcClientProperties; import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig; +import com.aizuda.snailjob.client.common.event.SnailChannelReconnectEvent; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.enums.RpcTypeEnum; import com.aizuda.snailjob.common.log.SnailJobLog; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.grpc.DecompressorRegistry; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; +import io.grpc.*; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import lombok.RequiredArgsConstructor; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + /** * @author: opensnail @@ -29,27 +29,31 @@ import java.util.concurrent.TimeUnit; public class SnailJobGrpcClient implements Lifecycle { private ManagedChannel channel; private final SnailJobProperties snailJobProperties; - + private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "sj-client-check")); @Override public void start() { if (RpcTypeEnum.GRPC != snailJobProperties.getRpcType()) { return; } - RpcClientProperties clientRpc = snailJobProperties.getClientRpc(); - // 创建 gRPC 频道 - String serverHost = GrpcChannel.getServerHost(); - channel = ManagedChannelBuilder.forAddress(serverHost, GrpcChannel.getServerPort()) - .executor(createGrpcExecutor(serverHost)) - .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) - .maxInboundMessageSize(clientRpc.getMaxInboundMessageSize()) - .keepAliveTime(clientRpc.getKeepAliveTime().toMillis(), TimeUnit.MILLISECONDS) - .keepAliveTimeout(clientRpc.getKeepAliveTimeout().toMillis(), TimeUnit.MILLISECONDS) - .usePlaintext() - .build(); + channel = connection(); GrpcChannel.setChannel(channel); SnailJobLog.LOCAL.info("grpc client started connect to server"); + // 连接检测 + SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> { + ConnectivityState state = channel.getState(true); + if (state == ConnectivityState.TRANSIENT_FAILURE) { + try { + // 抛出重连事件 + SnailSpringContext.getContext().publishEvent(new SnailChannelReconnectEvent()); + } catch (Exception e) { + SnailJobLog.LOCAL.error("reconnect error ", e); + } + } + }, 0, 10, TimeUnit.SECONDS); + } @Override @@ -59,6 +63,20 @@ public class SnailJobGrpcClient implements Lifecycle { } } + public ManagedChannel connection() { + RpcClientProperties clientRpc = snailJobProperties.getClientRpc(); + // 创建 gRPC 频道 + String serverHost = GrpcChannel.getServerHost(); + return NettyChannelBuilder.forAddress(serverHost, GrpcChannel.getServerPort()) + .executor(createGrpcExecutor(serverHost)) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + .maxInboundMessageSize(clientRpc.getMaxInboundMessageSize()) + .keepAliveTime(clientRpc.getKeepAliveTime().toMillis(), TimeUnit.MILLISECONDS) + .keepAliveTimeout(clientRpc.getKeepAliveTimeout().toMillis(), TimeUnit.MILLISECONDS) + .usePlaintext().enableRetry().maxRetryAttempts(16) + .build(); + } + private ThreadPoolExecutor createGrpcExecutor(String serverIp) { RpcClientProperties clientRpc = snailJobProperties.getClientRpc(); ThreadPoolConfig threadPool = clientRpc.getClientTp();