diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java index 323214ed..44d410a5 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java @@ -25,10 +25,6 @@ import java.util.concurrent.TimeUnit; public class AsyncReport extends AbstractReport implements Lifecycle { private static SlidingWindow slidingWindow; - private static ScheduledExecutorService dispatchService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService")); - -// public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener()); - @Override public boolean supports(boolean async) { return async; @@ -57,14 +53,10 @@ public class AsyncReport extends AbstractReport implements Lifecycle { slidingWindow = SlidingWindow .Builder .newBuilder() - .withTotalThreshold(50) + .withTotalThreshold(10) .withDuration(5, ChronoUnit.SECONDS) .withListener(new ReportListener()) .build(); - slidingWindow.start(); -// dispatchService.scheduleAtFixedRate(() -> { -// slidingWindow.currentWindow(); -// }, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS); } @Override diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java index 95cf577f..4457d058 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java @@ -21,9 +21,9 @@ import java.util.concurrent.locks.ReentrantLock; * * @author: www.byteblogs.com * @date : 2023-07-23 13:38 + * @since 2.1.0 */ @Slf4j -@SuppressWarnings({"squid:S1319"}) public class SlidingWindow { /** diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java index 98fc3940..62aa6365 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/ActorGenerator.java @@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.akka; import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.server.server.RequestHandlerActor; import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecCallbackUnitActor; import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor; import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor; @@ -91,9 +92,17 @@ public class ActorGenerator { * @return actor 引用 */ public static ActorRef logActor() { - return getLogActorSystemSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME)); + return getNettyActorSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME)); } + /** + * 生成扫描重试数据的actor + * + * @return actor 引用 + */ + public static ActorRef requestHandlerActor() { + return getLogActorSystemSystem().actorOf(getSpringExtension().props(RequestHandlerActor.BEAN_NAME)); + } public static SpringExtension getSpringExtension() { return SpringContext.getBeanByType(SpringExtension.class); @@ -131,4 +140,14 @@ public class ActorGenerator { public static ActorSystem getLogActorSystemSystem() { return SpringContext.getBean("logActorSystem", ActorSystem.class); } + + + /** + * 处理netty客户端请求 + * + * @return + */ + public static ActorSystem getNettyActorSystem() { + return SpringContext.getBean("nettyActorSystem", ActorSystem.class); + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java index 58c18c44..f1ab1a40 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/akka/AkkaConfiguration.java @@ -19,6 +19,7 @@ public class AkkaConfiguration { private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM"; private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM"; private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM"; + private static final String NETTY_ACTOR_SYSTEM = "nettyActorSystem"; @Autowired private ApplicationContext applicationContext; @@ -73,4 +74,16 @@ public class AkkaConfiguration { springExtension.initialize(applicationContext); return system; } + + /** + * 处理netty客户端请求 + * + * @return {@link ActorSystem} 顶级actor + */ + @Bean("nettyActorSystem") + public ActorSystem nettyActorSystem() { + ActorSystem system = ActorSystem.create(NETTY_ACTOR_SYSTEM); + springExtension.initialize(applicationContext); + return system; + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/NettyHttpRequest.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/NettyHttpRequest.java new file mode 100644 index 00000000..0ea4feed --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/NettyHttpRequest.java @@ -0,0 +1,32 @@ +package com.aizuda.easy.retry.server.dto; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import lombok.Builder; +import lombok.Data; + +/** + * netty客户端请求模型 + * + * @author: www.byteblogs.com + * @date : 2023-07-24 09:32 + */ +@Data +@Builder +public class NettyHttpRequest { + + private ChannelHandlerContext channelHandlerContext; + + private String content; + + private boolean keepAlive; + + private HttpMethod method; + + private String uri; + + private HttpHeaders headers; + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java index f6f166be..f712ead9 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/NettyHttpServerHandler.java @@ -1,35 +1,15 @@ package com.aizuda.easy.retry.server.server; -import cn.hutool.core.net.url.UrlBuilder; -import com.aizuda.easy.retry.common.core.context.SpringContext; -import com.aizuda.easy.retry.common.core.enums.HeadersEnum; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.common.core.model.Result; -import com.aizuda.easy.retry.common.core.util.JsonUtil; -import com.aizuda.easy.retry.server.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler; -import com.aizuda.easy.retry.server.support.Register; -import com.aizuda.easy.retry.server.support.handler.ConfigVersionSyncHandler; -import com.aizuda.easy.retry.server.support.register.ClientRegister; -import com.aizuda.easy.retry.server.support.register.RegisterContext; -import io.netty.buffer.Unpooled; +import akka.actor.ActorRef; +import com.aizuda.easy.retry.server.akka.ActorGenerator; +import com.aizuda.easy.retry.server.dto.NettyHttpRequest; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import java.util.Collection; import java.util.concurrent.ThreadPoolExecutor; /** @@ -39,88 +19,26 @@ import java.util.concurrent.ThreadPoolExecutor; @Slf4j public class NettyHttpServerHandler extends SimpleChannelInboundHandler { - private ThreadPoolExecutor threadPoolExecutor; - public NettyHttpServerHandler(final ThreadPoolExecutor serverHandlerPool) { - this.threadPoolExecutor = serverHandlerPool; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception { - final String content = fullHttpRequest.content().toString(CharsetUtil.UTF_8); - final boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest); - final HttpMethod method = fullHttpRequest.method(); - final String uri = fullHttpRequest.uri(); - threadPoolExecutor.execute(() -> { + NettyHttpRequest nettyHttpRequest = NettyHttpRequest.builder() + .keepAlive(HttpUtil.isKeepAlive(fullHttpRequest)) + .uri(fullHttpRequest.uri()) + .channelHandlerContext(channelHandlerContext) + .method(fullHttpRequest.method()) + .headers(fullHttpRequest.headers()) + .content(fullHttpRequest.content().toString(CharsetUtil.UTF_8)) + .build(); - String result = doProcess(channelHandlerContext, uri, content, method, fullHttpRequest.headers()); - - writeResponse(channelHandlerContext, keepAlive, result); - }); + ActorRef actorRef = ActorGenerator.requestHandlerActor(); + actorRef.tell(nettyHttpRequest, actorRef); } - private String doProcess(ChannelHandlerContext channelHandlerContext, String uri, String content, HttpMethod method, - HttpHeaders headers) { - - if (StringUtils.isBlank(uri)) { - throw new EasyRetryServerException("uri 不能为空"); - } - - Register register = SpringContext.getBean(ClientRegister.BEAN_NAME, Register.class); - - String hostId = headers.get(HeadersEnum.HOST_ID.getKey()); - String hostIp = headers.get(HeadersEnum.HOST_IP.getKey()); - Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey()); - String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey()); - String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey()); - - // 注册版本 - RegisterContext registerContext = new RegisterContext(); - registerContext.setContextPath(contextPath); - registerContext.setGroupName(groupName); - registerContext.setHostPort(hostPort); - registerContext.setHostIp(hostIp); - registerContext.setHostId(hostId); - boolean result = register.register(registerContext); - if (!result) { - LogUtils.warn(log, "client register error. groupName:[{}]", groupName); - } - - // 同步版本 - ConfigVersionSyncHandler syncHandler = SpringContext.getBeanByType(ConfigVersionSyncHandler.class); - Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey()); - syncHandler.addSyncTask(groupName, clientVersion); - - UrlBuilder builder = UrlBuilder.ofHttp(uri); - Collection httpRequestHandlers = SpringContext.CONTEXT - .getBeansOfType(HttpRequestHandler.class).values(); - for (HttpRequestHandler httpRequestHandler : httpRequestHandlers) { - if (httpRequestHandler.supports(builder.getPathStr()) && method.name() - .equals(httpRequestHandler.method().name())) { - return httpRequestHandler.doHandler(content, builder, headers); - } - } - - return JsonUtil.toJsonString(new Result<>()); - } - - /** - * write response - */ - private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { - // write response - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, - Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) - response.headers().set(HttpHeaderNames.CONTENT_TYPE, - HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString() - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); - if (keepAlive) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - } - ctx.writeAndFlush(response); - } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java new file mode 100644 index 00000000..2b075788 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/server/RequestHandlerActor.java @@ -0,0 +1,162 @@ +package com.aizuda.easy.retry.server.server; + +import akka.actor.AbstractActor; +import akka.actor.OneForOneStrategy; +import akka.actor.SupervisorStrategy; +import cn.hutool.core.net.url.UrlBuilder; +import com.aizuda.easy.retry.common.core.context.SpringContext; +import com.aizuda.easy.retry.common.core.enums.HeadersEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.model.EasyRetryRequest; +import com.aizuda.easy.retry.common.core.model.Result; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.server.dto.NettyHttpRequest; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler; +import com.aizuda.easy.retry.server.support.Register; +import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor; +import com.aizuda.easy.retry.server.support.handler.ConfigVersionSyncHandler; +import com.aizuda.easy.retry.server.support.register.ClientRegister; +import com.aizuda.easy.retry.server.support.register.RegisterContext; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.dao.ConcurrencyFailureException; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionSystemException; +import scala.concurrent.duration.Duration; + +import java.util.Collection; + +/** + * 处理netty客户端请求 + * + * @author: www.byteblogs.com + * @date : 2023-07-24 09:20 + * @since 2.1.0 + */ +@Component(RequestHandlerActor.BEAN_NAME) +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Slf4j +public class RequestHandlerActor extends AbstractActor { + + public static final String BEAN_NAME = "requestHandlerActor"; + + private static final SupervisorStrategy STRATEGY = + new OneForOneStrategy(5, Duration.create("1 minute"), + throwable -> { + if (throwable instanceof DuplicateKeyException + || throwable instanceof TransactionSystemException + || throwable instanceof ConcurrencyFailureException) { + LogUtils.error(log, "RequestHandlerActor handler exception", throwable); + return SupervisorStrategy.restart(); + }else { + return SupervisorStrategy.escalate(); + } + }); + + + @Override + public Receive createReceive() { + return receiveBuilder().match(NettyHttpRequest.class, nettyHttpRequest -> { + + + final String uri = nettyHttpRequest.getUri(); + if (StringUtils.isBlank(uri)) { + LogUtils.error(log, "uri can not be null"); + return; + } + + ChannelHandlerContext channelHandlerContext = nettyHttpRequest.getChannelHandlerContext(); + + final boolean keepAlive = nettyHttpRequest.isKeepAlive(); + final HttpMethod method = nettyHttpRequest.getMethod(); + final String content = nettyHttpRequest.getContent(); + final HttpHeaders headers = nettyHttpRequest.getHeaders(); + + String result = doProcess(uri, content, method, headers); + + writeResponse(channelHandlerContext, keepAlive, result); + + }).build(); + } + + @Override + public SupervisorStrategy supervisorStrategy() { + return STRATEGY; + } + + + private String doProcess(String uri, String content, HttpMethod method, + HttpHeaders headers) { + + Register register = SpringContext.getBean(ClientRegister.BEAN_NAME, Register.class); + + String hostId = headers.get(HeadersEnum.HOST_ID.getKey()); + String hostIp = headers.get(HeadersEnum.HOST_IP.getKey()); + Integer hostPort = headers.getInt(HeadersEnum.HOST_PORT.getKey()); + String groupName = headers.get(HeadersEnum.GROUP_NAME.getKey()); + String contextPath = headers.get(HeadersEnum.CONTEXT_PATH.getKey()); + + // 注册版本 + RegisterContext registerContext = new RegisterContext(); + registerContext.setContextPath(contextPath); + registerContext.setGroupName(groupName); + registerContext.setHostPort(hostPort); + registerContext.setHostIp(hostIp); + registerContext.setHostId(hostId); + boolean result = register.register(registerContext); + if (!result) { + LogUtils.warn(log, "client register error. groupName:[{}]", groupName); + } + + // 同步版本 + ConfigVersionSyncHandler syncHandler = SpringContext.getBeanByType(ConfigVersionSyncHandler.class); + Integer clientVersion = headers.getInt(HeadersEnum.VERSION.getKey()); + syncHandler.addSyncTask(groupName, clientVersion); + + UrlBuilder builder = UrlBuilder.ofHttp(uri); + Collection httpRequestHandlers = SpringContext.CONTEXT + .getBeansOfType(HttpRequestHandler.class).values(); + for (HttpRequestHandler httpRequestHandler : httpRequestHandlers) { + if (httpRequestHandler.supports(builder.getPathStr()) && method.name() + .equals(httpRequestHandler.method().name())) { + return httpRequestHandler.doHandler(content, builder, headers); + } + } + + return JsonUtil.toJsonString(new Result<>()); + } + + /** + * write response + */ + private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) { + // write response + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, + Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson) + response.headers().set(HttpHeaderNames.CONTENT_TYPE, + HttpHeaderValues.APPLICATION_JSON); // HttpHeaderValues.TEXT_PLAIN.toString() + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } + ctx.writeAndFlush(response); + } + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java index edbb1110..40fef9c2 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/generator/id/SegmentIdGenerator.java @@ -63,7 +63,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle { */ private static final long SEGMENT_DURATION = 15 * 60 * 1000L; - private static final String TIME_FORMAT = "yyyyMMddHHmmss"; + private static final String TIME_FORMAT = "yyyyMMddHHmmssSSS"; private ThreadPoolExecutor service = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5000), new UpdateThreadFactory()); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java index 3dbd02a0..22f89334 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/ClientNodeAllocateHandler.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.support.handler; +import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; @@ -10,6 +11,7 @@ import com.aizuda.easy.retry.server.support.allocate.client.ClientLoadBalanceMan import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.aizuda.easy.retry.server.support.ClientLoadBalance; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; @@ -25,6 +27,7 @@ import java.util.stream.Collectors; * @date : 2023-01-10 14:18 */ @Component +@Slf4j public class ClientNodeAllocateHandler { @Autowired @@ -39,6 +42,7 @@ public class ClientNodeAllocateHandler { GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName); Set serverNodes = CacheRegisterTable.getServerNodeSet(groupName); if (CollectionUtils.isEmpty(serverNodes)) { + LogUtils.warn(log, "client node is null. groupName:[{}]", groupName); return null; }