feat: 2.4.0

1. 优化客户端负载均衡
This commit is contained in:
byteblogs168 2023-10-13 21:40:00 +08:00
parent 45d0c427eb
commit 88ae12540a
35 changed files with 258 additions and 164 deletions

View File

@ -12,7 +12,6 @@ CREATE TABLE `group_config`
`group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用',
`version` int(11) NOT NULL COMMENT '版本号',
`group_partition` int(11) NOT NULL COMMENT '分区',
`route_key` tinyint(4) NOT NULL COMMENT '路由策略',
`id_generator_mode` tinyint(4) NOT NULL DEFAULT '1' COMMENT '唯一id生成模式 默认号段模式',
`init_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否初始化场景 0:否 1:是',
`bucket_index` int(11) DEFAULT NULL COMMENT 'bucket',
@ -135,6 +134,7 @@ CREATE TABLE `scene_config`
`back_off` tinyint(4) NOT NULL DEFAULT '1' COMMENT '1、默认等级 2、固定间隔时间 3、CRON 表达式',
`trigger_interval` varchar(16) NOT NULL DEFAULT '' COMMENT '间隔时长',
`deadline_request` bigint(20) unsigned NOT NULL DEFAULT '60000' COMMENT 'Deadline Request 调用链超时 单位毫秒',
`route_key` tinyint(4) NOT NULL DEFAULT '4' COMMENT '路由策略',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
@ -225,7 +225,7 @@ CREATE TABLE `job` (
`next_trigger_at` datetime NOT NULL COMMENT '下次触发时间',
`job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片',
`route_key` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`route_key` tinyint(4) NOT NULL DEFAULT '4' COMMENT '路由策略',
`executor_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '执行器类型',
`executor_name` varchar(255) DEFAULT NULL COMMENT '执行器名称',
`trigger_type` tinyint(4) NOT NULL COMMENT '触发类型 1.CRON 表达式 2. 固定时间',

View File

@ -19,6 +19,7 @@ public class GroupConfig implements Serializable {
private Integer groupPartition;
@Deprecated
private Integer routeKey;
private Integer idGeneratorMode;

View File

@ -67,7 +67,7 @@ public class Job implements Serializable {
/**
* 执行器路由策略
*/
private String routeKey;
private Integer routeKey;
/**
* 执行器类型 1Java

View File

@ -29,6 +29,8 @@ public class SceneConfig implements Serializable {
private Long deadlineRequest;
private Integer routeKey;
private LocalDateTime createDt;
private LocalDateTime updateDt;

View File

@ -8,7 +8,7 @@ import java.util.TreeSet;
*/
public interface ClientLoadBalance {
String route(String currentGroupName, TreeSet<String> clientAllAddressSet);
String route(String key, TreeSet<String> clientAllAddressSet);
int routeType();

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.server.common.handler;
package com.aizuda.easy.retry.server.common;
import cn.hutool.core.net.url.UrlBuilder;
import io.netty.handler.codec.http.HttpHeaders;

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.common.allocate.client;
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
import com.aizuda.easy.retry.server.common.enums.AllocationAlgorithmEnum;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
import com.aizuda.easy.retry.server.common.allocate.common.ConsistentHashRouter;
import com.aizuda.easy.retry.server.common.allocate.common.Node;
@ -22,7 +22,7 @@ public class ClientLoadBalanceConsistentHash implements ClientLoadBalance {
}
@Override
public String route(String currentGroupName, TreeSet<String> clientAllAddressSet) {
public String route(String allocKey, TreeSet<String> clientAllAddressSet) {
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String clientAddress : clientAllAddressSet) {
@ -30,7 +30,7 @@ public class ClientLoadBalanceConsistentHash implements ClientLoadBalance {
}
final ConsistentHashRouter<ClientNode> consistentHashRouter = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
ClientNode clientNode = consistentHashRouter.routeNode(currentGroupName);
ClientNode clientNode = consistentHashRouter.routeNode(allocKey);
return clientNode.clientAddress;
}

View File

@ -23,8 +23,8 @@ public class ClientLoadBalanceLRU implements ClientLoadBalance {
private ConcurrentHashMap<String, LinkedHashMap<String, String>> LRU_CACHE = new ConcurrentHashMap<>();
@Override
public String route(String currentGroupName, TreeSet<String> clientAllAddressSet) {
LinkedHashMap<String, String> lruItem = LRU_CACHE.get(currentGroupName);
public String route(String allocKey, TreeSet<String> clientAllAddressSet) {
LinkedHashMap<String, String> lruItem = LRU_CACHE.get(allocKey);
if (Objects.isNull(lruItem)) {
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true) {
@Override
@ -32,13 +32,13 @@ public class ClientLoadBalanceLRU implements ClientLoadBalance {
return super.size() > size;
}
};
LRU_CACHE.put(allocKey, lruItem);
}
// 添加新数据
for (String address: clientAllAddressSet) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
lruItem.computeIfAbsent(address, key -> address);
}
// 删除已经下线的节点

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.common.allocate.client;
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import lombok.Getter;
import org.springframework.stereotype.Component;
@ -8,7 +9,6 @@ import org.springframework.stereotype.Component;
* @author: www.byteblogs.com
* @date : 2022-03-12 10:20
*/
@Component
public class ClientLoadBalanceManager {
public static ClientLoadBalance getClientLoadBalance(int routeType) {
@ -19,7 +19,7 @@ public class ClientLoadBalanceManager {
}
}
return null;
throw new EasyRetryServerException("routeType is not existed. routeType:[{}]", routeType);
}
@Getter
@ -28,6 +28,7 @@ public class ClientLoadBalanceManager {
CONSISTENT_HASH(1, new ClientLoadBalanceConsistentHash(100)),
RANDOM(2, new ClientLoadBalanceRandom()),
LRU(3, new ClientLoadBalanceLRU(100)),
ROUND(4, new ClientLoadBalanceRound())
;
private final int type;

View File

@ -1,8 +1,7 @@
package com.aizuda.easy.retry.server.common.allocate.client;
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
import com.aizuda.easy.retry.server.common.enums.AllocationAlgorithmEnum;
import org.springframework.stereotype.Component;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
import java.util.Random;
import java.util.TreeSet;
@ -11,13 +10,12 @@ import java.util.TreeSet;
* @author: www.byteblogs.com
* @date : 2022-03-11 22:00
*/
@Component
public class ClientLoadBalanceRandom implements ClientLoadBalance {
private Random random = new Random();
@Override
public String route(String currentGroupName, TreeSet<String> clientAllAddressSet) {
public String route(String allocKey, TreeSet<String> clientAllAddressSet) {
String[] addressArr = clientAllAddressSet.toArray(new String[clientAllAddressSet.size()]);
return addressArr[random.nextInt(clientAllAddressSet.size())];
}

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.server.common.allocate.client;
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author: www.byteblogs.com
* @date : 2023-10-13 15:43
* @since : 2.4.0
*/
public class ClientLoadBalanceRound implements ClientLoadBalance {
private static final ConcurrentHashMap<String, AtomicInteger> COUNTER = new ConcurrentHashMap<>();
private static final int THRESHOLD = Integer.MAX_VALUE - 10000;
@Override
public String route(final String allocKey, final TreeSet<String> clientAllAddressSet) {
String[] addressArr = clientAllAddressSet.toArray(new String[0]);
AtomicInteger next = COUNTER.getOrDefault(allocKey, new AtomicInteger(1));
String nextClientId = addressArr[ next.get() % clientAllAddressSet.size()];
int nextIndex = next.incrementAndGet();
if (nextIndex > THRESHOLD) {
next = new AtomicInteger(1);
}
COUNTER.put(allocKey, next);
return nextClientId;
}
@Override
public int routeType() {
return 0;
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.common.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.github.rholder.retry.RetryListener;
@ -18,14 +19,14 @@ public class RequestBuilder<T, R> {
private Class<T> clintInterface;
private String groupName;
private String hostId;
private String hostIp;
private Integer hostPort;
private String contextPath;
private RegisterNodeInfo nodeInfo;
private boolean failRetry;
private int retryTimes = 3;
private int retryInterval = 1;
private RetryListener retryListener = new SimpleRetryListener();
private boolean failover;
private int routeKey;
private String allocKey;
public static <T, R> RequestBuilder<T, R> newBuilder() {
@ -37,23 +38,8 @@ public class RequestBuilder<T, R> {
return this;
}
public RequestBuilder<T, R> hostPort(Integer hostPort) {
this.hostPort = hostPort;
return this;
}
public RequestBuilder<T, R> contextPath(String contextPath) {
this.contextPath = contextPath;
return this;
}
public RequestBuilder<T, R> hostId(String hostId) {
this.hostId = hostId;
return this;
}
public RequestBuilder<T, R> hostIp(String hostIp) {
this.hostIp = hostIp;
public RequestBuilder<T, R> nodeInfo(RegisterNodeInfo nodeInfo) {
this.nodeInfo = nodeInfo;
return this;
}
@ -82,6 +68,21 @@ public class RequestBuilder<T, R> {
return this;
}
public RequestBuilder<T, R> allocKey(boolean failover) {
this.failover = failover;
return this;
}
public RequestBuilder<T, R> routeKey(int routeKey) {
this.routeKey = routeKey;
return this;
}
public RequestBuilder<T, R> allocKey(String allocKey) {
this.allocKey = allocKey;
return this;
}
public T build() {
if (Objects.isNull(clintInterface)) {
@ -89,11 +90,12 @@ public class RequestBuilder<T, R> {
}
Assert.notBlank(groupName, () -> new EasyRetryServerException("groupName cannot be null"));
Assert.notBlank(hostId, () -> new EasyRetryServerException("hostId cannot be null"));
Assert.notBlank(hostIp, () -> new EasyRetryServerException("hostIp cannot be null"));
Assert.notNull(hostPort, () -> new EasyRetryServerException("hostPort cannot be null"));
Assert.notBlank(contextPath, () -> new EasyRetryServerException("contextPath cannot be null"));
Assert.notNull(nodeInfo, () -> new EasyRetryServerException("nodeInfo cannot be null"));
if (failover) {
Assert.isTrue(routeKey > 0, () -> new EasyRetryServerException("routeKey cannot be null"));
Assert.notBlank(allocKey, () -> new EasyRetryServerException("allocKey cannot be null"));
}
try {
clintInterface = (Class<T>) Class.forName(clintInterface.getName());
} catch (Exception e) {
@ -101,7 +103,7 @@ public class RequestBuilder<T, R> {
}
RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(
groupName, hostId, hostIp, hostPort, contextPath, failRetry, retryTimes, retryInterval, retryListener);
groupName, nodeInfo, failRetry, retryTimes, retryInterval, retryListener, routeKey, allocKey, failover);
return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(),
new Class[]{clintInterface}, clientInvokeHandler);

View File

@ -7,6 +7,7 @@ import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
import com.aizuda.easy.retry.server.common.client.annotation.Header;
@ -29,6 +30,7 @@ import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestClientException;
@ -52,27 +54,47 @@ import java.util.concurrent.TimeUnit;
* @since 2.0.0
*/
@Slf4j
@AllArgsConstructor
public class RpcClientInvokeHandler implements InvocationHandler {
public static final String URL = "http://{0}:{1}/{2}";
private String groupName;
private final String groupName;
private String hostId;
private String hostIp;
private Integer hostPort;
private String contextPath;
private boolean failRetry;
private int retryTimes;
private int retryInterval;
private RetryListener retryListener;
private final boolean failRetry;
private final int retryTimes;
private final int retryInterval;
private final RetryListener retryListener;
private final boolean failover;
private final Integer routeKey;
private final String allocKey;
public RpcClientInvokeHandler(final String groupName, final RegisterNodeInfo registerNodeInfo,
final boolean failRetry, final int retryTimes,
final int retryInterval, final RetryListener retryListener, final Integer routeKey, final String allocKey,
final boolean failover) {
this.groupName = groupName;
this.hostId = registerNodeInfo.getHostId();
this.hostPort = registerNodeInfo.getHostPort();
this.hostIp = registerNodeInfo.getHostIp();
this.contextPath = registerNodeInfo.getContextPath();
this.failRetry = failRetry;
this.retryTimes = retryTimes;
this.retryInterval = retryInterval;
this.retryListener = retryListener;
this.failover = failover;
this.routeKey = routeKey;
this.allocKey = allocKey;
}
@Override
public Result invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
Mapping annotation = method.getAnnotation(Mapping.class);
Assert.notNull(annotation, () -> new EasyRetryServerException("@Mapping cannot be null"));
if (annotation.failover()) {
if (failover) {
return doFailoverHandler(method, args, annotation);
}
@ -134,7 +156,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
return result;
} catch (RestClientException ex) {
// 网络异常
if (ex instanceof ResourceAccessException && mapping.failover()) {
if (ex instanceof ResourceAccessException && failover) {
log.error("request client I/O error, count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count,
hostId, hostIp, hostPort, HostUtils.getIp(), ex);
@ -143,7 +165,8 @@ public class RpcClientInvokeHandler implements InvocationHandler {
// 重新选一个可用的客户端节点
ClientNodeAllocateHandler clientNodeAllocateHandler = SpringContext.CONTEXT.getBean(
ClientNodeAllocateHandler.class);
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(groupName);
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(allocKey, groupName,
routeKey);
// 这里表示无可用节点
if (Objects.isNull(serverNode)) {
throw ex;

View File

@ -24,10 +24,4 @@ public @interface Mapping {
String path() default "";
/**
* 是否支持失败转移
* @return false or trur
*/
boolean failover() default false;
}

View File

@ -34,6 +34,10 @@ public class RegisterNodeInfo implements Comparable<RegisterNodeInfo> {
return MessageFormat.format(URL, hostIp, hostPort.toString(), contextPath);
}
public String address() {
return MessageFormat.format("{0}:{1}", hostIp, hostPort.toString());
}
@Override
public int compareTo(RegisterNodeInfo info) {
return hostId.compareTo(info.hostId);

View File

@ -1,21 +0,0 @@
package com.aizuda.easy.retry.server.common.enums;
import lombok.Getter;
/**
* @author: www.byteblogs.com
* @date : 2022-03-11 21:55
*/
@Getter
public enum AllocationAlgorithmEnum {
CONSISTENT_HASH(1),
RANDOM(2),
LRU(3),
;
private final int type;
AllocationAlgorithmEnum(int type) {
this.type = type;
}
}

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.common.handler;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.ClientLoadBalance;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
@ -15,6 +16,7 @@ import org.springframework.util.CollectionUtils;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author: www.byteblogs.com
@ -29,20 +31,25 @@ public class ClientNodeAllocateHandler {
/**
* 获取分配的节点
* @param allocKey 分配的key
* @param groupName 组名称
* @param routeKey {@link AllocationAlgorithmEnum} 路由类型
*/
public RegisterNodeInfo getServerNode(String groupName) {
public RegisterNodeInfo getServerNode(String allocKey, String groupName, Integer routeKey) {
GroupConfig groupConfig = accessTemplate.getGroupConfigAccess().getGroupConfigByGroupName(groupName);
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(groupName);
if (CollectionUtils.isEmpty(serverNodes)) {
LogUtils.warn(log, "client node is null. groupName:[{}]", groupName);
return null;
}
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());
ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(routeKey);
String hostIp = clientLoadBalanceRandom.route(groupName, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostIp).collect(Collectors.toSet())));
return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
String hostIp = clientLoadBalanceRandom.route(allocKey, new TreeSet<>(serverNodes.stream().map(RegisterNodeInfo::getHostIp).collect(Collectors.toSet())));
Stream<RegisterNodeInfo> registerNodeInfoStream = serverNodes.stream()
.filter(s -> s.getHostIp().equals(hostIp));
return registerNodeInfoStream.findFirst().orElse(null);
}

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.handler;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.easy.retry.server.common.HttpRequestHandler;
import io.netty.handler.codec.http.HttpHeaders;
/**

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.handler;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.easy.retry.server.common.HttpRequestHandler;
import io.netty.handler.codec.http.HttpHeaders;
/**

View File

@ -49,6 +49,7 @@ public class JobExecutorActor extends AbstractActor {
context.setGroupName(taskExecute.getGroupName());
context.setJobId(job.getId());
context.setTaskType(job.getTaskType());
context.setJob(job);
jobExecutor.execute(context);
}

View File

@ -21,23 +21,17 @@ import java.util.List;
*/
public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean {
@Autowired
private JobMapper jobMapper;
@Override
public void execute(JobExecutorContext context) {
// 生成任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(getTaskInstanceType().getType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(context);
instanceGenerateContext.setTaskBatchId(context.getTaskBatchId());
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollectionUtils.isEmpty(taskList)) {
return;
}
Job job = jobMapper.selectById(context.getJobId());
context.setJob(job);
context.setTaskList(taskList);
doExecute(context);

View File

@ -36,6 +36,16 @@ public class JobExecutorContext {
private List<JobTask> taskList;
private Job job;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private Integer argsType;
private Integer routeKey;
}

View File

@ -115,11 +115,8 @@ public class RealJobExecutorActor extends AbstractActor {
private JobRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealJobExecutorDTO realJobExecutorDTO) {
return RequestBuilder.<JobRpcClient, Result>newBuilder()
.hostPort(registerNodeInfo.getHostPort())
.groupName(registerNodeInfo.getGroupName())
.hostId(registerNodeInfo.getHostId())
.hostIp(registerNodeInfo.getHostIp())
.contextPath(registerNodeInfo.getContextPath())
.nodeInfo(registerNodeInfo)
.failRetry(Boolean.TRUE)
.retryTimes(realJobExecutorDTO.getMaxRetryTimes())
.retryInterval(realJobExecutorDTO.getRetryInterval())

View File

@ -52,15 +52,12 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
return Lists.newArrayList();
}
Job job = jobMapper.selectById(context.getJobId());
List<JobTask> jobTasks = new ArrayList<>(serverNodes.size());
for (RegisterNodeInfo serverNode : serverNodes) {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(serverNode.getHostId());
jobTask.setArgsType(job.getArgsType());
jobTask.setArgsStr(job.getArgsStr());
jobTask.setExtAttrs(job.getExtAttrs());
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(context.getArgsStr());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));

View File

@ -45,20 +45,18 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
@Override
public List<JobTask> doGenerate(JobTaskGenerateContext context) {
// 生成可执行任务
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getGroupName());
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(),
context.getGroupName(), context.getRouteKey());
if (Objects.isNull(serverNode)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
Job job = jobMapper.selectById(context.getJobId());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(serverNode.getHostId());
jobTask.setArgsType(job.getArgsType());
jobTask.setArgsStr(job.getArgsStr());
jobTask.setExtAttrs(job.getExtAttrs());
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(context.getArgsStr());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));

View File

@ -12,4 +12,14 @@ public class JobTaskGenerateContext {
private Long taskBatchId;
private String groupName;
private Long jobId;
private Integer routeKey;
/**
* 执行方法参数
*/
private String argsStr;
/**
* 参数类型 text/json
*/
private Integer argsType;
}

View File

@ -55,8 +55,7 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
return Lists.newArrayList();
}
Job job = jobMapper.selectById(context.getJobId());
String argsStr = job.getArgsStr();
String argsStr = context.getArgsStr();
Map<String, String> split = Splitter.on(";").omitEmptyStrings().withKeyValueSeparator('=').split(argsStr);
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
@ -66,9 +65,8 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(registerNodeInfo.getHostId());
jobTask.setArgsType(job.getArgsType());
jobTask.setArgsStr(job.getArgsStr());
jobTask.setExtAttrs(job.getExtAttrs());
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(value);
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.retry.task.client;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
@ -27,4 +28,7 @@ public interface RetryRpcClient {
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST, failover = true)
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
@Mapping(path = "/retry/generate/idempotent-id/v1", method = RequestMethod.POST)
Result generateIdempotentId(@Body GenerateRetryIdempotentIdDTO retryCallbackDTO);
}

View File

@ -9,6 +9,7 @@ import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@ -39,8 +40,10 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
// 重试次数累加
retryCountIncrement(retryTask);
RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask);
RetryExecutor executor = builderResultRetryExecutor(retryContext);
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
RetryContext retryContext = builderRetryContext(retryTask.getGroupName(), retryTask, sceneConfig);
RetryExecutor executor = builderResultRetryExecutor(retryContext, sceneConfig);
if (!preCheck(retryContext, executor)) {
return;
@ -75,9 +78,11 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
actorRef.tell(retryExecutor, actorRef);
}
protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask);
protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask,
final SceneConfig sceneConfig);
protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext);
protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext,
final SceneConfig sceneConfig);
protected abstract ActorRef getActorRef();

View File

@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import org.springframework.stereotype.Component;
/**
@ -25,17 +26,20 @@ import org.springframework.stereotype.Component;
public class CallbackTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask,
final SceneConfig sceneConfig) {
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(),
sceneConfig.getRouteKey()));
return retryContext;
}
@Override
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) {
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) {
return RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())

View File

@ -15,6 +15,7 @@ import com.aizuda.easy.retry.server.retry.task.support.strategy.FilterStrategies
import com.aizuda.easy.retry.server.retry.task.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import org.springframework.stereotype.Component;
/**
@ -28,29 +29,32 @@ import org.springframework.stereotype.Component;
public class ManualCallbackTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask,
final SceneConfig sceneConfig) {
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(),
sceneConfig.getRouteKey()));
return retryContext;
}
@Override
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) {
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext, final SceneConfig sceneConfig) {
return RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())
.withWaitStrategy(getWaitWaitStrategy())
.withFilterStrategy(FilterStrategies.triggerAtFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())
.withWaitStrategy(getWaitWaitStrategy())
.withFilterStrategy(FilterStrategies.triggerAtFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
}
@Override

View File

@ -31,16 +31,19 @@ public class ManualRetryTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
final RetryTask retryTask) {
final RetryTask retryTask,
final SceneConfig sceneConfig) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
return retryContext;
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(),
sceneConfig.getRouteKey())); return retryContext;
}
@Override
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext) {
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext,
final SceneConfig sceneConfig) {
RetryTask retryTask = retryContext.getRetryTask();
return RetryBuilder.<Result>newBuilder()

View File

@ -28,30 +28,34 @@ public class RetryTaskExecutor extends AbstractTaskExecutor {
@Override
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
final RetryTask retryTask) {
final RetryTask retryTask,
final SceneConfig sceneConfig) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
retryContext.setServerNode(
clientNodeAllocateHandler.getServerNode(retryTask.getSceneName(), retryTask.getGroupName(),
sceneConfig.getRouteKey()));
return retryContext;
}
@Override
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext) {
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext,
final SceneConfig sceneConfig) {
RetryTask retryTask = retryContext.getRetryTask();
return RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
.withFilterStrategy(FilterStrategies.triggerAtFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getWaitWaitStrategy(sceneConfig))
.withFilterStrategy(FilterStrategies.triggerAtFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
}
@Override
@ -59,11 +63,8 @@ public class RetryTaskExecutor extends AbstractTaskExecutor {
return TaskExecutorSceneEnum.AUTO_RETRY;
}
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName);
private WaitStrategy getWaitWaitStrategy(SceneConfig sceneConfig) {
Integer backOff = sceneConfig.getBackOff();
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
}

View File

@ -14,7 +14,7 @@ import com.aizuda.easy.retry.server.common.Register;
import com.aizuda.easy.retry.server.retry.task.support.handler.ConfigVersionSyncHandler;
import com.aizuda.easy.retry.server.common.register.ClientRegister;
import com.aizuda.easy.retry.server.common.register.RegisterContext;
import com.aizuda.easy.retry.server.common.handler.HttpRequestHandler;
import com.aizuda.easy.retry.server.common.HttpRequestHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;

View File

@ -6,11 +6,14 @@ import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskGenerator;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
@ -37,6 +40,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLog
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@ -55,6 +59,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -202,21 +207,34 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Override
public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(generateRetryIdempotentIdVO.getGroupName());
Assert.notNull(serverNode, () -> new EasyRetryServerException("生成idempotentId失败: 不存在活跃的客户端节点"));
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(generateRetryIdempotentIdVO.getGroupName());
Assert.notEmpty(serverNodes, () -> new EasyRetryServerException("生成idempotentId失败: 不存在活跃的客户端节点"));
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess()
.getSceneConfigByGroupNameAndSceneName(generateRetryIdempotentIdVO.getGroupName(),
generateRetryIdempotentIdVO.getSceneName());
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(sceneConfig.getSceneName(),
sceneConfig.getGroupName(), sceneConfig.getRouteKey());
// 委托客户端生成idempotentId
String url = MessageFormat
.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO();
generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName());
generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName());
generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr());
generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName());
HttpEntity<GenerateRetryIdempotentIdDTO> requestEntity = new HttpEntity<>(generateRetryIdempotentIdDTO);
Result result = restTemplate.postForObject(url, requestEntity, Result.class);
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.hostPort(serverNode.getHostPort())
.groupName(serverNode.getGroupName())
.hostId(serverNode.getHostId())
.hostIp(serverNode.getHostIp())
.contextPath(serverNode.getContextPath())
.client(RetryRpcClient.class)
.build();
Result result = rpcClient.generateIdempotentId(generateRetryIdempotentIdDTO);
Assert.notNull(result, () -> new EasyRetryServerException("idempotentId生成失败"));
Assert.isTrue(1 == result.getStatus(), () -> new EasyRetryServerException("idempotentId生成失败:请确保参数与执行器名称正确"));