feat:(1.3.0-beta1.1): 新增grpc 重连事件
This commit is contained in:
parent
8fde82c067
commit
f35eddc806
2
pom.xml
2
pom.xml
@ -21,7 +21,7 @@
|
||||
<java.version>17</java.version>
|
||||
<maven.compiler.source>17</maven.compiler.source>
|
||||
<maven.compiler.target>17</maven.compiler.target>
|
||||
<revision>1.3.0-beta1</revision>
|
||||
<revision>1.3.0-beta1.1</revision>
|
||||
<netty-all.version>4.1.114.Final</netty-all.version>
|
||||
<hutool-all.version>5.8.32</hutool-all.version>
|
||||
<mybatis-plus.version>3.5.9</mybatis-plus.version>
|
||||
|
@ -4,20 +4,20 @@ import com.aizuda.snailjob.client.common.Lifecycle;
|
||||
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
|
||||
import com.aizuda.snailjob.client.common.config.SnailJobProperties.RpcClientProperties;
|
||||
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig;
|
||||
import com.aizuda.snailjob.client.common.event.SnailChannelReconnectEvent;
|
||||
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
|
||||
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.grpc.*;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
|
||||
/**
|
||||
* @author: opensnail
|
||||
@ -29,27 +29,31 @@ import java.util.concurrent.TimeUnit;
|
||||
public class SnailJobGrpcClient implements Lifecycle {
|
||||
private ManagedChannel channel;
|
||||
private final SnailJobProperties snailJobProperties;
|
||||
|
||||
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newSingleThreadScheduledExecutor(
|
||||
r -> new Thread(r, "sj-client-check"));
|
||||
@Override
|
||||
public void start() {
|
||||
if (RpcTypeEnum.GRPC != snailJobProperties.getRpcType()) {
|
||||
return;
|
||||
}
|
||||
|
||||
RpcClientProperties clientRpc = snailJobProperties.getClientRpc();
|
||||
// 创建 gRPC 频道
|
||||
String serverHost = GrpcChannel.getServerHost();
|
||||
channel = ManagedChannelBuilder.forAddress(serverHost, GrpcChannel.getServerPort())
|
||||
.executor(createGrpcExecutor(serverHost))
|
||||
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
|
||||
.maxInboundMessageSize(clientRpc.getMaxInboundMessageSize())
|
||||
.keepAliveTime(clientRpc.getKeepAliveTime().toMillis(), TimeUnit.MILLISECONDS)
|
||||
.keepAliveTimeout(clientRpc.getKeepAliveTimeout().toMillis(), TimeUnit.MILLISECONDS)
|
||||
.usePlaintext()
|
||||
.build();
|
||||
channel = connection();
|
||||
GrpcChannel.setChannel(channel);
|
||||
SnailJobLog.LOCAL.info("grpc client started connect to server");
|
||||
|
||||
// 连接检测
|
||||
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
|
||||
ConnectivityState state = channel.getState(true);
|
||||
if (state == ConnectivityState.TRANSIENT_FAILURE) {
|
||||
try {
|
||||
// 抛出重连事件
|
||||
SnailSpringContext.getContext().publishEvent(new SnailChannelReconnectEvent());
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("reconnect error ", e);
|
||||
}
|
||||
}
|
||||
}, 0, 10, TimeUnit.SECONDS);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -59,6 +63,20 @@ public class SnailJobGrpcClient implements Lifecycle {
|
||||
}
|
||||
}
|
||||
|
||||
public ManagedChannel connection() {
|
||||
RpcClientProperties clientRpc = snailJobProperties.getClientRpc();
|
||||
// 创建 gRPC 频道
|
||||
String serverHost = GrpcChannel.getServerHost();
|
||||
return NettyChannelBuilder.forAddress(serverHost, GrpcChannel.getServerPort())
|
||||
.executor(createGrpcExecutor(serverHost))
|
||||
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
|
||||
.maxInboundMessageSize(clientRpc.getMaxInboundMessageSize())
|
||||
.keepAliveTime(clientRpc.getKeepAliveTime().toMillis(), TimeUnit.MILLISECONDS)
|
||||
.keepAliveTimeout(clientRpc.getKeepAliveTimeout().toMillis(), TimeUnit.MILLISECONDS)
|
||||
.usePlaintext().enableRetry().maxRetryAttempts(16)
|
||||
.build();
|
||||
}
|
||||
|
||||
private ThreadPoolExecutor createGrpcExecutor(String serverIp) {
|
||||
RpcClientProperties clientRpc = snailJobProperties.getClientRpc();
|
||||
ThreadPoolConfig threadPool = clientRpc.getClientTp();
|
||||
|
Loading…
Reference in New Issue
Block a user