feat: 2.1.0

1. 优化滑动窗口
2. 异步上报提供参数配置
3. 上报数据服务端支持失败重试
4. 重试任务和回调任务扫描改为以id作为偏移进行滚动扫描
5. 客户端回调接口支持SpringBean和普通类模式
6. 负载均衡器新增负载均衡周期时间配置
This commit is contained in:
byteblogs168 2023-07-28 17:39:45 +08:00
parent 9f10a0d500
commit 2cd71f6e16
23 changed files with 303 additions and 128 deletions

View File

@ -16,7 +16,6 @@
<properties>
<java.version>1.8</java.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
</properties>
<dependencies>
@ -38,7 +37,6 @@
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>${guava-retrying.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>

View File

@ -26,6 +26,8 @@ import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.ReflectionUtils;
@ -60,7 +62,7 @@ public class RetryEndPoint {
* 服务端调度重试入口
*/
@PostMapping("/dispatch/v1")
public Result<DispatchRetryResultDTO> dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) {
public Result<DispatchRetryResultDTO> dispatch(@RequestBody @Validated DispatchRetryDTO executeReqDto) {
RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName());
if (Objects.isNull(retryerInfo)) {
@ -71,7 +73,8 @@ public class RetryEndPoint {
Object[] deSerialize = null;
try {
deSerialize = (Object[]) retryArgSerializer.deSerialize(executeReqDto.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
deSerialize = (Object[]) retryArgSerializer.deSerialize(executeReqDto.getArgsStr(),
retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
} catch (JsonProcessingException e) {
throw new EasyRetryClientException("参数解析异常", e);
}
@ -83,7 +86,8 @@ public class RetryEndPoint {
HttpServletRequest request = Objects.requireNonNull(attributes).getRequest();
request.setAttribute("attemptNumber", executeReqDto.getRetryCount());
RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), executeReqDto.getExecutorName(), deSerialize);
RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(),
executeReqDto.getExecutorName(), deSerialize);
if (RetrySiteSnapshot.isRetryForStatusCode()) {
executeRespDto.setStatusCode(RetryResultStatusEnum.STOP.getStatus());
@ -127,25 +131,87 @@ public class RetryEndPoint {
RetryArgSerializer retryArgSerializer = new JacksonSerializer();
Object[] deSerialize = null;
Object[] deSerialize;
try {
deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(),
retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
} catch (JsonProcessingException e) {
throw new EasyRetryClientException("参数解析异常", e);
}
try {
// 以Spring Bean模式回调
return doCallbackForSpringBean(callbackDTO, retryerInfo, deSerialize);
} catch (NoSuchBeanDefinitionException e) {
// 若不是SpringBean 则直接反射以普通类调用
return doCallbackForOrdinaryClass(callbackDTO, retryerInfo, deSerialize);
}
}
/**
* 以普通类进行回调
*
* @param callbackDTO {@link RetryCallbackDTO} 服务端调度重试入参
* @param retryerInfo {@link RetryerInfo} 定义重试场景的信息
* @param deSerialize 参数信息
* @return Result
*/
private Result doCallbackForOrdinaryClass(RetryCallbackDTO callbackDTO, RetryerInfo retryerInfo,
Object[] deSerialize) {
Class<? extends RetryCompleteCallback> retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback();
try {
RetryCompleteCallback retryCompleteCallback = retryCompleteCallbackClazz.newInstance();
Method method;
switch (Objects.requireNonNull(RetryStatusEnum.getByStatus(callbackDTO.getRetryStatus()))) {
case FINISH:
method = retryCompleteCallbackClazz.getMethod("doSuccessCallback", String.class, String.class,
Object[].class);
break;
case MAX_COUNT:
method = retryCompleteCallbackClazz.getMethod("doMaxRetryCallback", String.class, String.class,
Object[].class);
break;
default:
throw new EasyRetryClientException("回调状态异常");
}
Assert.notNull(method, () -> new EasyRetryClientException("no such method"));
ReflectionUtils.invokeMethod(method, retryCompleteCallback, retryerInfo.getScene(),
retryerInfo.getExecutorClassName(), deSerialize);
return new Result(1, "回调成功");
} catch (Exception ex) {
return new Result(0, ex.getMessage());
}
}
/**
* 以Spring Bean模式回调
*
* @param callbackDTO {@link RetryCallbackDTO} 服务端调度重试入参
* @param retryerInfo {@link RetryerInfo} 定义重试场景的信息
* @param deSerialize 参数信息
* @return Result
*/
private Result doCallbackForSpringBean(RetryCallbackDTO callbackDTO, RetryerInfo retryerInfo, Object[] deSerialize) {
Class<? extends RetryCompleteCallback> retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback();
RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz);
if (RetryStatusEnum.FINISH.getStatus().equals(callbackDTO.getRetryStatus())) {
retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
switch (Objects.requireNonNull(RetryStatusEnum.getByStatus(callbackDTO.getRetryStatus()))) {
case FINISH:
retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(),
deSerialize);
break;
case MAX_COUNT:
retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(),
deSerialize);
break;
default:
throw new EasyRetryClientException("回调状态异常");
}
if (RetryStatusEnum.MAX_COUNT.getStatus().equals(callbackDTO.getRetryStatus())) {
retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
}
return new Result();
return new Result(1, "回调成功");
}
/**
@ -155,14 +221,16 @@ public class RetryEndPoint {
* @return idempotentId
*/
@PostMapping("/generate/idempotent-id/v1")
public Result<String> idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {
public Result<String> idempotentIdGenerate(
@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {
String scene = generateRetryIdempotentIdDTO.getScene();
String executorName = generateRetryIdempotentIdDTO.getExecutorName();
String argsStr = generateRetryIdempotentIdDTO.getArgsStr();
RetryerInfo retryerInfo = RetryerInfoCache.get(scene, executorName);
Assert.notNull(retryerInfo, ()-> new EasyRetryClientException("重试信息不存在 scene:[{}] executorName:[{}]", scene, executorName));
Assert.notNull(retryerInfo,
() -> new EasyRetryClientException("重试信息不存在 scene:[{}] executorName:[{}]", scene, executorName));
Method executorMethod = retryerInfo.getMethod();
@ -170,7 +238,8 @@ public class RetryEndPoint {
Object[] deSerialize = null;
try {
deSerialize = (Object[]) retryArgSerializer.deSerialize(argsStr, retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
deSerialize = (Object[]) retryArgSerializer.deSerialize(argsStr, retryerInfo.getExecutor().getClass(),
retryerInfo.getMethod());
} catch (JsonProcessingException e) {
throw new EasyRetryClientException("参数解析异常", e);
}
@ -180,7 +249,8 @@ public class RetryEndPoint {
Class<? extends IdempotentIdGenerate> idempotentIdGenerate = retryerInfo.getIdempotentIdGenerate();
IdempotentIdGenerate generate = idempotentIdGenerate.newInstance();
Method method = idempotentIdGenerate.getMethod("idGenerate", IdempotentIdContext.class);
IdempotentIdContext idempotentIdContext = new IdempotentIdContext(scene, executorName, deSerialize, executorMethod.getName());
IdempotentIdContext idempotentIdContext = new IdempotentIdContext(scene, executorName, deSerialize,
executorMethod.getName());
idempotentId = (String) ReflectionUtils.invokeMethod(method, generate, idempotentIdContext);
} catch (Exception exception) {
LogUtils.error(log, "幂等id生成异常{},{}", scene, argsStr, exception);

View File

@ -1,18 +1,21 @@
package com.aizuda.easy.retry.client.core.config;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
/**
* easy retry 客户端配置
*
* @author: www.byteblogs.com
* @date : 2022-03-04 15:53
* @since 1.0.0
*/
@Configuration
@ConfigurationProperties(prefix = "easy-retry")
@ -35,6 +38,11 @@ public class EasyRetryProperties {
*/
private Integer port;
/**
* 远程上报滑动窗口配置
*/
private SlidingWindowConfig slidingWindow = new SlidingWindowConfig();
/**
* 服务端配置
*/
@ -53,6 +61,31 @@ public class EasyRetryProperties {
private int port = 1788;
}
@Data
public static class SlidingWindowConfig {
/**
* 总量窗口期阈值
*/
private int totalThreshold = 50;
/**
* 窗口数量预警
*/
private int windowTotalThreshold = 150;
/**
* 窗口期时间长度
*/
private long duration = 10;
/**
* 窗口期单位
*/
private ChronoUnit chronoUnit = ChronoUnit.SECONDS;
}
public static String getGroup() {
EasyRetryProperties properties = SpringContext.CONTEXT.getBean(EasyRetryProperties.class);
return Objects.requireNonNull(properties).group;

View File

@ -1,16 +1,15 @@
package com.aizuda.easy.retry.client.core.report;
import com.aizuda.easy.retry.client.core.Lifecycle;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties.SlidingWindowConfig;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.window.RetryLeapArray;
import com.aizuda.easy.retry.client.core.window.SlidingWindow;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@ -23,8 +22,10 @@ import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class AsyncReport extends AbstractReport implements Lifecycle {
private static SlidingWindow<RetryTaskDTO> slidingWindow;
@Autowired
private EasyRetryProperties easyRetryProperties;
private SlidingWindow<RetryTaskDTO> slidingWindow;
@Override
public boolean supports(boolean async) {
return async;
@ -33,7 +34,8 @@ public class AsyncReport extends AbstractReport implements Lifecycle {
@Override
public boolean doReport(RetryerInfo retryerInfo, Object[] params) {
return syncReport(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), params, retryerInfo.getTimeout(), retryerInfo.getUnit());
return syncReport(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), params, retryerInfo.getTimeout(),
retryerInfo.getUnit());
}
/**
@ -46,17 +48,21 @@ public class AsyncReport extends AbstractReport implements Lifecycle {
return Boolean.TRUE;
}
@Override
public void start() {
SlidingWindowConfig slidingWindowConfig = easyRetryProperties.getSlidingWindow();
slidingWindow = SlidingWindow
.Builder
.<RetryTaskDTO>newBuilder()
.withTotalThreshold(10)
.withDuration(5, ChronoUnit.SECONDS)
.withListener(new ReportListener())
.build();
.Builder
.<RetryTaskDTO>newBuilder()
.withTotalThreshold(slidingWindowConfig.getTotalThreshold())
.withWindowTotalThreshold(slidingWindowConfig.getWindowTotalThreshold())
.withDuration(slidingWindowConfig.getDuration(), slidingWindowConfig.getChronoUnit())
.withListener(new ReportListener())
.build();
slidingWindow.start();
}
@Override

View File

@ -61,7 +61,15 @@ public class SlidingWindow<T> {
*/
private final ChronoUnit chronoUnit;
public static final ReentrantLock lock = new ReentrantLock();//创建锁对象
/**
* 新增窗口锁
*/
private static final ReentrantLock SAVE_LOCK = new ReentrantLock();
/**
* 到达时间窗口期或者总量窗口期锁
*/
private static final ReentrantLock NOTICE_LOCK = new ReentrantLock();
public SlidingWindow(int totalThreshold,
int windowTotalThreshold,
@ -87,7 +95,7 @@ public class SlidingWindow<T> {
LocalDateTime now = LocalDateTime.now();
if (isOpenNewWindow(now)) {
lock.lock();
SAVE_LOCK.lock();
LocalDateTime windowPeriod = now.plus(duration, chronoUnit);
try {
@ -111,7 +119,7 @@ public class SlidingWindow<T> {
}
} finally {
lock.unlock();
SAVE_LOCK.unlock();
}
} else {
@ -167,7 +175,7 @@ public class SlidingWindow<T> {
*/
private void doHandlerListener(LocalDateTime windowPeriod) {
lock.lock();
NOTICE_LOCK.lock();
try {
@ -191,7 +199,7 @@ public class SlidingWindow<T> {
} catch (Exception e) {
log.error("到达总量窗口期通知异常", e);
} finally {
lock.unlock();
NOTICE_LOCK.unlock();
}
}
@ -397,6 +405,7 @@ public class SlidingWindow<T> {
* @return this
*/
public Builder<T> withDuration(long duration, ChronoUnit chronoUnit) {
Assert.isTrue(duration > 0, "窗口期不能小于0");
this.duration = duration;
this.chronoUnit = chronoUnit;
return this;
@ -421,7 +430,7 @@ public class SlidingWindow<T> {
public SlidingWindow<T> build() {
if (Objects.isNull(threadPoolExecutor)) {
threadPoolExecutor = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "SlidingWindowThread"));
.newSingleThreadScheduledExecutor(r -> new Thread(r, "sliding-window-thread"));
}
if (CollectionUtils.isEmpty(listeners)) {

View File

@ -127,6 +127,10 @@
<artifactId>perf4j</artifactId>
<version>${perf4j.version}</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -92,7 +92,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef logActor() {
return getNettyActorSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME));
return getLogActorSystemSystem().actorOf(getSpringExtension().props(LogActor.BEAN_NAME));
}
/**
@ -101,7 +101,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef requestHandlerActor() {
return getLogActorSystemSystem().actorOf(getSpringExtension().props(RequestHandlerActor.BEAN_NAME));
return getNettyActorSystem().actorOf(getSpringExtension().props(RequestHandlerActor.BEAN_NAME));
}
public static SpringExtension getSpringExtension() {

View File

@ -19,7 +19,7 @@ public class AkkaConfiguration {
private static final String DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM = "DISPATCH_EXEC_UNIT_RETRY_ACTOR_SYSTEM";
private static final String DISPATCH_RESULT_ACTOR_SYSTEM = "DISPATCH_RESULT_ACTOR_SYSTEM";
private static final String LOG_ACTOR_SYSTEM = "LOG_ACTOR_SYSTEM";
private static final String NETTY_ACTOR_SYSTEM = "nettyActorSystem";
private static final String NETTY_ACTOR_SYSTEM = "NETTY_ACTOR_SYSTEM";
@Autowired
private ApplicationContext applicationContext;

View File

@ -39,7 +39,7 @@ public class SystemProperties {
/**
* 一个客户端每秒最多接收的重试数量指令
*/
private int limiter = 10;
private int limiter = 100;
/**
* 号段模式下步长配置
@ -57,6 +57,11 @@ public class SystemProperties {
*/
private DbTypeEnum dbType = DbTypeEnum.MYSQL;
/**
* 负载均衡周期时间
*/
private int loadBalanceCycleTime = 10;
/**
* 回调配置
*/

View File

@ -14,7 +14,7 @@ public interface RetryTaskAccess<T> {
/**
* 批量查询重试任务
*/
List<T> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType);
List<T> listAvailableTasks(String groupName, LocalDateTime lastAt, final Long lastId, Integer pageSize, Integer taskType);
List<T> listRetryTaskByRetryCount(String groupName, Integer retryStatus);

View File

@ -30,31 +30,35 @@ public class MybatisRetryTaskAccess extends AbstractRetryTaskAccess {
}
@Override
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) {
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Long lastId, Integer pageSize,
Integer taskType) {
setPartition(groupName);
return retryTaskMapper.selectPage(new PageDTO<>(0, pageSize),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName)
.eq(RetryTask::getTaskType, taskType)
.gt(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getCreateDt)).getRecords();
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName)
.eq(RetryTask::getTaskType, taskType)
.gt(RetryTask::getId, lastId)
.gt(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getId)
.orderByAsc(RetryTask::getCreateDt))
.getRecords();
}
@Override
public List<RetryTask> listRetryTaskByRetryCount(String groupName, Integer retryStatus) {
setPartition(groupName);
return retryTaskMapper.selectPage(new PageDTO<>(0, 1000),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, retryStatus)
.eq(RetryTask::getGroupName, groupName)).getRecords();
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, retryStatus)
.eq(RetryTask::getGroupName, groupName)).getRecords();
}
@Override
public int deleteByDelayLevel(String groupName, Integer retryStatus) {
setPartition(groupName);
return retryTaskMapper.delete(new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getRetryStatus, retryStatus));
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getRetryStatus, retryStatus));
}
@Override

View File

@ -30,8 +30,8 @@ public class RetryTaskAccessProcessor implements RetryTaskAccess<RetryTask> {
* 批量查询重试任务
*/
@Override
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) {
return retryTaskAccesses.listAvailableTasks(groupName, lastAt, pageSize, taskType);
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, final Long lastId, Integer pageSize, Integer taskType) {
return retryTaskAccesses.listAvailableTasks(groupName, lastAt, lastId, pageSize, taskType);
}
@Override

View File

@ -1,45 +1,34 @@
package com.aizuda.easy.retry.server.server;
import akka.actor.AbstractActor;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import cn.hutool.core.net.url.UrlBuilder;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.dto.NettyHttpRequest;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.server.handler.HttpRequestHandler;
import com.aizuda.easy.retry.server.support.Register;
import com.aizuda.easy.retry.server.support.dispatch.actor.log.LogActor;
import com.aizuda.easy.retry.server.support.handler.ConfigVersionSyncHandler;
import com.aizuda.easy.retry.server.support.register.ClientRegister;
import com.aizuda.easy.retry.server.support.register.RegisterContext;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException;
import scala.concurrent.duration.Duration;
import java.util.Collection;
@ -57,25 +46,10 @@ public class RequestHandlerActor extends AbstractActor {
public static final String BEAN_NAME = "requestHandlerActor";
private static final SupervisorStrategy STRATEGY =
new OneForOneStrategy(5, Duration.create("1 minute"),
throwable -> {
if (throwable instanceof DuplicateKeyException
|| throwable instanceof TransactionSystemException
|| throwable instanceof ConcurrencyFailureException) {
LogUtils.error(log, "RequestHandlerActor handler exception", throwable);
return SupervisorStrategy.restart();
}else {
return SupervisorStrategy.escalate();
}
});
@Override
public Receive createReceive() {
return receiveBuilder().match(NettyHttpRequest.class, nettyHttpRequest -> {
final String uri = nettyHttpRequest.getUri();
if (StringUtils.isBlank(uri)) {
LogUtils.error(log, "uri can not be null");
@ -89,19 +63,22 @@ public class RequestHandlerActor extends AbstractActor {
final String content = nettyHttpRequest.getContent();
final HttpHeaders headers = nettyHttpRequest.getHeaders();
String result = doProcess(uri, content, method, headers);
String result = "";
try {
result = doProcess(uri, content, method, headers);
} catch (Exception e) {
LogUtils.error(log, "http request error. [{}]", nettyHttpRequest.getContent(), e);
result = JsonUtil.toJsonString(new Result<>(0, e.getMessage()));
throw e;
} finally {
writeResponse(channelHandlerContext, keepAlive, result);
getContext().stop(getSelf());
}
writeResponse(channelHandlerContext, keepAlive, result);
}).build();
}
@Override
public SupervisorStrategy supervisorStrategy() {
return STRATEGY;
}
private String doProcess(String uri, String content, HttpMethod method,
HttpHeaders headers) {

View File

@ -13,16 +13,29 @@ import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.service.convert.TaskContextConverter;
import com.aizuda.easy.retry.server.support.generator.TaskGenerator;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PATH.BATCH_REPORT;
@ -72,20 +85,55 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
Map<String, List<RetryTaskDTO>> map = retryTaskList.stream().collect(Collectors.groupingBy(RetryTaskDTO::getSceneName));
map.forEach(((sceneName, retryTaskDTOS) -> {
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(sceneName);
taskContext.setGroupName(set.stream().findFirst().get());
taskContext.setTaskInfos(TaskContextConverter.INSTANCE.toTaskContextInfo(retryTaskList));
Retryer<Object> retryer = RetryerBuilder.newBuilder()
.retryIfException(throwable -> {
// 若是数据库异常则重试
if (throwable instanceof DuplicateKeyException
|| throwable instanceof TransactionSystemException
|| throwable instanceof ConcurrencyFailureException
|| throwable instanceof IOException) {
return true;
}
return false;
})
.withStopStrategy(StopStrategies.stopAfterAttempt(5))
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
if (attempt.hasException()) {
LogUtils.error(log, "数据上报发生异常执行重试. reqId:[{}] count:[{}]",
retryRequest.getReqId(), attempt.getAttemptNumber(), attempt.getExceptionCause());
}
}
})
.build();
// 生成任务
taskGenerator.taskGenerator(taskContext);
}));
retryer.call(() -> {
map.forEach(((sceneName, retryTaskDTOS) -> {
TaskContext taskContext = new TaskContext();
taskContext.setSceneName(sceneName);
taskContext.setGroupName(set.stream().findFirst().get());
taskContext.setTaskInfos(TaskContextConverter.INSTANCE.toTaskContextInfo(retryTaskList));
// 生成任务
taskGenerator.taskGenerator(taskContext);
}));
return null;
});
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Batch Retry Data Upload Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
} catch (Exception e) {
LogUtils.error(log, "Batch Report Retry Data Error. <|>{}<|>", args[0], e);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), e.getMessage(), Boolean.FALSE, retryRequest.getReqId()));
Throwable throwable = e;
if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause();
}
LogUtils.error(log, "Batch Report Retry Data Error. <|>{}<|>", args[0], throwable);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), throwable.getMessage(), Boolean.FALSE, retryRequest.getReqId()));
}
}
}

View File

@ -2,12 +2,14 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
@ -78,7 +80,9 @@ public class FailureActor extends AbstractActor {
callbackRetryTaskHandler.create(retryTask);
}
retryTaskAccess.updateRetryTask(retryTask);
Assert.isTrue(1 == retryTaskAccess.updateRetryTask(retryTask), () ->
new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
}
});
} catch (Exception e) {

View File

@ -2,9 +2,11 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -55,7 +57,9 @@ public class FinishActor extends AbstractActor {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
retryTaskAccess.updateRetryTask(retryTask);
Assert.isTrue(1 == retryTaskAccess.updateRetryTask(retryTask), () ->
new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);

View File

@ -1,7 +1,9 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.RetryContext;
@ -44,7 +46,9 @@ public class NoRetryActor extends AbstractActor {
// 不更新重试次数
retryTask.setRetryCount(null);
try {
retryTaskAccess.updateRetryTask(retryTask);
Assert.isTrue(1 == retryTaskAccess.updateRetryTask(retryTask), () ->
new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
retryTask.getGroupName(), retryTask.getUniqueId()));
}catch (Exception e) {
LogUtils.error(log,"更新重试任务失败", e);
} finally {

View File

@ -61,19 +61,19 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected void doScan(final ScanTaskDTO scanTaskDTO) {
LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
int retryPullPageSize = systemProperties.getRetryPullPageSize();
String groupName = scanTaskDTO.getGroupName();
LocalDateTime lastAt = Optional.ofNullable(getLastAt(groupName)).orElse(defLastAt);
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
// 扫描当前Group 重试的数据
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(),
// 扫描当前Group 处理的任务
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize,
getTaskType());
if (!CollectionUtils.isEmpty(list)) {
// 更新拉取的最大的创建时间
putLastAt(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getCreateDt());
// 更新拉取的最大的id
putLastId(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getId());
for (RetryTask retryTask : list) {
@ -97,7 +97,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
Thread.currentThread().interrupt();
}
putLastAt(groupName, defLastAt);
putLastId(groupName, 0L);
}
}
@ -108,9 +108,9 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected abstract Integer getTaskType();
protected abstract LocalDateTime getLastAt(String groupName);
protected abstract Long getLastId(String groupName);
protected abstract LocalDateTime putLastAt(String groupName, LocalDateTime LocalDateTime);
protected abstract void putLastId(String groupName, Long lastId);
private void retryCountIncrement(RetryTask retryTask) {
Integer retryCount = retryTask.getRetryCount();

View File

@ -35,11 +35,11 @@ public class ScanCallbackGroupActor extends AbstractScanGroup {
public static final String BEAN_NAME = "ScanCallbackGroupActor";
/**
* 缓存待拉取数据的起点时间
* 缓存待拉取数据的起点id
* <p>
* LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间
* LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 id
*/
public static final ConcurrentMap<String, LocalDateTime> LAST_AT_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Long> LAST_AT_MAP = new ConcurrentHashMap<>();
@Override
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
@ -73,13 +73,13 @@ public class ScanCallbackGroupActor extends AbstractScanGroup {
}
@Override
protected LocalDateTime getLastAt(final String groupName) {
protected Long getLastId(final String groupName) {
return LAST_AT_MAP.get(groupName);
}
@Override
protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) {
return LAST_AT_MAP.put(groupName, LocalDateTime);
protected void putLastId(final String groupName, final Long lastId) {
LAST_AT_MAP.put(groupName, lastId);
}
private WaitStrategy getWaitWaitStrategy() {

View File

@ -36,11 +36,11 @@ public class ScanGroupActor extends AbstractScanGroup {
public static final String BEAN_NAME = "ScanGroupActor";
/**
* 缓存待拉取数据的起点时间
* 缓存待拉取数据的起点id
* <p>
* LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间
* LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的id
*/
public static final ConcurrentMap<String, LocalDateTime> LAST_AT_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Long> LAST_AT_MAP = new ConcurrentHashMap<>();
@Override
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
@ -76,15 +76,16 @@ public class ScanGroupActor extends AbstractScanGroup {
}
@Override
protected LocalDateTime getLastAt(final String groupName) {
protected Long getLastId(final String groupName) {
return LAST_AT_MAP.get(groupName);
}
@Override
protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) {
return LAST_AT_MAP.put(groupName, LocalDateTime);
protected void putLastId(final String groupName, final Long lastId) {
LAST_AT_MAP.put(groupName, lastId);
}
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) {
SceneConfig sceneConfig = configAccess.getSceneConfigByGroupNameAndSceneName(groupName, sceneName);

View File

@ -104,7 +104,7 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
} finally {
try {
// 防止刷的过快休眠1s
TimeUnit.MILLISECONDS.sleep(1000);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

View File

@ -3,6 +3,7 @@ package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.lang.Opt;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
@ -56,6 +57,8 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
@Autowired
protected ServerNodeMapper serverNodeMapper;
@Autowired
protected SystemProperties systemProperties;
/**
* 控制rebalance状态
@ -253,7 +256,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
LogUtils.error(log, "check balance error", e);
} finally {
try {
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(systemProperties.getLoadBalanceCycleTime());
} catch (InterruptedException e) {
LogUtils.error(log, "check balance interrupt");
Thread.currentThread().interrupt();

View File

@ -27,6 +27,7 @@
<hutool-all.version>5.8.19</hutool-all.version>
<mybatis-plus.version>3.5.3.1</mybatis-plus.version>
<alibaba-dingtalk.version>2.0.0</alibaba-dingtalk.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
</properties>
<modules>
@ -81,7 +82,11 @@
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>${guava-retrying.version}</version>
</dependency>
</dependencies>
</dependencyManagement>