feat: 2.1.0
1. netty服务端接收请求改为actor模型
This commit is contained in:
parent
aede45eda9
commit
9f10a0d500
@ -25,10 +25,6 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class AsyncReport extends AbstractReport implements Lifecycle {
|
public class AsyncReport extends AbstractReport implements Lifecycle {
|
||||||
private static SlidingWindow<RetryTaskDTO> slidingWindow;
|
private static SlidingWindow<RetryTaskDTO> slidingWindow;
|
||||||
|
|
||||||
private static ScheduledExecutorService dispatchService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService"));
|
|
||||||
|
|
||||||
// public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener());
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(boolean async) {
|
public boolean supports(boolean async) {
|
||||||
return async;
|
return async;
|
||||||
@ -57,14 +53,10 @@ public class AsyncReport extends AbstractReport implements Lifecycle {
|
|||||||
slidingWindow = SlidingWindow
|
slidingWindow = SlidingWindow
|
||||||
.Builder
|
.Builder
|
||||||
.<RetryTaskDTO>newBuilder()
|
.<RetryTaskDTO>newBuilder()
|
||||||
.withTotalThreshold(50)
|
.withTotalThreshold(10)
|
||||||
.withDuration(5, ChronoUnit.SECONDS)
|
.withDuration(5, ChronoUnit.SECONDS)
|
||||||
.withListener(new ReportListener())
|
.withListener(new ReportListener())
|
||||||
.build();
|
.build();
|
||||||
slidingWindow.start();
|
|
||||||
// dispatchService.scheduleAtFixedRate(() -> {
|
|
||||||
// slidingWindow.currentWindow();
|
|
||||||
// }, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -21,9 +21,9 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
*
|
*
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
* @date : 2023-07-23 13:38
|
* @date : 2023-07-23 13:38
|
||||||
|
* @since 2.1.0
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@SuppressWarnings({"squid:S1319"})
|
|
||||||
public class SlidingWindow<T> {
|
public class SlidingWindow<T> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.akka;
|
|||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.server.server.RequestHandlerActor;
|
||||||
import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecCallbackUnitActor;
|
import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecCallbackUnitActor;
|
||||||
import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor;
|
import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor;
|
||||||
import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor;
|
import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor;
|
||||||
@ -91,9 +92,17 @@ public class ActorGenerator {
|
|||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
public static ActorRef logActor() {
|
public static ActorRef logActor() {
|
||||||
return getLogActorSystemSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME));
|
return getNettyActorSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 生成扫描重试数据的actor
|
||||||
|
*
|
||||||
|
* @return actor 引用
|
||||||
|
*/
|
||||||
|
public static ActorRef requestHandlerActor() {
|
||||||
|
return getLogActorSystemSystem().actorOf(getSpringExtension().props(RequestHandlerActor.BEAN_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
public static SpringExtension getSpringExtension() {
|
public static SpringExtension getSpringExtension() {
|
||||||
return SpringContext.getBeanByType(SpringExtension.class);
|
return SpringContext.getBeanByType(SpringExtension.class);
|
||||||
@ -131,4 +140,14 @@ public class ActorGenerator {
|
|||||||
public static ActorSystem getLogActorSystemSystem() {
|
public static ActorSystem getLogActorSystemSystem() {
|
||||||
return SpringContext.getBean("logActorSystem", ActorSystem.class);
|
return SpringContext.getBean("logActorSystem", ActorSystem.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理netty客户端请求
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static ActorSystem getNettyActorSystem() {
|
||||||
|
return SpringContext.getBean("nettyActorSystem", ActorSystem.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@ public class AkkaConfiguration {
|
|||||||
private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM";
|
private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM";
|
||||||
private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM";
|
private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM";
|
||||||
private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM";
|
private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM";
|
||||||
|
private static final String NETTY_ACTOR_SYSTEM = "nettyActorSystem";
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
@ -73,4 +74,16 @@ public class AkkaConfiguration {
|
|||||||
springExtension.initialize(applicationContext);
|
springExtension.initialize(applicationContext);
|
||||||
return system;
|
return system;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理netty客户端请求
|
||||||
|
*
|
||||||
|
* @return {@link ActorSystem} 顶级actor
|
||||||
|
*/
|
||||||
|
@Bean("nettyActorSystem")
|
||||||
|
public ActorSystem nettyActorSystem() {
|
||||||
|
ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM);
|
||||||
|
springExtension.initialize(applicationContext);
|
||||||
|
return system;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
package com.aizuda.easy.retry.server.dto;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaders;
|
||||||
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* netty客户端请求模型
|
||||||
|
*
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-07-24 09:32
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
public class NettyHttpRequest {
|
||||||
|
|
||||||
|
private ChannelHandlerContext channelHandlerContext;
|
||||||
|
|
||||||
|
private String content;
|
||||||
|
|
||||||
|
private boolean keepAlive;
|
||||||
|
|
||||||
|
private HttpMethod method;
|
||||||
|
|
||||||
|
private String uri;
|
||||||
|
|
||||||
|
private HttpHeaders headers;
|
||||||
|
|
||||||
|
}
|
@ -1,35 +1,15 @@
|
|||||||
package com.aizuda.easy.retry.server.server;
|
package com.aizuda.easy.retry.server.server;
|
||||||
|
|
||||||
import cn.hutool.core.net.url.UrlBuilder;
|
import akka.actor.ActorRef;
|
||||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
import com.aizuda.easy.retry.server.akka.ActorGenerator;
|
||||||
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
|
import com.aizuda.easy.retry.server.dto.NettyHttpRequest;
|
||||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
|
||||||
import com.aizuda.easy.retry.common.core.model.Result;
|
|
||||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
|
||||||
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
|
|
||||||
import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler;
|
|
||||||
import com.aizuda.easy.retry.server.support.Register;
|
|
||||||
import com.aizuda.easy.retry.server.support.handler.ConfigVersionSyncHandler;
|
|
||||||
import com.aizuda.easy.retry.server.support.register.ClientRegister;
|
|
||||||
import com.aizuda.easy.retry.server.support.register.RegisterContext;
|
|
||||||
import io.netty.buffer.Unpooled;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
|
||||||
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
import io.netty.handler.codec.http.FullHttpResponse;
|
|
||||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
|
||||||
import io.netty.handler.codec.http.HttpHeaderValues;
|
|
||||||
import io.netty.handler.codec.http.HttpHeaders;
|
|
||||||
import io.netty.handler.codec.http.HttpMethod;
|
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
|
||||||
import io.netty.handler.codec.http.HttpUtil;
|
import io.netty.handler.codec.http.HttpUtil;
|
||||||
import io.netty.handler.codec.http.HttpVersion;
|
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang.StringUtils;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -39,88 +19,26 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
|
||||||
|
|
||||||
private ThreadPoolExecutor threadPoolExecutor;
|
|
||||||
|
|
||||||
public NettyHttpServerHandler(final ThreadPoolExecutor serverHandlerPool) {
|
public NettyHttpServerHandler(final ThreadPoolExecutor serverHandlerPool) {
|
||||||
this.threadPoolExecutor = serverHandlerPool;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest)
|
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
final String content = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
|
|
||||||
final boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest);
|
|
||||||
final HttpMethod method = fullHttpRequest.method();
|
|
||||||
final String uri = fullHttpRequest.uri();
|
|
||||||
|
|
||||||
threadPoolExecutor.execute(() -> {
|
NettyHttpRequest nettyHttpRequest = NettyHttpRequest.builder()
|
||||||
|
.keepAlive(HttpUtil.isKeepAlive(fullHttpRequest))
|
||||||
|
.uri(fullHttpRequest.uri())
|
||||||
|
.channelHandlerContext(channelHandlerContext)
|
||||||
|
.method(fullHttpRequest.method())
|
||||||
|
.headers(fullHttpRequest.headers())
|
||||||
|
.content(fullHttpRequest.content().toString(CharsetUtil.UTF_8))
|
||||||
|
.build();
|
||||||
|
|
||||||
String result = doProcess(channelHandlerContext, uri, content, method, fullHttpRequest.headers());
|
ActorRef actorRef = ActorGenerator.requestHandlerActor();
|
||||||
|
actorRef.tell(nettyHttpRequest, actorRef);
|
||||||
writeResponse(channelHandlerContext, keepAlive, result);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private String doProcess(ChannelHandlerContext channelHandlerContext, String uri, String content, HttpMethod method,
|
|
||||||
HttpHeaders headers) {
|
|
||||||
|
|
||||||
if (StringUtils.isBlank(uri)) {
|
|
||||||
throw new EasyRetryServerException("uri 不能为空");
|
|
||||||
}
|
|
||||||
|
|
||||||
Register register = SpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);
|
|
||||||
|
|
||||||
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
|
|
||||||
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
|
|
||||||
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
|
|
||||||
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
|
|
||||||
String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey());
|
|
||||||
|
|
||||||
// 注册版本
|
|
||||||
RegisterContext registerContext = new RegisterContext();
|
|
||||||
registerContext.setContextPath(contextPath);
|
|
||||||
registerContext.setGroupName(groupName);
|
|
||||||
registerContext.setHostPort(hostPort);
|
|
||||||
registerContext.setHostIp(hostIp);
|
|
||||||
registerContext.setHostId(hostId);
|
|
||||||
boolean result = register.register(registerContext);
|
|
||||||
if (!result) {
|
|
||||||
LogUtils.warn(log, "client register error. groupName:[{}]", groupName);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 同步版本
|
|
||||||
ConfigVersionSyncHandler syncHandler = SpringContext.getBeanByType(ConfigVersionSyncHandler.class);
|
|
||||||
Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey());
|
|
||||||
syncHandler.addSyncTask(groupName, clientVersion);
|
|
||||||
|
|
||||||
UrlBuilder builder = UrlBuilder.ofHttp(uri);
|
|
||||||
Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.CONTEXT
|
|
||||||
.getBeansOfType(HttpRequestHandler.class).values();
|
|
||||||
for (HttpRequestHandler httpRequestHandler : httpRequestHandlers) {
|
|
||||||
if (httpRequestHandler.supports(builder.getPathStr()) && method.name()
|
|
||||||
.equals(httpRequestHandler.method().name())) {
|
|
||||||
return httpRequestHandler.doHandler(content, builder, headers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return JsonUtil.toJsonString(new Result<>());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* write response
|
|
||||||
*/
|
|
||||||
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
|
|
||||||
// write response
|
|
||||||
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
|
|
||||||
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson)
|
|
||||||
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
|
|
||||||
HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString()
|
|
||||||
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
|
|
||||||
if (keepAlive) {
|
|
||||||
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
|
|
||||||
}
|
|
||||||
ctx.writeAndFlush(response);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,162 @@
|
|||||||
|
package com.aizuda.easy.retry.server.server;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import akka.actor.OneForOneStrategy;
|
||||||
|
import akka.actor.SupervisorStrategy;
|
||||||
|
import cn.hutool.core.net.url.UrlBuilder;
|
||||||
|
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
|
||||||
|
import com.aizuda.easy.retry.common.core.model.Result;
|
||||||
|
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||||
|
import com.aizuda.easy.retry.server.dto.NettyHttpRequest;
|
||||||
|
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler;
|
||||||
|
import com.aizuda.easy.retry.server.support.Register;
|
||||||
|
import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor;
|
||||||
|
import com.aizuda.easy.retry.server.support.handler.ConfigVersionSyncHandler;
|
||||||
|
import com.aizuda.easy.retry.server.support.register.ClientRegister;
|
||||||
|
import com.aizuda.easy.retry.server.support.register.RegisterContext;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.FullHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaderValues;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaders;
|
||||||
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
|
import io.netty.handler.codec.http.HttpUtil;
|
||||||
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
|
import io.netty.util.CharsetUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.dao.ConcurrencyFailureException;
|
||||||
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.TransactionSystemException;
|
||||||
|
import scala.concurrent.duration.Duration;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理netty客户端请求
|
||||||
|
*
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-07-24 09:20
|
||||||
|
* @since 2.1.0
|
||||||
|
*/
|
||||||
|
@Component(RequestHandlerActor.BEAN_NAME)
|
||||||
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
@Slf4j
|
||||||
|
public class RequestHandlerActor extends AbstractActor {
|
||||||
|
|
||||||
|
public static final String BEAN_NAME = "requestHandlerActor";
|
||||||
|
|
||||||
|
private static final SupervisorStrategy STRATEGY =
|
||||||
|
new OneForOneStrategy(5, Duration.create("1 minute"),
|
||||||
|
throwable -> {
|
||||||
|
if (throwable instanceof DuplicateKeyException
|
||||||
|
|| throwable instanceof TransactionSystemException
|
||||||
|
|| throwable instanceof ConcurrencyFailureException) {
|
||||||
|
LogUtils.error(log, "RequestHandlerActor handler exception", throwable);
|
||||||
|
return SupervisorStrategy.restart();
|
||||||
|
}else {
|
||||||
|
return SupervisorStrategy.escalate();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder().match(NettyHttpRequest.class, nettyHttpRequest -> {
|
||||||
|
|
||||||
|
|
||||||
|
final String uri = nettyHttpRequest.getUri();
|
||||||
|
if (StringUtils.isBlank(uri)) {
|
||||||
|
LogUtils.error(log, "uri can not be null");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ChannelHandlerContext channelHandlerContext = nettyHttpRequest.getChannelHandlerContext();
|
||||||
|
|
||||||
|
final boolean keepAlive = nettyHttpRequest.isKeepAlive();
|
||||||
|
final HttpMethod method = nettyHttpRequest.getMethod();
|
||||||
|
final String content = nettyHttpRequest.getContent();
|
||||||
|
final HttpHeaders headers = nettyHttpRequest.getHeaders();
|
||||||
|
|
||||||
|
String result = doProcess(uri, content, method, headers);
|
||||||
|
|
||||||
|
writeResponse(channelHandlerContext, keepAlive, result);
|
||||||
|
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SupervisorStrategy supervisorStrategy() {
|
||||||
|
return STRATEGY;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private String doProcess(String uri, String content, HttpMethod method,
|
||||||
|
HttpHeaders headers) {
|
||||||
|
|
||||||
|
Register register = SpringContext.getBean(ClientRegister.BEAN_NAME, Register.class);
|
||||||
|
|
||||||
|
String hostId = headers.get(HeadersEnum.HOST_ID.getKey());
|
||||||
|
String hostIp = headers.get(HeadersEnum.HOST_IP.getKey());
|
||||||
|
Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey());
|
||||||
|
String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey());
|
||||||
|
String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey());
|
||||||
|
|
||||||
|
// 注册版本
|
||||||
|
RegisterContext registerContext = new RegisterContext();
|
||||||
|
registerContext.setContextPath(contextPath);
|
||||||
|
registerContext.setGroupName(groupName);
|
||||||
|
registerContext.setHostPort(hostPort);
|
||||||
|
registerContext.setHostIp(hostIp);
|
||||||
|
registerContext.setHostId(hostId);
|
||||||
|
boolean result = register.register(registerContext);
|
||||||
|
if (!result) {
|
||||||
|
LogUtils.warn(log, "client register error. groupName:[{}]", groupName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 同步版本
|
||||||
|
ConfigVersionSyncHandler syncHandler = SpringContext.getBeanByType(ConfigVersionSyncHandler.class);
|
||||||
|
Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey());
|
||||||
|
syncHandler.addSyncTask(groupName, clientVersion);
|
||||||
|
|
||||||
|
UrlBuilder builder = UrlBuilder.ofHttp(uri);
|
||||||
|
Collection<HttpRequestHandler> httpRequestHandlers = SpringContext.CONTEXT
|
||||||
|
.getBeansOfType(HttpRequestHandler.class).values();
|
||||||
|
for (HttpRequestHandler httpRequestHandler : httpRequestHandlers) {
|
||||||
|
if (httpRequestHandler.supports(builder.getPathStr()) && method.name()
|
||||||
|
.equals(httpRequestHandler.method().name())) {
|
||||||
|
return httpRequestHandler.doHandler(content, builder, headers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return JsonUtil.toJsonString(new Result<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* write response
|
||||||
|
*/
|
||||||
|
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
|
||||||
|
// write response
|
||||||
|
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
|
||||||
|
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson)
|
||||||
|
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
|
||||||
|
HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString()
|
||||||
|
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
|
||||||
|
if (keepAlive) {
|
||||||
|
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
|
||||||
|
}
|
||||||
|
ctx.writeAndFlush(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -63,7 +63,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
|||||||
*/
|
*/
|
||||||
private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
|
private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
|
||||||
|
|
||||||
private static final String TIME_FORMAT = "yyyyMMddHHmmss";
|
private static final String TIME_FORMAT = "yyyyMMddHHmmssSSS";
|
||||||
|
|
||||||
private ThreadPoolExecutor service = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
|
private ThreadPoolExecutor service = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingDeque<>(5000), new UpdateThreadFactory());
|
new LinkedBlockingDeque<>(5000), new UpdateThreadFactory());
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.aizuda.easy.retry.server.support.handler;
|
package com.aizuda.easy.retry.server.support.handler;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
|
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
|
||||||
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
|
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
|
||||||
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
|
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
|
||||||
@ -10,6 +11,7 @@ import com.aizuda.easy.retry.server.support.allocate.client.ClientLoadBalanceMan
|
|||||||
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
|
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.aizuda.easy.retry.server.support.ClientLoadBalance;
|
import com.aizuda.easy.retry.server.support.ClientLoadBalance;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -25,6 +27,7 @@ import java.util.stream.Collectors;
|
|||||||
* @date : 2023-01-10 14:18
|
* @date : 2023-01-10 14:18
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
|
@Slf4j
|
||||||
public class ClientNodeAllocateHandler {
|
public class ClientNodeAllocateHandler {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -39,6 +42,7 @@ public class ClientNodeAllocateHandler {
|
|||||||
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
|
GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
|
||||||
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(groupName);
|
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(groupName);
|
||||||
if (CollectionUtils.isEmpty(serverNodes)) {
|
if (CollectionUtils.isEmpty(serverNodes)) {
|
||||||
|
LogUtils.warn(log, "client node is null. groupName:[{}]", groupName);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user