feat:(grpc):

完成Grpc接入
This commit is contained in:
opensnail 2024-08-23 18:15:55 +08:00
parent 6d326c3b24
commit 0527313d26
62 changed files with 1148 additions and 897 deletions

54
pom.xml
View File

@ -34,6 +34,9 @@
<commons-logging.version>1.3.4</commons-logging.version>
<jakarta-validation.version>3.0.2</jakarta-validation.version>
<guava.version>32.0.0-jre</guava.version>
<grpc-java.version>1.64.2</grpc-java.version>
<proto-google-common-protos.version>2.43.0</proto-google-common-protos.version>
<protobuf-java.version>3.25.4</protobuf-java.version>
</properties>
<modules>
@ -174,11 +177,56 @@
<version>${commons-logging.version}</version>
<optional>true</optional>
</dependency>
<!-- gRPC dependency start -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>3.1.0.RELEASE</version>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc-java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>${grpc-java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc-java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc-java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-util</artifactId>
<version>${grpc-java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>${grpc-java.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
<version>${proto-google-common-protos.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.version}</version>
</dependency>
<!-- gRPC dependency end -->
<!-- <dependency>-->
<!-- <groupId>net.devh</groupId>-->
<!-- <artifactId>grpc-client-spring-boot-starter</artifactId>-->
<!-- <version>3.1.0.RELEASE</version>-->
<!-- </dependency>-->
</dependencies>
</dependencyManagement>

View File

@ -87,18 +87,24 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.63.0</version>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>3.1.0.RELEASE</version>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-util</artifactId>
</dependency>
</dependencies>

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.client.common;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.model.dto.LogTaskDTO;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
@ -21,10 +21,10 @@ import java.util.List;
public interface NettyClient {
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.BATCH_REPORT)
NettyResult reportRetryInfo(List<RetryTaskDTO> list);
SnailJobRpcResult reportRetryInfo(List<RetryTaskDTO> list);
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.BATCH_LOG_REPORT)
NettyResult reportLogTask(List<LogTaskDTO> list);
SnailJobRpcResult reportLogTask(List<LogTaskDTO> list);
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.SYNC_CONFIG)
Result syncRemoteConfig();

View File

@ -1,13 +1,17 @@
package com.aizuda.snailjob.client.common.config;
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
@ -49,7 +53,12 @@ public class SnailJobProperties {
/**
* 指定客户端端口
*/
private Integer port = 1789;
private Integer port = 17889;
/**
* rpc类型
*/
private RpcTypeEnum rpcType = RpcTypeEnum.NETTY;
/**
* 重试调度日志远程上报滑动窗口配置
@ -61,11 +70,6 @@ public class SnailJobProperties {
*/
private ServerConfig server = new ServerConfig();
/**
* 调度线程池配置
*/
private DispatcherThreadPool dispatcherThreadPool = new DispatcherThreadPool();
/**
* 重试模块配置
*/
@ -82,6 +86,16 @@ public class SnailJobProperties {
*/
private String workspace;
/**
* 客户端Rpc配置
*/
private RpcClientProperties clientRpc = new RpcClientProperties();
/**
* 服务端Rpc配置
*/
private RpcServerProperties serverRpc = new RpcServerProperties();
@Data
public static class ServerConfig {
/**
@ -92,7 +106,7 @@ public class SnailJobProperties {
/**
* 服务端netty的端口号
*/
private int port = 1788;
private int port = 17888;
}
@Data
@ -146,7 +160,47 @@ public class SnailJobProperties {
}
@Data
public static class DispatcherThreadPool {
public static class Retry {
/**
* 远程上报滑动窗口配置
*/
private SlidingWindowConfig reportSlidingWindow = new SlidingWindowConfig();
}
@Data
public static class RpcServerProperties {
private int maxInboundMessageSize = 10 * 1024 * 1024;
private Duration keepAliveTime = Duration.of(2, ChronoUnit.HOURS);
private Duration keepAliveTimeout = Duration.of(20, ChronoUnit.SECONDS);
private Duration permitKeepAliveTime = Duration.of(5, ChronoUnit.MINUTES);
private ThreadPoolConfig dispatcherTp = new ThreadPoolConfig(16, 16, 1, TimeUnit.SECONDS , 10000);
}
@Data
public static class RpcClientProperties {
private int maxInboundMessageSize = 10 * 1024 * 1024;
private Duration keepAliveTime = Duration.of(2, ChronoUnit.HOURS);
private Duration keepAliveTimeout = Duration.of(20, ChronoUnit.SECONDS);
private Duration permitKeepAliveTime = Duration.of(5, ChronoUnit.MINUTES);
private ThreadPoolConfig clientTp = new ThreadPoolConfig(16, 16, 1, TimeUnit.SECONDS , 10000);
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class ThreadPoolConfig {
/**
* 核心线程池
@ -161,7 +215,7 @@ public class SnailJobProperties {
/**
* 线程存活时间
*/
private long keepAliveTime;
private long keepAliveTime = 1;
/**
* 线程存活时间(单位)
@ -173,13 +227,4 @@ public class SnailJobProperties {
*/
private int queueCapacity = 10000;
}
@Data
public static class Retry {
/**
* 远程上报滑动窗口配置
*/
private SlidingWindowConfig reportSlidingWindow = new SlidingWindowConfig();
}
}

View File

@ -5,7 +5,7 @@ import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
import org.springframework.stereotype.Component;
@ -26,7 +26,7 @@ public class ClientRegister implements Lifecycle {
public static final int REGISTER_TIME = 10;
static {
CLIENT = RequestBuilder.<NettyClient, NettyResult>newBuilder()
CLIENT = RequestBuilder.<NettyClient, SnailJobRpcResult>newBuilder()
.client(NettyClient.class)
.callback(
nettyResult -> {

View File

@ -4,7 +4,7 @@ import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.cache.GroupVersionCache;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.model.dto.ConfigDTO;
@ -31,7 +31,7 @@ public class SyncRemoteConfig implements Lifecycle {
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
NettyClient client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
NettyClient client = RequestBuilder.<NettyClient, SnailJobRpcResult>newBuilder()
.client(NettyClient.class)
.timeout(1000L)
.callback(nettyResult -> {

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.client.common.log.report;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.window.Listener;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.model.dto.LogTaskDTO;
@ -20,7 +20,7 @@ import java.util.List;
@Slf4j
public class ReportLogListener implements Listener<LogTaskDTO> {
private static final NettyClient CLIENT = RequestBuilder.<NettyClient, NettyResult>newBuilder()
private static final NettyClient CLIENT = RequestBuilder.<NettyClient, SnailJobRpcResult>newBuilder()
.client(NettyClient.class)
.callback(nettyResult -> SnailJobLog.LOCAL.info("Data report log successfully requestId:[{}]", nettyResult.getReqId())).build();

View File

@ -136,7 +136,7 @@ public final class GrpcChannel {
}
public static ListenableFuture<GrpcResult> sendOfUnary(String path, String body) {
public static ListenableFuture<GrpcResult> sendOfUnary(String path, String body, final long reqId) {
if (channel == null) {
return null;
}
@ -176,6 +176,7 @@ public final class GrpcChannel {
GrpcSnailJobRequest snailJobRequest = GrpcSnailJobRequest
.newBuilder()
.setMetadata(metadata)
.setReqId(reqId)
.setBody(build)
.build();

View File

@ -1,36 +0,0 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ServerConfig;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @author: opensnail
* @date : 2024-08-22
*/
@Component
@RequiredArgsConstructor
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GrpcClient implements Lifecycle {
private final SnailJobProperties snailJobProperties;
@Override
public void start() {
// 创建 gRPC 频道
ServerConfig server = snailJobProperties.getServer();
ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())
.usePlaintext()
.build();
GrpcChannel.setChannel(channel);
}
@Override
public void close() {
}
}

View File

@ -1,89 +0,0 @@
//package com.aizuda.snailjob.client.common.rpc.client;
//
//import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
//import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
//import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
//import com.google.common.util.concurrent.FutureCallback;
//import com.google.common.util.concurrent.Futures;
//import com.google.common.util.concurrent.ListenableFuture;
//import com.google.protobuf.Any;
//import com.google.protobuf.UnsafeByteOperations;
//import io.grpc.Channel;
//import io.grpc.ManagedChannel;
//import io.grpc.ManagedChannelBuilder;
//import io.grpc.MethodDescriptor;
//import io.grpc.protobuf.ProtoUtils;
//
//import java.util.concurrent.LinkedBlockingDeque;
//import java.util.concurrent.ScheduledThreadPoolExecutor;
//import java.util.concurrent.ThreadPoolExecutor;
//import java.util.concurrent.TimeUnit;
//
///**
// * @author: shuguang.zhang
// * @date : 2024-08-21
// */
//public class GrpcClient2 {
// private final Channel channel;
//
// public GrpcClient2(Channel channel) {
// this.channel = channel;
// }
//
// public static void main(String[] args) {
// // 创建 gRPC 频道
// ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 1788)
// .usePlaintext()
// .build();
//
// // 实例化客户端
// GrpcClient2 grpcClient = new GrpcClient2(channel);
//
// String s = "zsg";
// Any build = Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(s.getBytes())).build();
// // 构造请求对象
// Metadata metadata = Metadata
// .newBuilder()
// .setClientIp("11")
// .setType("1")
// .putHeaders("aa", "bb")
// .build();
// GrpcSnailJobRequest request = GrpcSnailJobRequest.newBuilder().setMetadata(metadata).setBody(build).build();
//
// // 动态调用方法并获取响应 "UnaryRequest", "unaryRequest",
// ListenableFuture<GrpcResult> future = grpcClient.invokeMethod("unaryRequest", request);
// ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
// ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,1,TimeUnit.SECONDS, new LinkedBlockingDeque<>());
//// System.out.println("Response: " + response.getMessage());
// Futures.addCallback(future, new FutureCallback<GrpcResult>() {
// @Override
// public void onSuccess(final GrpcResult result) {
// System.out.println(result);
// }
//
// @Override
// public void onFailure(final Throwable t) {
// System.out.println(t);
// }
// }, threadPoolExecutor);
//
// Futures.withTimeout(future, 5, TimeUnit.SECONDS, scheduledThreadPoolExecutor);
// // 关闭频道
// channel.shutdown();
// }
//
// public ListenableFuture<GrpcResult> invokeMethod(String methodName, GrpcSnailJobRequest request) {
// MethodDescriptor<GrpcSnailJobRequest, GrpcResult> methodDescriptor =
// MethodDescriptor.<GrpcSnailJobRequest, GrpcResult>newBuilder()
// .setType(MethodDescriptor.MethodType.UNARY)
// .setFullMethodName(MethodDescriptor.generateFullMethodName("UnaryRequest", methodName))
// .setRequestMarshaller(ProtoUtils.marshaller(GrpcSnailJobRequest.getDefaultInstance()))
// .setResponseMarshaller(ProtoUtils.marshaller(GrpcResult.getDefaultInstance()))
// .build();
//
// // 创建动态代理调用方法
// return io.grpc.stub.ClientCalls.futureUnaryCall(
// channel.newCall(methodDescriptor, io.grpc.CallOptions.DEFAULT),
// request);
// }
//}

View File

@ -2,13 +2,14 @@ package com.aizuda.snailjob.client.common.rpc.client;
import cn.hutool.core.date.StopWatch;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.RpcClientProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig;
import com.aizuda.snailjob.client.common.exception.SnailJobClientTimeOutException;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -16,21 +17,20 @@ import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
/**
@ -41,12 +41,17 @@ import java.util.function.Consumer;
* @since 1.3.0
*/
public class GrpcClientInvokeHandler<R extends Result<Object>> implements InvocationHandler {
public static final AtomicLong REQUEST_ID = new AtomicLong(0);
private final Consumer<R> consumer;
private final boolean async;
private final long timeout;
private final TimeUnit unit;
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
private static final ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
private static final ExecutorService executorService = createGrpcExecutor();
private static final ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
public GrpcClientInvokeHandler(boolean async, long timeout, TimeUnit unit, Consumer<R> consumer) {
this.consumer = consumer;
this.async = async;
@ -59,8 +64,10 @@ public class GrpcClientInvokeHandler<R extends Result<Object>> implements Invoca
StopWatch sw = new StopWatch();
Mapping annotation = method.getAnnotation(Mapping.class);
ListenableFuture<GrpcResult> future = GrpcChannel.sendOfUnary(annotation.path(), JsonUtil.toJsonString(args));
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", sw.getTotalTimeMillis());
long reqId = newId();
ListenableFuture<GrpcResult> future = GrpcChannel.sendOfUnary(annotation.path(), JsonUtil.toJsonString(args),
reqId);
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", sw.getTotalTimeMillis(), reqId);
if (async) {
if (future == null) {
@ -74,12 +81,13 @@ public class GrpcClientInvokeHandler<R extends Result<Object>> implements Invoca
ByteBuffer byteBuffer = result.getData().getValue().asReadOnlyByteBuffer();
String str = JsonUtil.parseObject(new ByteBufferBackedInputStream(byteBuffer), String.class);
consumer.accept((R) new NettyResult(result.getStatus(), result.getMessage(), str, 0));
consumer.accept(
(R) new SnailJobRpcResult(result.getStatus(), result.getMessage(), str, result.getReqId()));
}
@Override
public void onFailure(final Throwable t) {
consumer.accept((R) new NettyResult(StatusEnum.NO.getStatus(), t.getMessage(), null, 1));
consumer.accept((R) new SnailJobRpcResult(StatusEnum.NO.getStatus(), t.getMessage(), null, reqId));
}
}, executorService);
@ -95,10 +103,29 @@ public class GrpcClientInvokeHandler<R extends Result<Object>> implements Invoca
} catch (ExecutionException e) {
throw e.getCause();
} catch (TimeoutException e) {
throw new SnailJobClientTimeOutException("Request to remote interface timed out. path:[{}]", annotation.path());
throw new SnailJobClientTimeOutException("Request to remote interface timed out. path:[{}]",
annotation.path());
}
}
}
private static long newId() {
return REQUEST_ID.getAndIncrement();
}
protected static ThreadPoolExecutor createGrpcExecutor() {
SnailJobProperties snailJobProperties = SnailSpringContext.getBean(SnailJobProperties.class);
RpcClientProperties clientRpc = snailJobProperties.getClientRpc();
String serverIp = GrpcChannel.getServerHost().replaceAll("%", "-");
ThreadPoolConfig threadPool = clientRpc.getClientTp();
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(threadPool.getCorePoolSize(),
threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(), TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadPool.getQueueCapacity()),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("snail-job-grpc-client-executor-" + serverIp + "-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
return grpcExecutor;
}
}

View File

@ -6,6 +6,7 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -43,8 +44,8 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
HttpHeaders headers = response.headers();
SnailJobLog.LOCAL.debug("Receive server data content:[{}], headers:[{}]", content, headers);
NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class);
RpcContext.invoke(nettyResult.getReqId(), nettyResult, false);
SnailJobRpcResult snailJobRpcResult = JsonUtil.parseObject(content, SnailJobRpcResult.class);
RpcContext.invoke(snailJobRpcResult.getReqId(), snailJobRpcResult, false);
}

View File

@ -1,7 +1,9 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.handler.ClientRegister;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
@ -12,6 +14,7 @@ import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@ -30,9 +33,10 @@ import java.util.concurrent.TimeUnit;
*/
@Getter
@Component
@RequiredArgsConstructor
@Order(Ordered.HIGHEST_PRECEDENCE)
public class NettyHttpConnectClient implements Lifecycle {
private final SnailJobProperties snailJobProperties;
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
private static final Bootstrap bootstrap = new Bootstrap();
private Channel channel;
@ -40,6 +44,10 @@ public class NettyHttpConnectClient implements Lifecycle {
@Override
public void start() {
if (RpcTypeEnum.NETTY != snailJobProperties.getRpcType()) {
return;
}
try {
final NettyHttpConnectClient thisClient = this;
bootstrap.group(nioEventLoopGroup)

View File

@ -1,8 +1,12 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.core.model.Result;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -67,10 +71,18 @@ public class RequestBuilder<T, R extends Result<Object>> {
throw new SnailJobClientException("class not found exception to: [{}]", clintInterface.getName());
}
// RpcClientInvokeHandler<R> rpcClientInvokeHandler = new RpcClientInvokeHandler<>(async, timeout, unit, callback);
GrpcClientInvokeHandler invokeHandler = new GrpcClientInvokeHandler(async, timeout, unit, callback);
InvocationHandler invocationHandler;
SnailJobProperties properties = SnailSpringContext.getBean(SnailJobProperties.class);
RpcTypeEnum rpcType = properties.getRpcType();
if (Objects.isNull(rpcType) || RpcTypeEnum.NETTY == rpcType) {
invocationHandler= new RpcClientInvokeHandler<>(async, timeout, unit,
callback);
} else {
invocationHandler = new GrpcClientInvokeHandler<>(async, timeout, unit, callback);
}
return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(),
new Class[]{clintInterface}, invokeHandler);
new Class[]{clintInterface}, invocationHandler);
}
}

View File

@ -6,7 +6,7 @@ import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.common.exception.SnailJobClientTimeOutException;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
@ -67,7 +67,7 @@ public class RpcClientInvokeHandler<R extends Result<Object>> implements Invocat
newFuture.whenComplete((r, t) -> {
if (Objects.nonNull(t)) {
consumer.accept(
(R) new NettyResult(StatusEnum.NO.getStatus(), t.getMessage(), null, snailJobRequest.getReqId()));
(R) new SnailJobRpcResult(StatusEnum.NO.getStatus(), t.getMessage(), null, snailJobRequest.getReqId()));
} else {
consumer.accept(r);
}

View File

@ -0,0 +1,74 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.RpcClientProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author: opensnail
* @date : 2024-08-22
*/
@Component
@RequiredArgsConstructor
@Order(Ordered.HIGHEST_PRECEDENCE)
public class SnailJobGrpcClient implements Lifecycle {
private ManagedChannel channel;
private final SnailJobProperties snailJobProperties;
@Override
public void start() {
if (RpcTypeEnum.GPRC != snailJobProperties.getRpcType()) {
return;
}
RpcClientProperties clientRpc = snailJobProperties.getClientRpc();
// 创建 gRPC 频道
String serverHost = GrpcChannel.getServerHost();
channel = ManagedChannelBuilder.forAddress(serverHost, GrpcChannel.getServerPort())
.executor(createGrpcExecutor(serverHost))
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.maxInboundMessageSize(clientRpc.getMaxInboundMessageSize())
.keepAliveTime(clientRpc.getKeepAliveTime().toMillis(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(clientRpc.getKeepAliveTimeout().toMillis(), TimeUnit.MILLISECONDS)
.usePlaintext()
.build();
GrpcChannel.setChannel(channel);
SnailJobLog.LOCAL.info("grpc client started connect to server");
}
@Override
public void close() {
if (channel != null && !channel.isShutdown()) {
channel.shutdownNow();
}
}
private ThreadPoolExecutor createGrpcExecutor(String serverIp) {
RpcClientProperties clientRpc = snailJobProperties.getClientRpc();
ThreadPoolConfig threadPool = clientRpc.getClientTp();
serverIp = serverIp.replaceAll("%", "-");
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(threadPool.getCorePoolSize(),
threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(), TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadPool.getQueueCapacity()),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("snail-job-grpc-client-executor-" + serverIp + "-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
return grpcExecutor;
}
}

View File

@ -1,7 +1,5 @@
package com.aizuda.snailjob.client.common.rpc.server;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
@ -21,10 +19,9 @@ public class GrpcInterceptor implements ServerInterceptor {
final ServerCallHandler<ReqT, RespT> serverCallHandler) {
String fullMethodName = serverCall.getMethodDescriptor().getFullMethodName();
long start = System.currentTimeMillis();
Context context = Context.current();
try {
return Contexts.interceptCall(context, serverCall, metadata, serverCallHandler);
return serverCallHandler.startCall(serverCall, metadata);
} finally {
log.info("method invoked: {} cast:{}ms", fullMethodName, System.currentTimeMillis() - start);
}

View File

@ -1,65 +0,0 @@
package com.aizuda.snailjob.client.common.rpc.server;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.DispatcherThreadPool;
import com.aizuda.snailjob.client.common.rpc.supports.handler.SnailDispatcherRequestHandler;
import com.aizuda.snailjob.client.common.rpc.supports.handler.UnaryRequestHandler;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import io.grpc.MethodDescriptor;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import lombok.RequiredArgsConstructor;
import net.devh.boot.grpc.server.serverfactory.GrpcServerConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@RequiredArgsConstructor
public class GrpcServerConsumerConfig implements GrpcServerConfigurer {
private final SnailDispatcherRequestHandler handler;
private final SnailJobProperties snailJobProperties;
@Override
public void accept(ServerBuilder<?> serverBuilder) {
DispatcherThreadPool threadPool = snailJobProperties.getDispatcherThreadPool();
ThreadPoolExecutor dispatcherThreadPool = new ThreadPoolExecutor(
threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(),
threadPool.getTimeUnit(), new LinkedBlockingQueue<>(threadPool.getQueueCapacity()),
new CustomizableThreadFactory("snail-grpc-server-"));
// 创建服务定义
ServerServiceDefinition serviceDefinition = createServiceDefinition(
"UnaryRequest", "unaryRequest",
new UnaryRequestHandler(dispatcherThreadPool, handler));
serverBuilder.addService(serviceDefinition);
serverBuilder.intercept(new GrpcInterceptor());
}
public static ServerServiceDefinition createServiceDefinition(
String serviceName,
String methodName,
ServerCalls.UnaryMethod<GrpcSnailJobRequest, GrpcResult> unaryMethod) {
MethodDescriptor<GrpcSnailJobRequest, GrpcResult> methodDescriptor =
MethodDescriptor.<GrpcSnailJobRequest, GrpcResult>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(serviceName, methodName))
.setRequestMarshaller(ProtoUtils.marshaller(GrpcSnailJobRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(GrpcResult.getDefaultInstance()))
.build();
ServerCallHandler<GrpcSnailJobRequest, GrpcResult> callHandler = ServerCalls.asyncUnaryCall(unaryMethod);
return ServerServiceDefinition.builder(serviceName)
.addMethod(methodDescriptor, callHandler)
.build();
}
}

View File

@ -0,0 +1,136 @@
package com.aizuda.snailjob.client.common.rpc.server;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.RpcServerProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig;
import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.common.rpc.supports.handler.SnailDispatcherRequestHandler;
import com.aizuda.snailjob.client.common.rpc.supports.handler.UnaryRequestHandler;
import com.aizuda.snailjob.common.core.constant.GrpcServerConstants;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.util.MutableHandlerRegistry;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Grpc server
*
* @author: opensnail
* @date : 2024-04-12 23:03
* @since 3.3.0
*/
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
@RequiredArgsConstructor
@Getter
public class SnailGrpcServer implements Lifecycle {
private final SnailJobProperties snailJobProperties;
private final SnailDispatcherRequestHandler snailDispatcherRequestHandler;
private volatile boolean started = false;
private Server server;
@Override
public void start() {
if (started || RpcTypeEnum.GPRC != snailJobProperties.getRpcType()) {
return;
}
RpcServerProperties grpc = snailJobProperties.getServerRpc();
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
addServices(handlerRegistry, new GrpcInterceptor());
NettyServerBuilder builder = NettyServerBuilder.forPort(snailJobProperties.getPort())
.executor(createGrpcExecutor(grpc.getDispatcherTp()));
Duration keepAliveTime = grpc.getKeepAliveTime();
Duration keepAliveTimeOut = grpc.getKeepAliveTimeout();
Duration permitKeepAliveTime = grpc.getPermitKeepAliveTime();
server = builder.maxInboundMessageSize(grpc.getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.keepAliveTime(keepAliveTime.toMillis(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(keepAliveTimeOut.toMillis(), TimeUnit.MILLISECONDS)
.permitKeepAliveTime(permitKeepAliveTime.toMillis(), TimeUnit.MILLISECONDS)
.build();
try {
server.start();
this.started = true;
SnailJobLog.LOCAL.info("------> snail-job remoting server start success, grpc = {}, port = {}",
SnailGrpcServer.class.getName(), snailJobProperties.getPort());
} catch (IOException e) {
SnailJobLog.LOCAL.error("--------> snail-job remoting server error.", e);
started = false;
throw new SnailJobClientException("snail-job server start error");
}
}
@Override
public void close() {
if (server != null) {
server.shutdownNow();
}
}
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// 创建服务UNARY类型定义
ServerServiceDefinition serviceDefinition = createUnaryServiceDefinition(
GrpcServerConstants.UNARY_SERVICE_NAME, GrpcServerConstants.UNARY_METHOD_NAME,
new UnaryRequestHandler(snailJobProperties.getServerRpc().getDispatcherTp(), snailDispatcherRequestHandler));
handlerRegistry.addService(serviceDefinition);
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefinition, serverInterceptor));
}
public static ServerServiceDefinition createUnaryServiceDefinition(
String serviceName,
String methodName,
ServerCalls.UnaryMethod<GrpcSnailJobRequest, GrpcResult> unaryMethod) {
MethodDescriptor<GrpcSnailJobRequest, GrpcResult> methodDescriptor =
MethodDescriptor.<GrpcSnailJobRequest, GrpcResult>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(serviceName, methodName))
.setRequestMarshaller(ProtoUtils.marshaller(GrpcSnailJobRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(GrpcResult.getDefaultInstance()))
.build();
return ServerServiceDefinition.builder(serviceName)
.addMethod(methodDescriptor, ServerCalls.asyncUnaryCall(unaryMethod))
.build();
}
private ThreadPoolExecutor createGrpcExecutor(final ThreadPoolConfig threadPool) {
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(threadPool.getCorePoolSize(),
threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(), TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadPool.getQueueCapacity()),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("snail-job-grpc-server-executor-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
return grpcExecutor;
}
}

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.client.common.exception.SnailJobClientException;
import com.aizuda.snailjob.client.common.rpc.supports.handler.NettyHttpServerHandler;
import com.aizuda.snailjob.client.common.rpc.supports.handler.SnailDispatcherRequestHandler;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@ -95,6 +96,10 @@ public class SnailNettyHttpServer implements Runnable, Lifecycle {
@Override
public void start() {
if (RpcTypeEnum.NETTY != snailJobProperties.getRpcType()) {
return;
}
thread = new Thread(this);
thread.setDaemon(true);
thread.start();

View File

@ -1,10 +1,11 @@
package com.aizuda.snailjob.client.common.rpc.supports.handler;
import com.aizuda.snailjob.client.common.config.SnailJobProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.DispatcherThreadPool;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.RpcServerProperties;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import io.netty.buffer.Unpooled;
@ -35,8 +36,8 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttp
this.dispatcher = snailDispatcherRequestHandler;
// 获取线程池配置
DispatcherThreadPool threadPool = snailJobProperties.getDispatcherThreadPool();
RpcServerProperties rpcServerProperties = snailJobProperties.getServerRpc();
ThreadPoolConfig threadPool = rpcServerProperties.getDispatcherTp();
dispatcherThreadPool = new ThreadPoolExecutor(
threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(),
threadPool.getTimeUnit(), new LinkedBlockingQueue<>(threadPool.getQueueCapacity()),
@ -68,17 +69,17 @@ public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttp
// 执行任务
dispatcherThreadPool.execute(() -> {
NettyResult nettyResult = null;
SnailJobRpcResult snailJobRpcResult = null;
try {
nettyResult = dispatcher.dispatch(nettyHttpRequest);
snailJobRpcResult = dispatcher.dispatch(nettyHttpRequest);
} catch (Exception e) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
nettyResult = new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId());
snailJobRpcResult = new SnailJobRpcResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId());
} finally {
writeResponse(channelHandlerContext,
HttpUtil.isKeepAlive(fullHttpRequest),
nettyHttpRequest.getHttpResponse(),
JsonUtil.toJsonString(nettyResult)
JsonUtil.toJsonString(snailJobRpcResult)
);
}
});

View File

@ -15,7 +15,7 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -46,9 +46,9 @@ import java.util.stream.Collectors;
public class SnailDispatcherRequestHandler {
private final SnailJobProperties snailJobProperties;
public NettyResult dispatch(NettyHttpRequest request) {
public SnailJobRpcResult dispatch(NettyHttpRequest request) {
NettyResult nettyResult = new NettyResult();
SnailJobRpcResult snailJobRpcResult = new SnailJobRpcResult();
List<HandlerInterceptor> handlerInterceptors = handlerInterceptors();
SnailJobRequest retryRequest = JsonUtil.parseObject(request.getContent(), SnailJobRequest.class);
@ -81,7 +81,7 @@ public class SnailDispatcherRequestHandler {
for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) {
if (!handlerInterceptor.preHandle(httpRequest, httpResponse, endPointInfo)) {
return nettyResult;
return snailJobRpcResult;
}
}
@ -98,12 +98,12 @@ public class SnailDispatcherRequestHandler {
}
} catch (Exception ex) {
SnailJobLog.LOCAL.error("http request error. [{}]", request.getContent(), ex);
nettyResult.setMessage(ex.getMessage()).setStatus(StatusEnum.NO.getStatus());
snailJobRpcResult.setMessage(ex.getMessage()).setStatus(StatusEnum.NO.getStatus());
e = ex;
} finally {
nettyResult.setReqId(retryRequest.getReqId());
snailJobRpcResult.setReqId(retryRequest.getReqId());
if (Objects.nonNull(resultObj)) {
nettyResult.setData(resultObj.getData())
snailJobRpcResult.setData(resultObj.getData())
.setMessage(resultObj.getMessage())
.setStatus(resultObj.getStatus());
}
@ -113,11 +113,11 @@ public class SnailDispatcherRequestHandler {
}
}
return nettyResult;
return snailJobRpcResult;
}
public NettyResult dispatch(GrpcRequest request) {
NettyResult nettyResult = new NettyResult();
public SnailJobRpcResult dispatch(GrpcRequest request) {
SnailJobRpcResult snailJobRpcResult = new SnailJobRpcResult();
HttpRequest httpRequest = request.getHttpRequest();
HttpResponse httpResponse = request.getHttpResponse();
@ -155,7 +155,7 @@ public class SnailDispatcherRequestHandler {
for (final HandlerInterceptor handlerInterceptor : handlerInterceptors) {
if (!handlerInterceptor.preHandle(httpRequest, httpResponse, endPointInfo)) {
return nettyResult;
return snailJobRpcResult;
}
}
@ -172,12 +172,12 @@ public class SnailDispatcherRequestHandler {
}
} catch (Exception ex) {
SnailJobLog.LOCAL.error("http request error. [{}]", snailJobRequest, ex);
nettyResult.setMessage(ex.getMessage()).setStatus(StatusEnum.NO.getStatus());
snailJobRpcResult.setMessage(ex.getMessage()).setStatus(StatusEnum.NO.getStatus());
e = ex;
} finally {
nettyResult.setReqId(0);
snailJobRpcResult.setReqId(0);
if (Objects.nonNull(resultObj)) {
nettyResult.setData(resultObj.getData())
snailJobRpcResult.setData(resultObj.getData())
.setMessage(resultObj.getMessage())
.setStatus(resultObj.getStatus());
}
@ -187,7 +187,7 @@ public class SnailDispatcherRequestHandler {
}
}
return nettyResult;
return snailJobRpcResult;
}
private static List<HandlerInterceptor> handlerInterceptors() {

View File

@ -1,37 +1,43 @@
package com.aizuda.snailjob.client.common.rpc.supports.handler;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.config.SnailJobProperties.ThreadPoolConfig;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpRequest;
import com.aizuda.snailjob.client.common.rpc.supports.http.HttpResponse;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.netty.handler.codec.http.HttpUtil;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: opensnail
* @date : 2024-08-22
*/
public class UnaryRequestHandler implements ServerCalls.UnaryMethod<GrpcSnailJobRequest, GrpcResult>{
public class UnaryRequestHandler implements ServerCalls.UnaryMethod<GrpcSnailJobRequest, GrpcResult> {
private final ThreadPoolExecutor threadPoolExecutor;
private final ThreadPoolExecutor dispatcherThreadPool;
private final SnailDispatcherRequestHandler dispatcher;
public UnaryRequestHandler(final ThreadPoolExecutor dispatcherThreadPool,
public UnaryRequestHandler(final ThreadPoolConfig dispatcherThreadPool,
final SnailDispatcherRequestHandler handler) {
this.threadPoolExecutor = dispatcherThreadPool;
this.dispatcher = handler;
this.dispatcherThreadPool = new ThreadPoolExecutor(
dispatcherThreadPool.getCorePoolSize(), dispatcherThreadPool.getMaximumPoolSize(),
dispatcherThreadPool.getKeepAliveTime(),
dispatcherThreadPool.getTimeUnit(), new LinkedBlockingQueue<>(dispatcherThreadPool.getQueueCapacity()),
new CustomizableThreadFactory("snail-grpc-server-"));
}
@Override
@ -40,24 +46,25 @@ public class UnaryRequestHandler implements ServerCalls.UnaryMethod<GrpcSnailJob
Metadata metadata = snailJobRequest.getMetadata();
GrpcRequest grpcRequest = GrpcRequest.builder()
.httpRequest(new HttpRequest( metadata.getHeadersMap(), metadata.getUri()))
.httpRequest(new HttpRequest(metadata.getHeadersMap(), metadata.getUri()))
.httpResponse(new HttpResponse())
.snailJobRequest(snailJobRequest)
.build();
// 执行任务
threadPoolExecutor.execute(() -> {
NettyResult nettyResult = null;
dispatcherThreadPool.execute(() -> {
SnailJobRpcResult snailJobRpcResult = null;
try {
nettyResult = dispatcher.dispatch(grpcRequest);
snailJobRpcResult = dispatcher.dispatch(grpcRequest);
} catch (Exception e) {
nettyResult = new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, 0);
snailJobRpcResult = new SnailJobRpcResult(StatusEnum.NO.getStatus(), e.getMessage(), null, 0);
} finally {
GrpcResult grpcResult = GrpcResult.newBuilder()
.setStatus(nettyResult.getStatus())
.setMessage(Optional.ofNullable(nettyResult.getMessage()).orElse(StrUtil.EMPTY))
.setStatus(snailJobRpcResult.getStatus())
.setMessage(Optional.ofNullable(snailJobRpcResult.getMessage()).orElse(StrUtil.EMPTY))
.setData(Any.newBuilder()
.setValue(UnsafeByteOperations.unsafeWrap(JsonUtil.toJsonString(nettyResult.getData()).getBytes()))
.setValue(UnsafeByteOperations.unsafeWrap(
JsonUtil.toJsonString(snailJobRpcResult.getData()).getBytes()))
.build())
.build();

View File

@ -19,7 +19,7 @@ import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
@ -54,7 +54,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
> 异常:{} \s
\s""";
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, SnailJobRpcResult>newBuilder()
.client(JobNettyClient.class)
.callback(nettyResult -> {
if (nettyResult.getStatus() == StatusEnum.NO.getStatus()) {

View File

@ -10,7 +10,7 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -29,7 +29,7 @@ import java.util.Objects;
*/
public final class MapInvokeHandler implements InvocationHandler {
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, SnailJobRpcResult>newBuilder()
.client(JobNettyClient.class)
.async(Boolean.FALSE)
.build();

View File

@ -11,7 +11,7 @@ import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
@ -52,7 +52,7 @@ public class ReportListener implements Listener<RetryTaskDTO> {
"> 时间:{} \n" +
"> 异常:{} \n";
private static final NettyClient CLIENT = RequestBuilder.<NettyClient, NettyResult>newBuilder()
private static final NettyClient CLIENT = RequestBuilder.<NettyClient, SnailJobRpcResult>newBuilder()
.client(NettyClient.class)
.callback(nettyResult -> SnailJobLog.LOCAL.info("Data report successfully requestId:[{}]", nettyResult.getReqId())).build();

View File

@ -8,7 +8,7 @@ import com.aizuda.snailjob.client.core.retryer.RetryerInfo;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
@ -73,7 +73,7 @@ public class SyncReport extends AbstractReport {
RetryTaskDTO retryTaskDTO = buildRetryTaskDTO(scene, targetClassName, args);
NettyClient client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
NettyClient client = RequestBuilder.<NettyClient, SnailJobRpcResult>newBuilder()
.client(NettyClient.class)
.async(Boolean.FALSE)
.timeout(timeout)
@ -81,7 +81,7 @@ public class SyncReport extends AbstractReport {
.build();
try {
NettyResult result = client.reportRetryInfo(Collections.singletonList(retryTaskDTO));
SnailJobRpcResult result = client.reportRetryInfo(Collections.singletonList(retryTaskDTO));
SnailJobLog.LOCAL.debug("Data report result result:[{}]", JsonUtil.toJsonString(result));
return (Boolean) result.getData();
} catch (Exception e) {

View File

@ -69,28 +69,21 @@
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<!-- 根据message定义生成java的model类 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.63.0</version>
</dependency>
<!-- 根据service定义生成java的服务端代码和客户端代码 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.63.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.4</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
<version>2.43.0</version>
</dependency>
<!-- 生成的代码使用到了javax的注解 -->
<dependency>

View File

@ -0,0 +1,12 @@
package com.aizuda.snailjob.common.core.constant;
/**
* @author: opensnail
* @date : 2024-08-23
*/
public interface GrpcServerConstants {
String UNARY_SERVICE_NAME = "UnaryRequest";
String UNARY_METHOD_NAME = "unaryRequest";
}

View File

@ -0,0 +1,11 @@
package com.aizuda.snailjob.common.core.enums;
/**
* @author: opensnail
* @date : 2024-08-23
*/
public enum RpcTypeEnum {
GPRC,
NETTY
}

View File

@ -40,10 +40,21 @@ private static final long serialVersionUID = 0L;
}
private int bitField0_;
public static final int STATUS_FIELD_NUMBER = 1;
public static final int REQID_FIELD_NUMBER = 1;
private long reqId_ = 0L;
/**
* <code>int64 reqId = 1;</code>
* @return The reqId.
*/
@java.lang.Override
public long getReqId() {
return reqId_;
}
public static final int STATUS_FIELD_NUMBER = 2;
private int status_ = 0;
/**
* <code>int32 status = 1;</code>
* <code>int32 status = 2;</code>
* @return The status.
*/
@java.lang.Override
@ -51,11 +62,11 @@ private static final long serialVersionUID = 0L;
return status_;
}
public static final int MESSAGE_FIELD_NUMBER = 2;
public static final int MESSAGE_FIELD_NUMBER = 3;
@SuppressWarnings("serial")
private volatile java.lang.Object message_ = "";
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return The message.
*/
@java.lang.Override
@ -72,7 +83,7 @@ private static final long serialVersionUID = 0L;
}
}
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return The bytes for message.
*/
@java.lang.Override
@ -90,10 +101,10 @@ private static final long serialVersionUID = 0L;
}
}
public static final int DATA_FIELD_NUMBER = 3;
public static final int DATA_FIELD_NUMBER = 4;
private com.google.protobuf.Any data_;
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
* @return Whether the data field is set.
*/
@java.lang.Override
@ -101,7 +112,7 @@ private static final long serialVersionUID = 0L;
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
* @return The data.
*/
@java.lang.Override
@ -109,7 +120,7 @@ private static final long serialVersionUID = 0L;
return data_ == null ? com.google.protobuf.Any.getDefaultInstance() : data_;
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
@java.lang.Override
public com.google.protobuf.AnyOrBuilder getDataOrBuilder() {
@ -130,14 +141,17 @@ private static final long serialVersionUID = 0L;
@java.lang.Override
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (reqId_ != 0L) {
output.writeInt64(1, reqId_);
}
if (status_ != 0) {
output.writeInt32(1, status_);
output.writeInt32(2, status_);
}
if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(message_)) {
com.google.protobuf.GeneratedMessageV3.writeString(output, 2, message_);
com.google.protobuf.GeneratedMessageV3.writeString(output, 3, message_);
}
if (((bitField0_ & 0x00000001) != 0)) {
output.writeMessage(3, getData());
output.writeMessage(4, getData());
}
getUnknownFields().writeTo(output);
}
@ -148,16 +162,20 @@ private static final long serialVersionUID = 0L;
if (size != -1) return size;
size = 0;
if (reqId_ != 0L) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(1, reqId_);
}
if (status_ != 0) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(1, status_);
.computeInt32Size(2, status_);
}
if (!com.google.protobuf.GeneratedMessageV3.isStringEmpty(message_)) {
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(2, message_);
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, message_);
}
if (((bitField0_ & 0x00000001) != 0)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, getData());
.computeMessageSize(4, getData());
}
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
@ -174,6 +192,8 @@ private static final long serialVersionUID = 0L;
}
com.aizuda.snailjob.common.core.grpc.auto.GrpcResult other = (com.aizuda.snailjob.common.core.grpc.auto.GrpcResult) obj;
if (getReqId()
!= other.getReqId()) return false;
if (getStatus()
!= other.getStatus()) return false;
if (!getMessage()
@ -194,6 +214,9 @@ private static final long serialVersionUID = 0L;
}
int hash = 41;
hash = (19 * hash) + getDescriptor().hashCode();
hash = (37 * hash) + REQID_FIELD_NUMBER;
hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
getReqId());
hash = (37 * hash) + STATUS_FIELD_NUMBER;
hash = (53 * hash) + getStatus();
hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
@ -339,6 +362,7 @@ private static final long serialVersionUID = 0L;
public Builder clear() {
super.clear();
bitField0_ = 0;
reqId_ = 0L;
status_ = 0;
message_ = "";
data_ = null;
@ -380,13 +404,16 @@ private static final long serialVersionUID = 0L;
private void buildPartial0(com.aizuda.snailjob.common.core.grpc.auto.GrpcResult result) {
int from_bitField0_ = bitField0_;
if (((from_bitField0_ & 0x00000001) != 0)) {
result.status_ = status_;
result.reqId_ = reqId_;
}
if (((from_bitField0_ & 0x00000002) != 0)) {
result.status_ = status_;
}
if (((from_bitField0_ & 0x00000004) != 0)) {
result.message_ = message_;
}
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000004) != 0)) {
if (((from_bitField0_ & 0x00000008) != 0)) {
result.data_ = dataBuilder_ == null
? data_
: dataBuilder_.build();
@ -439,12 +466,15 @@ private static final long serialVersionUID = 0L;
public Builder mergeFrom(com.aizuda.snailjob.common.core.grpc.auto.GrpcResult other) {
if (other == com.aizuda.snailjob.common.core.grpc.auto.GrpcResult.getDefaultInstance()) return this;
if (other.getReqId() != 0L) {
setReqId(other.getReqId());
}
if (other.getStatus() != 0) {
setStatus(other.getStatus());
}
if (!other.getMessage().isEmpty()) {
message_ = other.message_;
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
}
if (other.hasData()) {
@ -477,22 +507,27 @@ private static final long serialVersionUID = 0L;
done = true;
break;
case 8: {
status_ = input.readInt32();
reqId_ = input.readInt64();
bitField0_ |= 0x00000001;
break;
} // case 8
case 18: {
message_ = input.readStringRequireUtf8();
case 16: {
status_ = input.readInt32();
bitField0_ |= 0x00000002;
break;
} // case 18
} // case 16
case 26: {
input.readMessage(
getDataFieldBuilder().getBuilder(),
extensionRegistry);
message_ = input.readStringRequireUtf8();
bitField0_ |= 0x00000004;
break;
} // case 26
case 34: {
input.readMessage(
getDataFieldBuilder().getBuilder(),
extensionRegistry);
bitField0_ |= 0x00000008;
break;
} // case 34
default: {
if (!super.parseUnknownField(input, extensionRegistry, tag)) {
done = true; // was an endgroup tag
@ -510,9 +545,41 @@ private static final long serialVersionUID = 0L;
}
private int bitField0_;
private long reqId_ ;
/**
* <code>int64 reqId = 1;</code>
* @return The reqId.
*/
@java.lang.Override
public long getReqId() {
return reqId_;
}
/**
* <code>int64 reqId = 1;</code>
* @param value The reqId to set.
* @return This builder for chaining.
*/
public Builder setReqId(long value) {
reqId_ = value;
bitField0_ |= 0x00000001;
onChanged();
return this;
}
/**
* <code>int64 reqId = 1;</code>
* @return This builder for chaining.
*/
public Builder clearReqId() {
bitField0_ = (bitField0_ & ~0x00000001);
reqId_ = 0L;
onChanged();
return this;
}
private int status_ ;
/**
* <code>int32 status = 1;</code>
* <code>int32 status = 2;</code>
* @return The status.
*/
@java.lang.Override
@ -520,23 +587,23 @@ private static final long serialVersionUID = 0L;
return status_;
}
/**
* <code>int32 status = 1;</code>
* <code>int32 status = 2;</code>
* @param value The status to set.
* @return This builder for chaining.
*/
public Builder setStatus(int value) {
status_ = value;
bitField0_ |= 0x00000001;
bitField0_ |= 0x00000002;
onChanged();
return this;
}
/**
* <code>int32 status = 1;</code>
* <code>int32 status = 2;</code>
* @return This builder for chaining.
*/
public Builder clearStatus() {
bitField0_ = (bitField0_ & ~0x00000001);
bitField0_ = (bitField0_ & ~0x00000002);
status_ = 0;
onChanged();
return this;
@ -544,7 +611,7 @@ private static final long serialVersionUID = 0L;
private java.lang.Object message_ = "";
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return The message.
*/
public java.lang.String getMessage() {
@ -560,7 +627,7 @@ private static final long serialVersionUID = 0L;
}
}
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return The bytes for message.
*/
public com.google.protobuf.ByteString
@ -577,7 +644,7 @@ private static final long serialVersionUID = 0L;
}
}
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @param value The message to set.
* @return This builder for chaining.
*/
@ -585,22 +652,22 @@ private static final long serialVersionUID = 0L;
java.lang.String value) {
if (value == null) { throw new NullPointerException(); }
message_ = value;
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
return this;
}
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return This builder for chaining.
*/
public Builder clearMessage() {
message_ = getDefaultInstance().getMessage();
bitField0_ = (bitField0_ & ~0x00000002);
bitField0_ = (bitField0_ & ~0x00000004);
onChanged();
return this;
}
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @param value The bytes for message to set.
* @return This builder for chaining.
*/
@ -609,7 +676,7 @@ private static final long serialVersionUID = 0L;
if (value == null) { throw new NullPointerException(); }
checkByteStringIsUtf8(value);
message_ = value;
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
return this;
}
@ -618,14 +685,14 @@ private static final long serialVersionUID = 0L;
private com.google.protobuf.SingleFieldBuilderV3<
com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder> dataBuilder_;
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
* @return Whether the data field is set.
*/
public boolean hasData() {
return ((bitField0_ & 0x00000004) != 0);
return ((bitField0_ & 0x00000008) != 0);
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
* @return The data.
*/
public com.google.protobuf.Any getData() {
@ -636,7 +703,7 @@ private static final long serialVersionUID = 0L;
}
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
public Builder setData(com.google.protobuf.Any value) {
if (dataBuilder_ == null) {
@ -647,12 +714,12 @@ private static final long serialVersionUID = 0L;
} else {
dataBuilder_.setMessage(value);
}
bitField0_ |= 0x00000004;
bitField0_ |= 0x00000008;
onChanged();
return this;
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
public Builder setData(
com.google.protobuf.Any.Builder builderForValue) {
@ -661,16 +728,16 @@ private static final long serialVersionUID = 0L;
} else {
dataBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000004;
bitField0_ |= 0x00000008;
onChanged();
return this;
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
public Builder mergeData(com.google.protobuf.Any value) {
if (dataBuilder_ == null) {
if (((bitField0_ & 0x00000004) != 0) &&
if (((bitField0_ & 0x00000008) != 0) &&
data_ != null &&
data_ != com.google.protobuf.Any.getDefaultInstance()) {
getDataBuilder().mergeFrom(value);
@ -681,16 +748,16 @@ private static final long serialVersionUID = 0L;
dataBuilder_.mergeFrom(value);
}
if (data_ != null) {
bitField0_ |= 0x00000004;
bitField0_ |= 0x00000008;
onChanged();
}
return this;
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
public Builder clearData() {
bitField0_ = (bitField0_ & ~0x00000004);
bitField0_ = (bitField0_ & ~0x00000008);
data_ = null;
if (dataBuilder_ != null) {
dataBuilder_.dispose();
@ -700,15 +767,15 @@ private static final long serialVersionUID = 0L;
return this;
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
public com.google.protobuf.Any.Builder getDataBuilder() {
bitField0_ |= 0x00000004;
bitField0_ |= 0x00000008;
onChanged();
return getDataFieldBuilder().getBuilder();
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
public com.google.protobuf.AnyOrBuilder getDataOrBuilder() {
if (dataBuilder_ != null) {
@ -719,7 +786,7 @@ private static final long serialVersionUID = 0L;
}
}
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
private com.google.protobuf.SingleFieldBuilderV3<
com.google.protobuf.Any, com.google.protobuf.Any.Builder, com.google.protobuf.AnyOrBuilder>

View File

@ -8,35 +8,41 @@ public interface GrpcResultOrBuilder extends
com.google.protobuf.MessageOrBuilder {
/**
* <code>int32 status = 1;</code>
* <code>int64 reqId = 1;</code>
* @return The reqId.
*/
long getReqId();
/**
* <code>int32 status = 2;</code>
* @return The status.
*/
int getStatus();
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return The message.
*/
java.lang.String getMessage();
/**
* <code>string message = 2;</code>
* <code>string message = 3;</code>
* @return The bytes for message.
*/
com.google.protobuf.ByteString
getMessageBytes();
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
* @return Whether the data field is set.
*/
boolean hasData();
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
* @return The data.
*/
com.google.protobuf.Any getData();
/**
* <code>.google.protobuf.Any data = 3;</code>
* <code>.google.protobuf.Any data = 4;</code>
*/
com.google.protobuf.AnyOrBuilder getDataOrBuilder();
}

View File

@ -39,6 +39,17 @@ private static final long serialVersionUID = 0L;
}
private int bitField0_;
public static final int REQID_FIELD_NUMBER = 1;
private long reqId_ = 0L;
/**
* <code>int64 reqId = 1;</code>
* @return The reqId.
*/
@java.lang.Override
public long getReqId() {
return reqId_;
}
public static final int METADATA_FIELD_NUMBER = 2;
private com.aizuda.snailjob.common.core.grpc.auto.Metadata metadata_;
/**
@ -105,6 +116,9 @@ private static final long serialVersionUID = 0L;
@java.lang.Override
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (reqId_ != 0L) {
output.writeInt64(1, reqId_);
}
if (((bitField0_ & 0x00000001) != 0)) {
output.writeMessage(2, getMetadata());
}
@ -120,6 +134,10 @@ private static final long serialVersionUID = 0L;
if (size != -1) return size;
size = 0;
if (reqId_ != 0L) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(1, reqId_);
}
if (((bitField0_ & 0x00000001) != 0)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, getMetadata());
@ -143,6 +161,8 @@ private static final long serialVersionUID = 0L;
}
com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest other = (com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest) obj;
if (getReqId()
!= other.getReqId()) return false;
if (hasMetadata() != other.hasMetadata()) return false;
if (hasMetadata()) {
if (!getMetadata()
@ -164,6 +184,9 @@ private static final long serialVersionUID = 0L;
}
int hash = 41;
hash = (19 * hash) + getDescriptor().hashCode();
hash = (37 * hash) + REQID_FIELD_NUMBER;
hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
getReqId());
if (hasMetadata()) {
hash = (37 * hash) + METADATA_FIELD_NUMBER;
hash = (53 * hash) + getMetadata().hashCode();
@ -310,6 +333,7 @@ private static final long serialVersionUID = 0L;
public Builder clear() {
super.clear();
bitField0_ = 0;
reqId_ = 0L;
metadata_ = null;
if (metadataBuilder_ != null) {
metadataBuilder_.dispose();
@ -353,14 +377,17 @@ private static final long serialVersionUID = 0L;
private void buildPartial0(com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest result) {
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) != 0)) {
result.reqId_ = reqId_;
}
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000002) != 0)) {
result.metadata_ = metadataBuilder_ == null
? metadata_
: metadataBuilder_.build();
to_bitField0_ |= 0x00000001;
}
if (((from_bitField0_ & 0x00000002) != 0)) {
if (((from_bitField0_ & 0x00000004) != 0)) {
result.body_ = bodyBuilder_ == null
? body_
: bodyBuilder_.build();
@ -413,6 +440,9 @@ private static final long serialVersionUID = 0L;
public Builder mergeFrom(com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest other) {
if (other == com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest.getDefaultInstance()) return this;
if (other.getReqId() != 0L) {
setReqId(other.getReqId());
}
if (other.hasMetadata()) {
mergeMetadata(other.getMetadata());
}
@ -445,18 +475,23 @@ private static final long serialVersionUID = 0L;
case 0:
done = true;
break;
case 8: {
reqId_ = input.readInt64();
bitField0_ |= 0x00000001;
break;
} // case 8
case 18: {
input.readMessage(
getMetadataFieldBuilder().getBuilder(),
extensionRegistry);
bitField0_ |= 0x00000001;
bitField0_ |= 0x00000002;
break;
} // case 18
case 26: {
input.readMessage(
getBodyFieldBuilder().getBuilder(),
extensionRegistry);
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
break;
} // case 26
default: {
@ -476,6 +511,38 @@ private static final long serialVersionUID = 0L;
}
private int bitField0_;
private long reqId_ ;
/**
* <code>int64 reqId = 1;</code>
* @return The reqId.
*/
@java.lang.Override
public long getReqId() {
return reqId_;
}
/**
* <code>int64 reqId = 1;</code>
* @param value The reqId to set.
* @return This builder for chaining.
*/
public Builder setReqId(long value) {
reqId_ = value;
bitField0_ |= 0x00000001;
onChanged();
return this;
}
/**
* <code>int64 reqId = 1;</code>
* @return This builder for chaining.
*/
public Builder clearReqId() {
bitField0_ = (bitField0_ & ~0x00000001);
reqId_ = 0L;
onChanged();
return this;
}
private com.aizuda.snailjob.common.core.grpc.auto.Metadata metadata_;
private com.google.protobuf.SingleFieldBuilderV3<
com.aizuda.snailjob.common.core.grpc.auto.Metadata, com.aizuda.snailjob.common.core.grpc.auto.Metadata.Builder, com.aizuda.snailjob.common.core.grpc.auto.MetadataOrBuilder> metadataBuilder_;
@ -484,7 +551,7 @@ private static final long serialVersionUID = 0L;
* @return Whether the metadata field is set.
*/
public boolean hasMetadata() {
return ((bitField0_ & 0x00000001) != 0);
return ((bitField0_ & 0x00000002) != 0);
}
/**
* <code>.Metadata metadata = 2;</code>
@ -509,7 +576,7 @@ private static final long serialVersionUID = 0L;
} else {
metadataBuilder_.setMessage(value);
}
bitField0_ |= 0x00000001;
bitField0_ |= 0x00000002;
onChanged();
return this;
}
@ -523,7 +590,7 @@ private static final long serialVersionUID = 0L;
} else {
metadataBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000001;
bitField0_ |= 0x00000002;
onChanged();
return this;
}
@ -532,7 +599,7 @@ private static final long serialVersionUID = 0L;
*/
public Builder mergeMetadata(com.aizuda.snailjob.common.core.grpc.auto.Metadata value) {
if (metadataBuilder_ == null) {
if (((bitField0_ & 0x00000001) != 0) &&
if (((bitField0_ & 0x00000002) != 0) &&
metadata_ != null &&
metadata_ != com.aizuda.snailjob.common.core.grpc.auto.Metadata.getDefaultInstance()) {
getMetadataBuilder().mergeFrom(value);
@ -543,7 +610,7 @@ private static final long serialVersionUID = 0L;
metadataBuilder_.mergeFrom(value);
}
if (metadata_ != null) {
bitField0_ |= 0x00000001;
bitField0_ |= 0x00000002;
onChanged();
}
return this;
@ -552,7 +619,7 @@ private static final long serialVersionUID = 0L;
* <code>.Metadata metadata = 2;</code>
*/
public Builder clearMetadata() {
bitField0_ = (bitField0_ & ~0x00000001);
bitField0_ = (bitField0_ & ~0x00000002);
metadata_ = null;
if (metadataBuilder_ != null) {
metadataBuilder_.dispose();
@ -565,7 +632,7 @@ private static final long serialVersionUID = 0L;
* <code>.Metadata metadata = 2;</code>
*/
public com.aizuda.snailjob.common.core.grpc.auto.Metadata.Builder getMetadataBuilder() {
bitField0_ |= 0x00000001;
bitField0_ |= 0x00000002;
onChanged();
return getMetadataFieldBuilder().getBuilder();
}
@ -605,7 +672,7 @@ private static final long serialVersionUID = 0L;
* @return Whether the body field is set.
*/
public boolean hasBody() {
return ((bitField0_ & 0x00000002) != 0);
return ((bitField0_ & 0x00000004) != 0);
}
/**
* <code>.google.protobuf.Any body = 3;</code>
@ -630,7 +697,7 @@ private static final long serialVersionUID = 0L;
} else {
bodyBuilder_.setMessage(value);
}
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
return this;
}
@ -644,7 +711,7 @@ private static final long serialVersionUID = 0L;
} else {
bodyBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
return this;
}
@ -653,7 +720,7 @@ private static final long serialVersionUID = 0L;
*/
public Builder mergeBody(com.google.protobuf.Any value) {
if (bodyBuilder_ == null) {
if (((bitField0_ & 0x00000002) != 0) &&
if (((bitField0_ & 0x00000004) != 0) &&
body_ != null &&
body_ != com.google.protobuf.Any.getDefaultInstance()) {
getBodyBuilder().mergeFrom(value);
@ -664,7 +731,7 @@ private static final long serialVersionUID = 0L;
bodyBuilder_.mergeFrom(value);
}
if (body_ != null) {
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
}
return this;
@ -673,7 +740,7 @@ private static final long serialVersionUID = 0L;
* <code>.google.protobuf.Any body = 3;</code>
*/
public Builder clearBody() {
bitField0_ = (bitField0_ & ~0x00000002);
bitField0_ = (bitField0_ & ~0x00000004);
body_ = null;
if (bodyBuilder_ != null) {
bodyBuilder_.dispose();
@ -686,7 +753,7 @@ private static final long serialVersionUID = 0L;
* <code>.google.protobuf.Any body = 3;</code>
*/
public com.google.protobuf.Any.Builder getBodyBuilder() {
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
onChanged();
return getBodyFieldBuilder().getBuilder();
}

View File

@ -7,6 +7,12 @@ public interface GrpcSnailJobRequestOrBuilder extends
// @@protoc_insertion_point(interface_extends:GrpcSnailJobRequest)
com.google.protobuf.MessageOrBuilder {
/**
* <code>int64 reqId = 1;</code>
* @return The reqId.
*/
long getReqId();
/**
* <code>.Metadata metadata = 2;</code>
* @return Whether the metadata field is set.

View File

@ -1,293 +0,0 @@
package com.aizuda.snailjob.common.core.grpc.auto;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler (version 1.63.0)",
comments = "Source: snail_job_grpc_service.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class RequestGrpc {
private RequestGrpc() {}
public static final java.lang.String SERVICE_NAME = "Request";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest,
com.aizuda.snailjob.common.core.grpc.auto.GrpcResult> getRequestMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "request",
requestType = com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest.class,
responseType = com.aizuda.snailjob.common.core.grpc.auto.GrpcResult.class,
methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest,
com.aizuda.snailjob.common.core.grpc.auto.GrpcResult> getRequestMethod() {
io.grpc.MethodDescriptor<com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest, com.aizuda.snailjob.common.core.grpc.auto.GrpcResult> getRequestMethod;
if ((getRequestMethod = RequestGrpc.getRequestMethod) == null) {
synchronized (RequestGrpc.class) {
if ((getRequestMethod = RequestGrpc.getRequestMethod) == null) {
RequestGrpc.getRequestMethod = getRequestMethod =
io.grpc.MethodDescriptor.<com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest, com.aizuda.snailjob.common.core.grpc.auto.GrpcResult>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "request"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.aizuda.snailjob.common.core.grpc.auto.GrpcResult.getDefaultInstance()))
.setSchemaDescriptor(new RequestMethodDescriptorSupplier("request"))
.build();
}
}
}
return getRequestMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static RequestStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<RequestStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<RequestStub>() {
@java.lang.Override
public RequestStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestStub(channel, callOptions);
}
};
return RequestStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static RequestBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<RequestBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<RequestBlockingStub>() {
@java.lang.Override
public RequestBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestBlockingStub(channel, callOptions);
}
};
return RequestBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static RequestFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<RequestFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<RequestFutureStub>() {
@java.lang.Override
public RequestFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestFutureStub(channel, callOptions);
}
};
return RequestFutureStub.newStub(factory, channel);
}
/**
*/
public interface AsyncService {
/**
*/
default void request(com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest request,
io.grpc.stub.StreamObserver<com.aizuda.snailjob.common.core.grpc.auto.GrpcResult> responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getRequestMethod(), responseObserver);
}
}
/**
* Base class for the server implementation of the service Request.
*/
public static abstract class RequestImplBase
implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return RequestGrpc.bindService(this);
}
}
/**
* A stub to allow clients to do asynchronous rpc calls to service Request.
*/
public static final class RequestStub
extends io.grpc.stub.AbstractAsyncStub<RequestStub> {
private RequestStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected RequestStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestStub(channel, callOptions);
}
/**
*/
public void request(com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest request,
io.grpc.stub.StreamObserver<com.aizuda.snailjob.common.core.grpc.auto.GrpcResult> responseObserver) {
io.grpc.stub.ClientCalls.asyncUnaryCall(
getChannel().newCall(getRequestMethod(), getCallOptions()), request, responseObserver);
}
}
/**
* A stub to allow clients to do synchronous rpc calls to service Request.
*/
public static final class RequestBlockingStub
extends io.grpc.stub.AbstractBlockingStub<RequestBlockingStub> {
private RequestBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected RequestBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestBlockingStub(channel, callOptions);
}
/**
*/
public com.aizuda.snailjob.common.core.grpc.auto.GrpcResult request(com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest request) {
return io.grpc.stub.ClientCalls.blockingUnaryCall(
getChannel(), getRequestMethod(), getCallOptions(), request);
}
}
/**
* A stub to allow clients to do ListenableFuture-style rpc calls to service Request.
*/
public static final class RequestFutureStub
extends io.grpc.stub.AbstractFutureStub<RequestFutureStub> {
private RequestFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected RequestFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new RequestFutureStub(channel, callOptions);
}
/**
*/
public com.google.common.util.concurrent.ListenableFuture<com.aizuda.snailjob.common.core.grpc.auto.GrpcResult> request(
com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getRequestMethod(), getCallOptions()), request);
}
}
private static final int METHODID_REQUEST = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final AsyncService serviceImpl;
private final int methodId;
MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_REQUEST:
serviceImpl.request((com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest) request,
(io.grpc.stub.StreamObserver<com.aizuda.snailjob.common.core.grpc.auto.GrpcResult>) responseObserver);
break;
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
}
public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getRequestMethod(),
io.grpc.stub.ServerCalls.asyncUnaryCall(
new MethodHandlers<
com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest,
com.aizuda.snailjob.common.core.grpc.auto.GrpcResult>(
service, METHODID_REQUEST)))
.build();
}
private static abstract class RequestBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
RequestBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return com.aizuda.snailjob.common.core.grpc.auto.SnailJobGrpcService.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("Request");
}
}
private static final class RequestFileDescriptorSupplier
extends RequestBaseDescriptorSupplier {
RequestFileDescriptorSupplier() {}
}
private static final class RequestMethodDescriptorSupplier
extends RequestBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final java.lang.String methodName;
RequestMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (RequestGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new RequestFileDescriptorSupplier())
.addMethod(getRequestMethod())
.build();
}
}
}
return result;
}
}

View File

@ -48,15 +48,14 @@ public final class SnailJobGrpcService {
"tamp.proto\"\220\001\n\010Metadata\022\014\n\004type\030\003 \001(\t\022\'\n" +
"\007headers\030\007 \003(\0132\026.Metadata.HeadersEntry\022\020" +
"\n\010clientIp\030\010 \001(\t\022\013\n\003uri\030\t \001(\t\032.\n\014Headers" +
"Entry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"V\n" +
"\023GrpcSnailJobRequest\022\033\n\010metadata\030\002 \001(\0132\t" +
".Metadata\022\"\n\004body\030\003 \001(\0132\024.google.protobu" +
"f.Any\"Q\n\nGrpcResult\022\016\n\006status\030\001 \001(\005\022\017\n\007m" +
"essage\030\002 \001(\t\022\"\n\004data\030\003 \001(\0132\024.google.prot" +
"obuf.Any29\n\007Request\022.\n\007request\022\024.GrpcSna" +
"ilJobRequest\032\013.GrpcResult\"\000B-\n)com.aizud" +
"a.snailjob.common.core.grpc.autoP\001b\006prot" +
"o3"
"Entry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"e\n" +
"\023GrpcSnailJobRequest\022\r\n\005reqId\030\001 \001(\003\022\033\n\010m" +
"etadata\030\002 \001(\0132\t.Metadata\022\"\n\004body\030\003 \001(\0132\024" +
".google.protobuf.Any\"`\n\nGrpcResult\022\r\n\005re" +
"qId\030\001 \001(\003\022\016\n\006status\030\002 \001(\005\022\017\n\007message\030\003 \001" +
"(\t\022\"\n\004data\030\004 \001(\0132\024.google.protobuf.AnyB-" +
"\n)com.aizuda.snailjob.common.core.grpc.a" +
"utoP\001b\006proto3"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@ -81,13 +80,13 @@ public final class SnailJobGrpcService {
internal_static_GrpcSnailJobRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_GrpcSnailJobRequest_descriptor,
new java.lang.String[] { "Metadata", "Body", });
new java.lang.String[] { "ReqId", "Metadata", "Body", });
internal_static_GrpcResult_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_GrpcResult_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_GrpcResult_descriptor,
new java.lang.String[] { "Status", "Message", "Data", });
new java.lang.String[] { "ReqId", "Status", "Message", "Data", });
com.google.protobuf.AnyProto.getDescriptor();
com.google.protobuf.TimestampProto.getDescriptor();
}

View File

@ -11,19 +11,19 @@ import lombok.experimental.Accessors;
@EqualsAndHashCode(callSuper = true)
@Data
@Accessors(chain = true)
public class NettyResult extends Result<Object> {
public class SnailJobRpcResult extends Result<Object> {
private long reqId;
public NettyResult(int status, String message, Object data, long reqId) {
public SnailJobRpcResult(int status, String message, Object data, long reqId) {
super(status, message, data);
this.reqId = reqId;
}
public NettyResult() {
public SnailJobRpcResult() {
}
public NettyResult(Object data, long reqId) {
public SnailJobRpcResult(Object data, long reqId) {
super(data);
this.reqId = reqId;
}

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.common.core.rpc;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.exception.SnailJobRemotingTimeOutException;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
import io.netty.util.HashedWheelTimer;
@ -40,7 +40,7 @@ public final class RpcContext {
private static final ConcurrentMap<Long, SnailJobFuture> COMPLETABLE_FUTURE = new ConcurrentHashMap<>();
public static void invoke(Long requestId, NettyResult nettyResult, boolean timeout) {
public static void invoke(Long requestId, SnailJobRpcResult snailJobRpcResult, boolean timeout) {
try {
// 同步请求同步返回
@ -49,7 +49,7 @@ public final class RpcContext {
if (timeout) {
future.completeExceptionally(new SnailJobRemotingTimeOutException("Request to remote interface timed out."));
} else {
future.complete(nettyResult);
future.complete(snailJobRpcResult);
}
});
@ -77,7 +77,7 @@ public final class RpcContext {
@Override
public void run(final Timeout timeout) throws Exception {
invoke(requestId, new NettyResult(StatusEnum.NO.getStatus(), "Request to remote interface timed out.", null, requestId), true);
invoke(requestId, new SnailJobRpcResult(StatusEnum.NO.getStatus(), "Request to remote interface timed out.", null, requestId), true);
}
}

View File

@ -1,20 +1,3 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
import "google/protobuf/any.proto";
@ -24,30 +7,19 @@ option java_multiple_files = true;
option java_package = "com.aizuda.snailjob.common.core.grpc.auto";
message Metadata {
string type = 3;
string uri = 3;
map<string, string> headers = 7;
string clientIp = 8;
string uri = 9;
}
message GrpcSnailJobRequest {
int64 reqId = 1;
Metadata metadata = 2;
google.protobuf.Any body = 3;
}
message GrpcResult {
int32 status = 1;
string message = 2;
google.protobuf.Any data = 3;
int64 reqId = 1;
int32 status = 2;
string message = 3;
google.protobuf.Any data = 4;
}
service Request {
rpc request (GrpcSnailJobRequest) returns (GrpcResult) {
}
}
//service BiRequestStream {
// // Sends a biStreamRequest
// rpc requestBiStream (stream Payload) returns (stream Payload) {
// }
//}

View File

@ -92,19 +92,26 @@
<groupId>com.aizuda</groupId>
<artifactId>snail-job-common-log</artifactId>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
</dependency>
<!-- grpc -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.63.0</version>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>3.1.0.RELEASE</version>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-util</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.common;
import cn.hutool.core.net.url.UrlBuilder;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
@ -17,6 +18,6 @@ public interface HttpRequestHandler {
HttpMethod method();
String doHandler(String content, UrlBuilder urlBuilder, HttpHeaders headers);
SnailJobRpcResult doHandler(String content, UrlBuilder urlBuilder, HttpHeaders headers);
}

View File

@ -17,6 +17,7 @@ public class ActorGenerator {
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
public static final String REQUEST_HANDLER_ACTOR = "RequestHandlerActor";
public static final String GRPC_REQUEST_HANDLER_ACTOR = "GrpcRequestHandlerActor";
private static final String COMMON_LOG_DISPATCHER = "akka.actor.common-log-dispatcher";
private static final String COMMON_SCAN_TASK_DISPATCHER = "akka.actor.common-scan-task-dispatcher";
private static final String NETTY_RECEIVE_REQUEST_DISPATCHER = "akka.actor.netty-receive-request-dispatcher";
@ -180,7 +181,7 @@ public class ActorGenerator {
}
/**
* 生成扫描重试数据的actor
* netty请求处理器
*
* @return actor 引用
*/
@ -189,6 +190,16 @@ public class ActorGenerator {
.withDispatcher(NETTY_RECEIVE_REQUEST_DISPATCHER));
}
/**
* Grpc请求处理器
*
* @return actor 引用
*/
public static ActorRef requestGrpcHandlerActor() {
return getNettyActorSystem().actorOf(getSpringExtension().props(GRPC_REQUEST_HANDLER_ACTOR)
.withDispatcher(NETTY_RECEIVE_REQUEST_DISPATCHER));
}
/**
* Job调度准备阶段actor

View File

@ -1,11 +1,18 @@
package com.aizuda.snailjob.server.common.config;
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
/**
* 系统配置
*
@ -105,10 +112,85 @@ public class SystemProperties {
*/
private int summaryDay = 7;
/**
* rpc类型
*/
private RpcTypeEnum rpcType = RpcTypeEnum.NETTY;
/**
* 邮件配置
*/
@NestedConfigurationProperty
private SnailJobMailProperties mail = new SnailJobMailProperties();
/**
* 客户端Rpc配置
*/
private RpcClientProperties clientRpc = new RpcClientProperties();
/**
* 服务端Rpc配置
*/
private RpcServerProperties serverRpc = new RpcServerProperties();
@Data
public static class RpcServerProperties {
private int maxInboundMessageSize = 10 * 1024 * 1024;
private Duration keepAliveTime = Duration.of(2, ChronoUnit.HOURS);
private Duration keepAliveTimeout = Duration.of(20, ChronoUnit.SECONDS);
private Duration permitKeepAliveTime = Duration.of(5, ChronoUnit.MINUTES);
private ThreadPoolConfig dispatcherTp = new ThreadPoolConfig(16, 16, 1, TimeUnit.SECONDS, 10000);
}
@Data
public static class RpcClientProperties {
private int maxInboundMessageSize = 10 * 1024 * 1024;
private Duration keepAliveTime = Duration.of(2, ChronoUnit.HOURS);
private Duration keepAliveTimeout = Duration.of(20, ChronoUnit.SECONDS);
private Duration permitKeepAliveTime = Duration.of(5, ChronoUnit.MINUTES);
private ThreadPoolConfig clientTp = new ThreadPoolConfig(16, 16, 1, TimeUnit.SECONDS, 10000);
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class ThreadPoolConfig {
/**
* 核心线程池
*/
private int corePoolSize = 16;
/**
* 最大线程数
*/
private int maximumPoolSize = 16;
/**
* 线程存活时间
*/
private long keepAliveTime = 1;
/**
* 线程存活时间(单位)
*/
private TimeUnit timeUnit = TimeUnit.SECONDS;
/**
* 队列容量
*/
private int queueCapacity = 10000;
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.common.handler;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.server.common.HttpRequestHandler;
import io.netty.handler.codec.http.HttpHeaders;
@ -15,11 +16,11 @@ import io.netty.handler.codec.http.HttpHeaders;
public abstract class GetHttpRequestHandler implements HttpRequestHandler {
@Override
public String doHandler(String content, UrlBuilder builder, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlBuilder builder, HttpHeaders headers) {
UrlQuery query = builder.getQuery();
return doHandler(content, query, headers);
}
public abstract String doHandler(String content, UrlQuery query, HttpHeaders headers);
public abstract SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers);
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.common.handler;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.server.common.HttpRequestHandler;
import io.netty.handler.codec.http.HttpHeaders;
@ -15,11 +16,11 @@ import io.netty.handler.codec.http.HttpHeaders;
public abstract class PostHttpRequestHandler implements HttpRequestHandler {
@Override
public String doHandler(String content, UrlBuilder builder, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlBuilder builder, HttpHeaders headers) {
UrlQuery query = builder.getQuery();
return doHandler(content, query, headers);
}
public abstract String doHandler(String content, UrlQuery query, HttpHeaders headers);
public abstract SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers);
}

View File

@ -1,28 +1,33 @@
package com.aizuda.snailjob.server.common.rpc.client;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.config.SystemProperties.RpcClientProperties;
import com.aizuda.snailjob.server.common.config.SystemProperties.ThreadPoolConfig;
import com.aizuda.snailjob.server.common.triple.Pair;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author opensnail
@ -34,6 +39,7 @@ public class GrpcChannel {
private GrpcChannel() {
}
private static final ThreadPoolExecutor grpcExecutor = createGrpcExecutor();
private static ConcurrentHashMap<Pair<String, String>, ManagedChannel> CHANNEL_MAP = new ConcurrentHashMap<>(16);
public static void setChannel(String hostId, String ip, ManagedChannel channel) {
@ -52,14 +58,17 @@ public class GrpcChannel {
/**
* 发送数据
*
* @param url url地址
* @param body 请求的消息体
* @param url url地址
* @param body 请求的消息体
* @param reqId
* @throws InterruptedException
*/
public static ListenableFuture<GrpcResult> send(String hostId, String hostIp, Integer port, String url, String body, Map<String, String> headersMap) throws InterruptedException {
public static ListenableFuture<GrpcResult> send(String hostId, String hostIp, Integer port, String url, String body, Map<String, String> headersMap,
final long reqId) {
ManagedChannel channel = CHANNEL_MAP.get(Pair.of(hostId, hostIp));
if (Objects.isNull(channel) || !channel.isShutdown() || !channel.isShutdown()) {
removeChannel(channel);
channel = connect(hostId, hostIp, port);
if (Objects.isNull(channel)) {
SnailJobLog.LOCAL.error("send message but channel is null url:[{}] method:[{}] body:[{}] ", url, body);
@ -77,6 +86,7 @@ public class GrpcChannel {
GrpcSnailJobRequest snailJobRequest = GrpcSnailJobRequest
.newBuilder()
.setMetadata(metadata)
.setReqId(reqId)
.setBody(build)
.build();
@ -103,7 +113,13 @@ public class GrpcChannel {
public static ManagedChannel connect(String hostId, String ip, Integer port) {
try {
RpcClientProperties clientRpc = SnailSpringContext.getBean(SystemProperties.class).getClientRpc();
ManagedChannel channel = ManagedChannelBuilder.forAddress(ip, port)
.executor(grpcExecutor)
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.maxInboundMessageSize(clientRpc.getMaxInboundMessageSize())
.keepAliveTime(clientRpc.getKeepAliveTime().toMillis(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(clientRpc.getKeepAliveTimeout().toMillis(), TimeUnit.MILLISECONDS)
.usePlaintext()
.build();
GrpcChannel.setChannel(hostId, ip, channel);
@ -116,6 +132,18 @@ public class GrpcChannel {
return null;
}
private static ThreadPoolExecutor createGrpcExecutor() {
RpcClientProperties clientRpc = SnailSpringContext.getBean(SystemProperties.class).getClientRpc();
ThreadPoolConfig clientTp = clientRpc.getClientTp();
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(clientTp.getCorePoolSize(),
clientTp.getMaximumPoolSize(), clientTp.getKeepAliveTime(), TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(clientTp.getQueueCapacity()),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("snail-job-grpc-client-executor-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
return grpcExecutor;
}
/**
* 连接失败处理
*

View File

@ -48,6 +48,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 请求处理器
@ -59,6 +60,8 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class GrpcClientInvokeHandler implements InvocationHandler {
public static final AtomicLong REQUEST_ID = new AtomicLong(0);
private final String groupName;
private String hostId;
private String hostIp;
@ -143,19 +146,30 @@ public class GrpcClientInvokeHandler implements InvocationHandler {
// 统一设置Token
requestHeaders.put(SystemConstants.SNAIL_JOB_AUTH_TOKEN, CacheToken.get(groupName, namespaceId));
// SnailJobRequest snailJobRequest = new SnailJobRequest(args);
long reqId = newId();
Result result = retryer.call(() -> {
ListenableFuture<GrpcResult> future = GrpcChannel.send(hostId, hostIp, hostPort,
mapping.path(), JsonUtil.toJsonString(args), requestHeaders);
StopWatch sw = new StopWatch();
sw.start("request start " + reqId);
ListenableFuture<GrpcResult> future;
try {
future = GrpcChannel.send(hostId, hostIp, hostPort,
mapping.path(), JsonUtil.toJsonString(args), requestHeaders, reqId);
} finally {
sw.stop();
}
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", reqId);
SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", 0);
if (async) {
// 暂时不支持异步调用
return null;
} else {
Assert.notNull(future, () -> new SnailJobServerException("completableFuture is null"));
GrpcResult grpcResult = future.get(Optional.ofNullable(executorTimeout).orElse(20), TimeUnit.SECONDS);
GrpcResult grpcResult = future.get(Optional.ofNullable(executorTimeout).orElse(20),
TimeUnit.SECONDS);
ByteBuffer byteBuffer = grpcResult.getData().getValue().asReadOnlyByteBuffer();
Object obj = JsonUtil.parseObject(new ByteBufferBackedInputStream(byteBuffer), Object.class);
return new Result(grpcResult.getStatus(), grpcResult.getMessage(), obj);
@ -260,4 +274,8 @@ public class GrpcClientInvokeHandler implements InvocationHandler {
private Map<String, String> requestHeaders;
private Map<String, Object> paramMap;
}
private static long newId() {
return REQUEST_ID.getAndIncrement();
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.common.rpc.client;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -32,8 +32,8 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
HttpHeaders headers = response.headers();
SnailJobLog.LOCAL.info("Receive server data content:[{}], headers:[{}]", content, headers);
NettyResult nettyResult = JsonUtil.parseObject(content, NettyResult.class);
RpcContext.invoke(nettyResult.getReqId(), nettyResult, false);
SnailJobRpcResult snailJobRpcResult = JsonUtil.parseObject(content, SnailJobRpcResult.class);
RpcContext.invoke(snailJobRpcResult.getReqId(), snailJobRpcResult, false);
}

View File

@ -1,10 +1,15 @@
package com.aizuda.snailjob.server.common.rpc.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.github.rholder.retry.RetryListener;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Objects;
@ -105,17 +110,22 @@ public class RequestBuilder<T, R> {
throw new SnailJobServerException("class not found exception to: [{}]", clintInterface.getName());
}
// todo 测试先注释
RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(
SystemProperties properties = SnailSpringContext.getBean(SystemProperties.class);
RpcTypeEnum rpcType = properties.getRpcType();
InvocationHandler invocationHandler;
if (Objects.isNull(rpcType) || RpcTypeEnum.NETTY == rpcType) {
invocationHandler = new RpcClientInvokeHandler(
nodeInfo.getGroupName(), nodeInfo, failRetry, retryTimes, retryInterval,
retryListener, routeKey, allocKey, failover, executorTimeout, nodeInfo.getNamespaceId());
GrpcClientInvokeHandler grpcClientInvokeHandler = new GrpcClientInvokeHandler(
nodeInfo.getGroupName(), nodeInfo, failRetry, retryTimes, retryInterval,
retryListener, routeKey, allocKey, failover, executorTimeout, nodeInfo.getNamespaceId());
} else {
invocationHandler = new GrpcClientInvokeHandler(
nodeInfo.getGroupName(), nodeInfo, failRetry, retryTimes, retryInterval,
retryListener, routeKey, allocKey, failover, executorTimeout, nodeInfo.getNamespaceId());
}
return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(),
new Class[]{clintInterface}, grpcClientInvokeHandler);
new Class[]{clintInterface}, invocationHandler);
}
}

View File

@ -9,7 +9,7 @@ import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -33,8 +33,6 @@ import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
@ -46,10 +44,9 @@ import org.springframework.stereotype.Component;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import static com.aizuda.snailjob.common.core.alarm.AlarmContext.build;
/**
* 处理netty客户端请求
*
@ -57,7 +54,7 @@ import static com.aizuda.snailjob.common.core.alarm.AlarmContext.build;
* @date : 2023-07-24 09:20
* @since 2.1.0
*/
@Component(ActorGenerator.REQUEST_HANDLER_ACTOR)
@Component(ActorGenerator.GRPC_REQUEST_HANDLER_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class GrpcRequestHandlerActor extends AbstractActor {
@ -74,7 +71,7 @@ public class GrpcRequestHandlerActor extends AbstractActor {
}
Map<String, String> headersMap = metadata.getHeadersMap();
String result = "";
SnailJobRpcResult snailJobRpcResult = null;
try {
SnailJobRequest request = new SnailJobRequest();
Any body = grpcSnailJobRequest.getBody();
@ -82,22 +79,26 @@ public class GrpcRequestHandlerActor extends AbstractActor {
ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();
Object[] objects = JsonUtil.parseObject(new ByteBufferBackedInputStream(byteBuffer), Object[].class);
request.setArgs(objects);
result = doProcess(uri, JsonUtil.toJsonString(request), headersMap);
snailJobRpcResult = doProcess(uri, JsonUtil.toJsonString(request), headersMap);
if (Objects.isNull(snailJobRpcResult)) {
snailJobRpcResult = new SnailJobRpcResult(StatusEnum.NO.getStatus(), "服务端异常", null,
grpcSnailJobRequest.getReqId());
}
} catch (Exception e) {
SnailJobLog.LOCAL.error("http request error. [{}]",grpcSnailJobRequest, e);
result = JsonUtil.toJsonString(new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, 0));
SnailJobLog.LOCAL.error("http request error. [{}]", grpcSnailJobRequest, e);
snailJobRpcResult = new SnailJobRpcResult(StatusEnum.NO.getStatus(), e.getMessage(), null,
grpcSnailJobRequest.getReqId());
} finally {
NettyResult nettyResult = JsonUtil.parseObject(result, NettyResult.class);
StreamObserver<GrpcResult> streamObserver = grpcRequest.getStreamObserver();
GrpcResult grpcResult = GrpcResult.newBuilder()
.setStatus(nettyResult.getStatus())
.setMessage(Optional.ofNullable(nettyResult.getMessage()).orElse(StrUtil.EMPTY))
.setReqId(snailJobRpcResult.getReqId())
.setStatus(snailJobRpcResult.getStatus())
.setMessage(Optional.ofNullable(snailJobRpcResult.getMessage()).orElse(StrUtil.EMPTY))
.setData(Any.newBuilder()
.setValue(UnsafeByteOperations.unsafeWrap(JsonUtil.toJsonString(nettyResult.getData()).getBytes()))
.build())
.setValue(UnsafeByteOperations.unsafeWrap(
JsonUtil.toJsonString(snailJobRpcResult.getData()).getBytes()))
.build())
.build();
streamObserver.onNext(grpcResult);
streamObserver.onCompleted();
getContext().stop(getSelf());
@ -107,7 +108,7 @@ public class GrpcRequestHandlerActor extends AbstractActor {
}).build();
}
private String doProcess(String uri, String content, Map<String, String> headersMap) {
private SnailJobRpcResult doProcess(String uri, String content, Map<String, String> headersMap) {
Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);

View File

@ -0,0 +1,141 @@
package com.aizuda.snailjob.server.common.rpc.server;
import com.aizuda.snailjob.common.core.constant.GrpcServerConstants;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.config.SystemProperties.RpcServerProperties;
import com.aizuda.snailjob.server.common.config.SystemProperties.ThreadPoolConfig;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.util.MutableHandlerRegistry;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* netty server
*
* @author: opensnail
* @date : 2022-03-07 15:54
* @since 1.0.0
*/
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
@RequiredArgsConstructor
@Getter
public class GrpcServer implements Lifecycle {
private final SystemProperties systemProperties;
private volatile boolean started = false;
private Server server;
@Override
public void start() {
// 防止重复启动
if (started) {
return;
}
if (RpcTypeEnum.GPRC != systemProperties.getRpcType()) {
return;
}
RpcServerProperties grpc = systemProperties.getServerRpc();
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
addServices(handlerRegistry, new GrpcInterceptor());
NettyServerBuilder builder = NettyServerBuilder.forPort(systemProperties.getNettyPort())
.executor(createGrpcExecutor(grpc.getDispatcherTp()));
Duration keepAliveTime = grpc.getKeepAliveTime();
Duration keepAliveTimeOut = grpc.getKeepAliveTimeout();
Duration permitKeepAliveTime = grpc.getPermitKeepAliveTime();
server = builder.maxInboundMessageSize(grpc.getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.keepAliveTime(keepAliveTime.toMillis(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(keepAliveTimeOut.toMillis(), TimeUnit.MILLISECONDS)
.permitKeepAliveTime(permitKeepAliveTime.toMillis(), TimeUnit.MILLISECONDS)
.build();
try {
server.start();
this.started = true;
SnailJobLog.LOCAL.info("------> snail-job remoting server start success, grpc = {}, port = {}",
GrpcServer.class.getName(), systemProperties.getNettyPort());
} catch (IOException e) {
SnailJobLog.LOCAL.error("--------> snail-job remoting server error.", e);
started = false;
throw new SnailJobServerException("snail-job server start error");
}
}
@Override
public void close() {
if (server != null) {
server.shutdownNow();
}
}
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// 创建服务UNARY类型定义
ServerServiceDefinition serviceDefinition = createUnaryServiceDefinition(
GrpcServerConstants.UNARY_SERVICE_NAME, GrpcServerConstants.UNARY_METHOD_NAME,
new UnaryRequestHandler());
handlerRegistry.addService(serviceDefinition);
// unary common call register.
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefinition, serverInterceptor));
}
public static ServerServiceDefinition createUnaryServiceDefinition(
String serviceName,
String methodName,
ServerCalls.UnaryMethod<GrpcSnailJobRequest, GrpcResult> unaryMethod) {
MethodDescriptor<GrpcSnailJobRequest, GrpcResult> methodDescriptor =
MethodDescriptor.<GrpcSnailJobRequest, GrpcResult>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(serviceName, methodName))
.setRequestMarshaller(ProtoUtils.marshaller(GrpcSnailJobRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(GrpcResult.getDefaultInstance()))
.build();
return ServerServiceDefinition.builder(serviceName)
.addMethod(methodDescriptor, ServerCalls.asyncUnaryCall(unaryMethod))
.build();
}
private ThreadPoolExecutor createGrpcExecutor(final ThreadPoolConfig threadPool) {
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(threadPool.getCorePoolSize(),
threadPool.getMaximumPoolSize(), threadPool.getKeepAliveTime(), TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadPool.getQueueCapacity()),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("snail-job-grpc-server-executor-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
return grpcExecutor;
}
}

View File

@ -1,46 +0,0 @@
package com.aizuda.snailjob.server.common.rpc.server;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcResult;
import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import io.grpc.MethodDescriptor;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import net.devh.boot.grpc.server.serverfactory.GrpcServerConfigurer;
import org.springframework.context.annotation.Configuration;
@Configuration
public class GrpcServerConsumerConfig implements GrpcServerConfigurer {
@Override
public void accept(ServerBuilder<?> serverBuilder) {
// 创建服务定义
ServerServiceDefinition serviceDefinition = createServiceDefinition(
"UnaryRequest", "unaryRequest",
new UnaryRequestHandler());
serverBuilder.addService(serviceDefinition);
serverBuilder.intercept(new GrpcInterceptor());
}
public static ServerServiceDefinition createServiceDefinition(
String serviceName,
String methodName,
ServerCalls.UnaryMethod<GrpcSnailJobRequest, GrpcResult> unaryMethod) {
MethodDescriptor<GrpcSnailJobRequest, GrpcResult> methodDescriptor =
MethodDescriptor.<GrpcSnailJobRequest, GrpcResult>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(serviceName, methodName))
.setRequestMarshaller(ProtoUtils.marshaller(GrpcSnailJobRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(GrpcResult.getDefaultInstance()))
.build();
ServerCallHandler<GrpcSnailJobRequest, GrpcResult> callHandler = ServerCalls.asyncUnaryCall(unaryMethod);
return ServerServiceDefinition.builder(serviceName)
.addMethod(methodDescriptor, callHandler)
.build();
}
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.common.rpc.server;
import com.aizuda.snailjob.common.core.enums.RpcTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.config.SystemProperties;
@ -63,7 +64,7 @@ public class NettyHttpServer implements Runnable, Lifecycle {
}
});
// 在特定端口绑定并启动服务器 默认是1788
// 在特定端口绑定并启动服务器 默认是17888
ChannelFuture future = bootstrap.bind(systemProperties.getNettyPort()).sync();
SnailJobLog.LOCAL.info("------> snail-job remoting server start success, nettype = {}, port = {}",
@ -87,9 +88,12 @@ public class NettyHttpServer implements Runnable, Lifecycle {
@Override
public void start() {
// thread = new Thread(this);
// thread.setDaemon(true);
// thread.start();
if (RpcTypeEnum.NETTY != systemProperties.getRpcType()) {
return;
}
thread = new Thread(this);
thread.setDaemon(true);
thread.start();
}
@Override

View File

@ -6,7 +6,7 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -36,8 +36,8 @@ import java.util.Collection;
* @date : 2023-07-24 09:20
* @since 2.1.0
*/
//@Component(ActorGenerator.REQUEST_HANDLER_ACTOR)
//@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Component(ActorGenerator.REQUEST_HANDLER_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class RequestHandlerActor extends AbstractActor {
@ -58,15 +58,15 @@ public class RequestHandlerActor extends AbstractActor {
final String content = nettyHttpRequest.getContent();
final HttpHeaders headers = nettyHttpRequest.getHeaders();
String result = "";
SnailJobRpcResult result = null;
try {
result = doProcess(uri, content, method, headers);
} catch (Exception e) {
SnailJobLog.LOCAL.error("http request error. [{}]", nettyHttpRequest.getContent(), e);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
result = JsonUtil.toJsonString(new NettyResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId()));
result = new SnailJobRpcResult(StatusEnum.NO.getStatus(), e.getMessage(), null, retryRequest.getReqId());
} finally {
writeResponse(channelHandlerContext, keepAlive, result);
writeResponse(channelHandlerContext, keepAlive, JsonUtil.toJsonString(result));
getContext().stop(getSelf());
}
@ -74,7 +74,7 @@ public class RequestHandlerActor extends AbstractActor {
}).build();
}
private String doProcess(String uri, String content, HttpMethod method,
private SnailJobRpcResult doProcess(String uri, String content, HttpMethod method,
HttpHeaders headers) {
Register register = SnailSpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);

View File

@ -6,11 +6,8 @@ import com.aizuda.snailjob.common.core.grpc.auto.GrpcSnailJobRequest;
import com.aizuda.snailjob.common.core.grpc.auto.Metadata;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.GrpcRequest;
import com.aizuda.snailjob.server.common.dto.NettyHttpRequest;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.CharsetUtil;
/**
* @author: opensnail
@ -20,9 +17,6 @@ public class UnaryRequestHandler implements ServerCalls.UnaryMethod<GrpcSnailJob
@Override
public void invoke(final GrpcSnailJobRequest snailJobRequest, final StreamObserver<GrpcResult> streamObserver) {
// 处理请求并返回响应
// GrpcResult result = GrpcResult.newBuilder().setMessage("调度成功").setStatus(1).build();
Metadata metadata = snailJobRequest.getMetadata();
GrpcRequest grpcRequest = GrpcRequest.builder()
.uri(metadata.getUri())
@ -30,11 +24,7 @@ public class UnaryRequestHandler implements ServerCalls.UnaryMethod<GrpcSnailJob
.streamObserver(streamObserver)
.build();
ActorRef actorRef = ActorGenerator.requestHandlerActor();
ActorRef actorRef = ActorGenerator.requestGrpcHandlerActor();
actorRef.tell(grpcRequest, actorRef);
// // 发送响应
// streamObserver.onNext(result);
// streamObserver.onCompleted();
}
}

View File

@ -2,7 +2,7 @@ package com.aizuda.snailjob.server.common.rpc.server.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -34,9 +34,9 @@ public class BeatHttpRequestHandler extends GetHttpRequestHandler {
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Beat check content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
return JsonUtil.toJsonString(new NettyResult(PONG, retryRequest.getReqId()));
return new SnailJobRpcResult(PONG, retryRequest.getReqId());
}
}

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.server.common.rpc.server.handler;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.server.common.handler.GetHttpRequestHandler;
@ -35,11 +35,11 @@ public class ConfigHttpRequestHandler extends GetHttpRequestHandler {
}
@Override
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
String namespace = headers.get(HeadersEnum.NAMESPACE.getKey());
ConfigDTO configDTO = accessTemplate.getGroupConfigAccess().getConfigInfo(groupName, namespace);
return JsonUtil.toJsonString(new NettyResult(JsonUtil.toJsonString(configDTO), retryRequest.getReqId()));
return new SnailJobRpcResult(JsonUtil.toJsonString(configDTO), retryRequest.getReqId());
}
}

View File

@ -7,7 +7,7 @@ import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -50,7 +50,7 @@ public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
}
@Override
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Begin Handler Log Report Data. [{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
@ -84,17 +84,7 @@ public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
actorRef.tell(retryTasks, actorRef);
}
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Log Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), "Batch Log Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId());
}
private String getClientInfo(final HttpHeaders headers) {
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
RegisterNodeInfo registerNodeInfo = new RegisterNodeInfo();
registerNodeInfo.setHostIp(hostIp);
registerNodeInfo.setHostPort(hostPort);
registerNodeInfo.setHostId(hostId);
return ClientInfoUtils.generate(registerNodeInfo);
}
}

View File

@ -6,7 +6,7 @@ import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -32,7 +32,6 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -45,7 +44,7 @@ import java.util.Objects;
*/
@Component
@RequiredArgsConstructor
public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final JobMapper jobMapper;
@ -60,7 +59,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
}
@Override
public String doHandler(final String content, final UrlQuery query, final HttpHeaders headers) {
public SnailJobRpcResult doHandler(final String content, final UrlQuery query, final HttpHeaders headers) {
SnailJobLog.LOCAL.info("map task Request. content:[{}]", content);
String groupName = HttpHeaderUtil.getGroupName(headers);
String namespace = HttpHeaderUtil.getNamespace(headers);
@ -92,9 +91,8 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setWfContext(mapTaskRequest.getWfContext());
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE,
retryRequest.getReqId()));
return new SnailJobRpcResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE,
retryRequest.getReqId());
}
String newWfContext = null;
@ -111,9 +109,8 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType());
jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList, newWfContext));
return JsonUtil.toJsonString(
new NettyResult(StatusEnum.YES.getStatus(), "Report Map Task Processed Successfully", Boolean.TRUE,
retryRequest.getReqId()));
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), "Report Map Task Processed Successfully", Boolean.TRUE,
retryRequest.getReqId());
}
private static JobExecutorContext buildJobExecutorContext(MapTaskRequest mapTaskRequest, Job job,

View File

@ -4,7 +4,7 @@ import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -15,7 +15,6 @@ import com.aizuda.snailjob.server.job.task.support.callback.ClientCallbackContex
import com.aizuda.snailjob.server.job.task.support.callback.ClientCallbackFactory;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_JOB_DISPATCH_RESULT;
@ -39,7 +38,7 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH
}
@Override
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlQuery query, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Client Callback Request. content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
@ -53,6 +52,6 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH
context.setNamespaceId(headers.getAsString(HeadersEnum.NAMESPACE.getKey()));
clientCallback.callback(context);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Report Dispatch Result Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), "Report Dispatch Result Processed Successfully", Boolean.TRUE, retryRequest.getReqId());
}
}

View File

@ -4,7 +4,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.common.core.enums.HeadersEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
@ -61,7 +61,7 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
@Override
@Transactional
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
public SnailJobRpcResult doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
SnailJobLog.LOCAL.debug("Batch Report Retry Data. content:[{}]", content);
SnailJobRequest retryRequest = JsonUtil.parseObject(content, SnailJobRequest.class);
@ -121,7 +121,7 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
return null;
});
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), "Batch Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId());
} catch (Exception e) {
Throwable throwable = e;
@ -131,7 +131,7 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
}
SnailJobLog.LOCAL.error("Batch Report Retry Data Error. <|>{}<|>", args[0], throwable);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), throwable.getMessage(), Boolean.FALSE, retryRequest.getReqId()));
return new SnailJobRpcResult(StatusEnum.YES.getStatus(), throwable.getMessage(), Boolean.FALSE, retryRequest.getReqId());
}
}

View File

@ -75,14 +75,11 @@ logging:
# level:
# ## 方便调试 SQL
# com.aizuda.snailjob.template.datasource.persistence.mapper: debug
grpc:
server:
port: 1788
snail-job:
retry-pull-page-size: 1000 # 拉取重试数据的每批次的大小
job-pull-page-size: 1000 # 拉取重试数据的每批次的大小
netty-port: 1788 # 服务端netty端口
netty-port: 17888 # 服务端netty端口
limiter: 1000 # 一个客户端每秒最多接收的重试数量指令
step: 100 # 号段模式下步长配置
log-storage: 45 # 日志保存时间(单位: day)
@ -90,3 +87,4 @@ snail-job:
max-count: 288 #回调最大执行次数
trigger-interval: 900 #间隔时间
retry-max-pull-count: 10
rpc-type: gprc