feat: 2.4.0

1. 优化客户端心跳优先于其他的请求进行客户端续约
This commit is contained in:
byteblogs168 2023-11-09 16:04:47 +08:00
parent ee7634b2b4
commit c7b2d831dc
5 changed files with 17 additions and 5 deletions

View File

@ -28,6 +28,11 @@ public class SystemProperties {
*/ */
private int retryPullPageSize = 1000; private int retryPullPageSize = 1000;
/**
* 重试每次拉取的次数
*/
private int retryMaxPullCount = 10;
/** /**
* netty 端口 * netty 端口
*/ */

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.common.register; package com.aizuda.easy.retry.server.common.register;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
@ -14,7 +15,7 @@ import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -32,7 +33,7 @@ public class ClientRegister extends AbstractRegister implements Runnable {
public static final int DELAY_TIME = 30; public static final int DELAY_TIME = 30;
private Thread THREAD = null; private Thread THREAD = null;
protected static final LinkedBlockingQueue<ServerNode> QUEUE = new LinkedBlockingQueue<>(256); protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(256);
@Override @Override
public boolean supports(int type) { public boolean supports(int type) {
@ -50,7 +51,11 @@ public class ClientRegister extends AbstractRegister implements Runnable {
@Override @Override
protected boolean doRegister(RegisterContext context, ServerNode serverNode) { protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
return QUEUE.offer(serverNode); if (HTTP_PATH.BEAT.equals(context.getUri())) {
return QUEUE.offerFirst(serverNode);
}
return QUEUE.offerLast(serverNode);
} }
@Override @Override

View File

@ -28,4 +28,6 @@ public class RegisterContext {
private String extAttrs; private String extAttrs;
private String uri;
} }

View File

@ -85,8 +85,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
// 计算循环拉取的次数 // 计算循环拉取的次数
if (preCostTime().get() > 0) { if (preCostTime().get() > 0) {
long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime().get(), 1); long loopCount = Math.max((SystemConstants.SCHEDULE_PERIOD * 1000) / preCostTime().get(), 1);
// TODO 最大拉取次数支持可配置 loopCount = Math.min(loopCount, systemProperties.getRetryMaxPullCount());
loopCount = Math.min(loopCount, 10);
prePullCount().set(loopCount); prePullCount().set(loopCount);
} }

View File

@ -95,6 +95,7 @@ public class RequestHandlerActor extends AbstractActor {
registerContext.setHostPort(hostPort); registerContext.setHostPort(hostPort);
registerContext.setHostIp(hostIp); registerContext.setHostIp(hostIp);
registerContext.setHostId(hostId); registerContext.setHostId(hostId);
registerContext.setUri(uri);
boolean result = register.register(registerContext); boolean result = register.register(registerContext);
if (!result) { if (!result) {
LogUtils.warn(log, "client register error. groupName:[{}]", groupName); LogUtils.warn(log, "client register error. groupName:[{}]", groupName);