diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index 87058a90..2c9c66b9 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -28,6 +28,11 @@ public class SystemProperties { */ private int retryPullPageSize = 1000; + /** + * 重试每次拉取的次数 + */ + private int retryMaxPullCount = 10; + /** * netty 端口 */ diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java index b162eb9b..89ff5978 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.common.register; +import com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; @@ -14,7 +15,7 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; /** @@ -32,7 +33,7 @@ public class ClientRegister extends AbstractRegister implements Runnable { public static final int DELAY_TIME = 30; private Thread THREAD = null; - protected static final LinkedBlockingQueue QUEUE = new LinkedBlockingQueue<>(256); + protected static final LinkedBlockingDeque QUEUE = new LinkedBlockingDeque<>(256); @Override public boolean supports(int type) { @@ -50,7 +51,11 @@ public class ClientRegister extends AbstractRegister implements Runnable { @Override protected boolean doRegister(RegisterContext context, ServerNode serverNode) { - return QUEUE.offer(serverNode); + if (HTTP_PATH.BEAT.equals(context.getUri())) { + return QUEUE.offerFirst(serverNode); + } + + return QUEUE.offerLast(serverNode); } @Override diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/RegisterContext.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/RegisterContext.java index 9f209234..d103662d 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/RegisterContext.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/RegisterContext.java @@ -28,4 +28,6 @@ public class RegisterContext { private String extAttrs; + private String uri; + } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index a9bfb8d7..cb66999b 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -85,8 +85,7 @@ public abstract class AbstractScanGroup extends AbstractActor { // 计算循环拉取的次数 if (preCostTime().get() > 0) { long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime().get(), 1); - // TODO 最大拉取次数支持可配置 - loopCount = Math.min(loopCount, 10); + loopCount = Math.min(loopCount, systemProperties.getRetryMaxPullCount()); prePullCount().set(loopCount); } diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java index f4142679..b6a77304 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/server/RequestHandlerActor.java @@ -95,6 +95,7 @@ public class RequestHandlerActor extends AbstractActor { registerContext.setHostPort(hostPort); registerContext.setHostIp(hostIp); registerContext.setHostId(hostId); + registerContext.setUri(uri); boolean result = register.register(registerContext); if (!result) { LogUtils.warn(log, "client register error. groupName:[{}]", groupName);