feat: 1.5.0

1. 回调支持持久化
This commit is contained in:
www.byteblogs.com 2023-06-04 18:46:09 +08:00
parent 03ba7592b2
commit 2b5803c65a
28 changed files with 617 additions and 145 deletions

View File

@ -43,6 +43,7 @@ CREATE TABLE `retry_dead_letter_0`
`executor_name` varchar(512) NOT NULL DEFAULT '' COMMENT '执行器名称',
`args_str` text NOT NULL COMMENT '执行方法参数',
`ext_attrs` text NOT NULL COMMENT '扩展字段',
`task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_group_name_scene_name` (`group_name`, `scene_name`),
@ -66,6 +67,7 @@ CREATE TABLE `retry_task_0`
`next_trigger_at` datetime NOT NULL COMMENT '下次触发时间',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、重试中 1、成功 2、最大重试次数',
`task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
@ -89,6 +91,7 @@ CREATE TABLE `retry_task_log`
`args_str` text NOT NULL COMMENT '执行方法参数',
`ext_attrs` text NOT NULL COMMENT '扩展字段',
`retry_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '重试状态 0、失败 1、成功',
`task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '任务类型 1、重试数据 2、回调数据',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`error_message` text NOT NULL COMMENT '异常信息',

View File

@ -61,4 +61,19 @@ public interface SystemConstants {
}
interface CALL_BACK {
/**
* 回调id前缀
*/
String CB_ = "CB_";
/**
* 最大重试次数
*/
int MAX_RETRY_COUNT = 288;
/**
* 间隔时间
*/
int TRIGGER_INTERVAL = 15 * 60;
}
}

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum TaskTypeEnum {
RETRY(1),
CALLBACK(2);
private final Integer type;
}

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.aizuda.easy.retry.server.support.dispatch.actor.callback.CallbackRetryResultActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecCallbackUnitActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.result.FailureActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.result.FinishActor;
@ -53,7 +53,7 @@ public class ActorGenerator {
* @return actor 引用
*/
public static ActorRef callbackRetryResultActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(CallbackRetryResultActor.BEAN_NAME));
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(ExecCallbackUnitActor.BEAN_NAME));
}
/**

View File

@ -29,6 +29,8 @@ public class RetryDeadLetter implements Serializable {
private String extAttrs;
private Integer taskType;
private LocalDateTime createDt;
private static final long serialVersionUID = 1L;

View File

@ -37,6 +37,8 @@ public class RetryTask implements Serializable {
private Integer retryStatus;
private Integer taskType;
private LocalDateTime createDt;
private LocalDateTime updateDt;

View File

@ -31,6 +31,8 @@ public class RetryTaskLog implements Serializable {
private Integer retryStatus;
private Integer taskType;
private String errorMessage;
private LocalDateTime createDt;

View File

@ -14,7 +14,7 @@ public interface RetryTaskAccess<T> {
/**
* 批量查询重试任务
*/
List<T> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize);
List<T> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType);
List<T> listRetryTaskByRetryCount(String groupName, Integer retryStatus);

View File

@ -30,12 +30,14 @@ public class MybatisRetryTaskAccess extends AbstractRetryTaskAccess {
}
@Override
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize) {
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) {
setPartition(groupName);
return retryTaskMapper.selectPage(new PageDTO<>(0, pageSize),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName).ge(RetryTask::getCreateDt, lastAt)
.eq(RetryTask::getGroupName, groupName)
.eq(RetryTask::getTaskType, taskType)
.ge(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getCreateDt)).getRecords();
}

View File

@ -30,8 +30,8 @@ public class RetryTaskAccessProcessor implements RetryTaskAccess<RetryTask> {
* 批量查询重试任务
*/
@Override
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize) {
return retryTaskAccesses.listAvailableTasks(groupName, lastAt, pageSize);
public List<RetryTask> listAvailableTasks(String groupName, LocalDateTime lastAt, Integer pageSize, Integer taskType) {
return retryTaskAccesses.listAvailableTasks(groupName, lastAt, pageSize, taskType);
}
@Override

View File

@ -19,6 +19,8 @@ public interface RetryTaskConverter {
RetryTask toRetryTask(RetryTaskDTO retryTaskDTO);
RetryTask toRetryTask(RetryTask retryTask);
RetryTask toRetryTask(RetryTaskSaveRequestVO retryTaskSaveRequestVO);
List<RetryTask> toRetryTaskList(List<RetryTaskDTO> retryTaskDTOList);

View File

@ -1,6 +1,9 @@
package com.aizuda.easy.retry.server.support;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import java.util.Set;
/**
* @author: www.byteblogs.com
@ -42,4 +45,10 @@ public interface RetryContext<V> {
* @param waitStrategy {@link WaitStrategy} 等待策略
*/
void setWaitStrategy(WaitStrategy waitStrategy);
ServerNode getServerNode();
Set<String> getSceneBlacklist();
V getCallResult();
}

View File

@ -0,0 +1,56 @@
package com.aizuda.easy.retry.server.support.context;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import lombok.Data;
import lombok.Getter;
import java.util.Objects;
import java.util.Set;
/**
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@Data
public class CallbackRetryContext<V> implements RetryContext<V> {
/**
* 通知客户端回调结果
*/
private V callResult;
/**
* 异常信息
*/
private Exception exception;
/**
* 等待策略
*/
private WaitStrategy waitStrategy;
/**
* 当前重试数据
*/
private RetryTask retryTask;
/**
* 目前处理关闭的场景
*/
private Set<String> sceneBlacklist;
/**
* 需要调度的节点
*/
private ServerNode serverNode;
@Override
public boolean hasException() {
return Objects.nonNull(exception);
}
}

View File

@ -56,6 +56,7 @@ public class DispatchService implements Lifecycle {
* MAX_ID_MAP[key] = group idHash MAX_ID_MAP[value] = retry_task的 create_at时间
*/
public static final Map<String, LocalDateTime> LAST_AT_MAP = new HashMap<>();
public static final Map<String, LocalDateTime> LAST_AT_CALL_BACK_MAP = new HashMap<>();
/**
* 调度时长

View File

@ -1,85 +0,0 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.callback;
import akka.actor.AbstractActor;
import cn.hutool.core.util.IdUtil;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.util.Objects;
/**
* @author: www.byteblogs.com
* @date : 2023-01-10 08:50
*/
@Component("CallbackRetryResultActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class CallbackRetryResultActor extends AbstractActor {
public static final String BEAN_NAME = "CallbackRetryResultActor";
public static final String URL = "http://{0}:{1}/{2}/retry/callback/v1";
@Autowired
private RestTemplate restTemplate;
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryTask.class, retryTask->{
try {
ServerNode serverNode = clientNodeAllocateHandler.getServerNode(retryTask.getGroupName());
if (Objects.isNull(serverNode)) {
LogUtils.warn(log, "暂无可用的客户端节点");
return;
}
// 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();
retryCallbackDTO.setIdempotentId(retryTask.getIdempotentId());
retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus());
retryCallbackDTO.setArgsStr(retryTask.getArgsStr());
retryCallbackDTO.setScene(retryTask.getSceneName());
retryCallbackDTO.setGroup(retryTask.getGroupName());
retryCallbackDTO.setExecutorName(retryTask.getExecutorName());
retryCallbackDTO.setUniqueId(retryTask.getUniqueId());
// 设置header
HttpHeaders requestHeaders = new HttpHeaders();
EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
easyRetryHeaders.setEasyRetry(Boolean.TRUE);
easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));
HttpEntity<RetryCallbackDTO> requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
Result result = restTemplate.postForObject(format, requestEntity, Result.class);
LogUtils.info(log, "回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result));
} finally {
getContext().stop(getSelf());
}
}).build();
}
}

View File

@ -0,0 +1,151 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.exec;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.Callable;
/**
* 重试结果执行器
*
* @author www.byteblogs.com
* @date 2021-10-30
* @since 1.5.0
*/
@Component(ExecCallbackUnitActor.BEAN_NAME)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ExecCallbackUnitActor extends AbstractActor {
public static final String BEAN_NAME = "ExecCallbackUnitActor";
public static final String URL = "http://{0}:{1}/{2}/retry/callback/v1";
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private RestTemplate restTemplate;
@Override
public Receive createReceive() {
return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {
CallbackRetryContext context = (CallbackRetryContext) retryExecutor.getRetryContext();
RetryTask retryTask = context.getRetryTask();
ServerNode serverNode = context.getServerNode();
RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
retryTaskLog.setErrorMessage(StringUtils.EMPTY);
try {
if (Objects.nonNull(serverNode)) {
retryExecutor.call((Callable<Result<Void>>) () -> callClient(retryTask, retryTaskLog, serverNode));
if (context.hasException()) {
retryTaskLog.setErrorMessage(context.getException().getMessage());
}
} else {
retryTaskLog.setErrorMessage("暂无可用的客户端POD");
}
}catch (Exception e) {
LogUtils.error(log, "回调客户端失败 retryTask:[{}]", JsonUtil.toJsonString(retryTask), e);
retryTaskLog.setErrorMessage(StringUtils.isBlank(e.getMessage()) ? StringUtils.EMPTY : e.getMessage());
} finally {
// 清除幂等标识位
idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId().intValue());
getContext().stop(getSelf());
// 记录重试日志
retryTaskLog.setCreateDt(LocalDateTime.now());
retryTaskLog.setId(null);
Assert.isTrue(1 == retryTaskLogMapper.insert(retryTaskLog),
() -> new EasyRetryServerException("新增重试日志失败"));
}
}).build();
}
/**
* 调用客户端
*
* @param retryTask {@link RetryTask} 需要重试的数据
* @return 重试结果返回值
*/
private Result<Void> callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, ServerNode serverNode) {
// 回调参数
RetryCallbackDTO retryCallbackDTO = new RetryCallbackDTO();
retryCallbackDTO.setIdempotentId(retryTask.getIdempotentId());
retryCallbackDTO.setRetryStatus(retryTask.getRetryStatus());
retryCallbackDTO.setArgsStr(retryTask.getArgsStr());
retryCallbackDTO.setScene(retryTask.getSceneName());
retryCallbackDTO.setGroup(retryTask.getGroupName());
retryCallbackDTO.setExecutorName(retryTask.getExecutorName());
retryCallbackDTO.setUniqueId(retryTask.getUniqueId());
// 设置header
HttpHeaders requestHeaders = new HttpHeaders();
EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
easyRetryHeaders.setEasyRetry(Boolean.TRUE);
easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));
HttpEntity<RetryCallbackDTO> requestEntity = new HttpEntity<>(retryCallbackDTO, requestHeaders);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
Result result = restTemplate.postForObject(format, requestEntity, Result.class);
LogUtils.info(log, "回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result));
if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) {
retryTaskLog.setErrorMessage(result.getMessage());
} else {
DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class);
result.setData(data);
if (Objects.nonNull(data) && StringUtils.isNotBlank(data.getExceptionMsg())) {
retryTaskLog.setErrorMessage(data.getExceptionMsg());
}
}
LogUtils.info(log, "请求客户端 response:[{}}] ", JsonUtil.toJsonString(result));
return result;
}
}

View File

@ -125,7 +125,7 @@ public class ExecUnitActor extends AbstractActor {
HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);
String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
Result result = restTemplate.postForObject(format, requestEntity, Result.class);
Result<DispatchRetryResultDTO> result = restTemplate.postForObject(format, requestEntity, Result.class);
if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) {
retryTaskLog.setErrorMessage(result.getMessage());

View File

@ -3,6 +3,9 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
@ -21,6 +24,9 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -45,13 +51,15 @@ public class FailureActor extends AbstractActor {
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccess;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Autowired
private TransactionTemplate transactionTemplate;
@Override
public Receive createReceive() {
@ -62,19 +70,28 @@ public class FailureActor extends AbstractActor {
SceneConfig sceneConfig =
configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
ActorRef actorRef = null;
if (sceneConfig.getMaxRetryCount() <= retryTask.getRetryCount()) {
retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus());
actorRef = ActorGenerator.callbackRetryResultActor();
}
try {
retryTaskAccess.updateRetryTask(retryTask);
// 重试成功回调客户端
if (Objects.nonNull(actorRef)) {
actorRef.tell(retryTask, actorRef);
}
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
Integer maxRetryCount;
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
maxRetryCount = SystemConstants.CALL_BACK.MAX_RETRY_COUNT;
} else {
maxRetryCount = sceneConfig.getMaxRetryCount();
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}
if (maxRetryCount <= retryTask.getRetryCount()) {
retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus());
}
retryTaskAccess.updateRetryTask(retryTask);
}
});
} catch (Exception e) {
LogUtils.error(log,"更新重试任务失败", e);
} finally {

View File

@ -3,6 +3,10 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
@ -14,14 +18,23 @@ import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 重试完成执行器
@ -42,9 +55,12 @@ public class FinishActor extends AbstractActor {
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccess;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private CallbackRetryTaskHandler callbackRetryTaskHandler;
@Autowired
private TransactionTemplate transactionTemplate;
@Override
public Receive createReceive() {
@ -54,11 +70,18 @@ public class FinishActor extends AbstractActor {
retryTask.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
try {
retryTaskAccess.updateRetryTask(retryTask);
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
retryTaskAccess.updateRetryTask(retryTask);
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}
}
});
// 重试成功回调客户端
ActorRef actorRef = ActorGenerator.callbackRetryResultActor();
actorRef.tell(retryTask, actorRef);
}catch (Exception e) {
LogUtils.error(log, "更新重试任务失败", e);
} finally {

View File

@ -0,0 +1,162 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.dispatch.DispatchService;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2021-10-30
* @since 1.5.0
*/
@Component(ScanCallbackGroupActor.BEAN_NAME)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanCallbackGroupActor extends AbstractActor {
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccessProcessor;
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private SystemProperties systemProperties;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
public static final String BEAN_NAME = "ScanCallbackGroupActor";
@Override
public Receive createReceive() {
return receiveBuilder().match(GroupConfig.class, config -> {
try {
doScan(config);
} catch (Exception e) {
LogUtils.error(log, "数据扫描器处理异常 [{}]", config, e);
}
}).build();
}
/**
* 扫描数据
*
* @param groupConfig
*/
private void doScan(GroupConfig groupConfig) {
LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
String groupName = groupConfig.getGroupName();
LocalDateTime lastAt = DispatchService.LAST_AT_CALL_BACK_MAP.getOrDefault(groupName, defLastAt);
// 扫描当前Group 待重试的数据
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(),
TaskTypeEnum.CALLBACK.getType());
if (!CollectionUtils.isEmpty(list)) {
DispatchService.LAST_AT_MAP.put(groupConfig.getGroupName(), list.get(list.size() - 1).getCreateDt());
for (RetryTask retryTask : list) {
retryCountIncrement(retryTask);
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
RetryExecutor<Result> executor = RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())
.withWaitStrategy(getWaitWaitStrategy())
.withFilterStrategy(FilterStrategies.delayLevelFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
if (!executor.filter()) {
continue;
}
productExecUnitActor(executor);
}
} else {
// 数据为空则休眠5s
try {
Thread.sleep((DispatchService.PERIOD / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
DispatchService.LAST_AT_MAP.put(groupName, defLastAt);
}
}
private WaitStrategy getWaitWaitStrategy() {
// 回调失败每15min重试一次
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
}
private void retryCountIncrement(RetryTask retryTask) {
Integer retryCount = retryTask.getRetryCount();
retryTask.setRetryCount(++retryCount);
}
private void productExecUnitActor(RetryExecutor<Result> retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
// 重试成功回调客户端
ActorRef actorRef = ActorGenerator.callbackRetryResultActor();
actorRef.tell(retryExecutor, actorRef);
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.support.strategy.StopStrategies;
@ -90,7 +91,7 @@ public class ScanGroupActor extends AbstractActor {
LocalDateTime lastAt = DispatchService.LAST_AT_MAP.getOrDefault(groupName, defLastAt);
// 扫描当前Group 待重试的数据
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize());
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), TaskTypeEnum.RETRY.getType());
if (!CollectionUtils.isEmpty(list)) {
@ -108,7 +109,7 @@ public class ScanGroupActor extends AbstractActor {
RetryExecutor<Result<DispatchRetryResultDTO>> executor = RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getWaitWaitStrategy(groupName, retryTask.getSceneName()))
.withFilterStrategy(FilterStrategies.delayLevelFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))

View File

@ -0,0 +1,50 @@
package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
* @date 2023-06-04
* @since 1.5.0
*/
@Component
public class CallbackRetryTaskHandler {
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccess;
@Transactional
public void create(RetryTask retryTask) {
RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask);
callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType());
retryTask.setUniqueId(SystemConstants.CALL_BACK.CB_ + retryTask.getUniqueId());
retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
retryTask.setRetryCount(0);
retryTask.setCreateDt(LocalDateTime.now());
retryTask.setUpdateDt(LocalDateTime.now());
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
Assert.isTrue(1 == retryTaskAccess.saveRetryTask(callbackRetryTask), () -> new EasyRetryServerException("failed to report data"));
}
}

View File

@ -85,9 +85,7 @@ public class FilterStrategies {
@Override
public boolean filter(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
LocalDateTime nextTriggerAt = context.getRetryTask().getNextTriggerAt();
LocalDateTime nextTriggerAt = retryContext.getRetryTask().getNextTriggerAt();
return nextTriggerAt.isBefore(LocalDateTime.now());
}
@ -131,8 +129,7 @@ public class FilterStrategies {
@Override
public boolean filter(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
return !context.getSceneBlacklist().contains(retryContext.getRetryTask().getSceneName());
return !retryContext.getSceneBlacklist().contains(retryContext.getRetryTask().getSceneName());
}
@Override
@ -148,8 +145,7 @@ public class FilterStrategies {
@Override
public boolean filter(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
ServerNode serverNode = context.getServerNode();
ServerNode serverNode = retryContext.getServerNode();
if (Objects.isNull(serverNode)) {
return false;
@ -173,8 +169,7 @@ public class FilterStrategies {
@Override
public boolean filter(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
ServerNode serverNode = context.getServerNode();
ServerNode serverNode = retryContext.getServerNode();
RateLimiter rateLimiter = CacheGroupRateLimiter.getRateLimiterByKey(serverNode.getHostId());
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {

View File

@ -31,14 +31,23 @@ public class StopStrategies {
}
/**
* 根据客户端返回结果判断是否终止重试
* 根据客户端返回状态判断是否终止重试
*
* @return {@link ResultStatusStopStrategy} 重试结果停止策略
* @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略
*/
public static StopStrategy stopResultStatus() {
return new ResultStatusStopStrategy();
}
/**
* 根据客户端返回结果对象的状态码判断是否终止重试
*
* @return {@link ResultStatusCodeStopStrategy} 重试结果停止策略
*/
public static StopStrategy stopResultStatusCode() {
return new ResultStatusCodeStopStrategy();
}
/**
* 调用客户端发生异常触发停止策略
*/
@ -61,20 +70,47 @@ public class StopStrategies {
}
/**
* 根据客户端返回结果集判断是否应该停止
*
* 根据客户端返回的状态码判断是否应该停止
* <p>
* 1{@link Result#getStatus()} 不为1 则继续重试
* 2根据{@link Result#getData()}中的statusCode判断是否停止
*/
private static final class ResultStatusStopStrategy implements StopStrategy {
@Override
public boolean shouldStop(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> context =
(MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>>) retryContext;
Result response = (Result) retryContext.getCallResult();
Result<DispatchRetryResultDTO> response = context.getCallResult();
if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) {
return Boolean.FALSE;
}
return Boolean.TRUE;
}
@Override
public boolean supports(RetryContext retryContext) {
return true;
}
@Override
public int order() {
return 2;
}
}
/**
* 根据客户端返回结果集判断是否应该停止
* <p>
* 1{@link Result#getStatus()} 不为1 则继续重试
* 2根据{@link Result#getData()}中的statusCode判断是否停止
*/
private static final class ResultStatusCodeStopStrategy implements StopStrategy {
@Override
public boolean shouldStop(RetryContext retryContext) {
Result<DispatchRetryResultDTO> response = (Result<DispatchRetryResultDTO>) retryContext.getCallResult();
if (Objects.isNull(response) || StatusEnum.YES.getStatus() != response.getStatus()) {
return Boolean.FALSE;
@ -97,7 +133,7 @@ public class StopStrategies {
@Override
public int order() {
return 2;
return 3;
}
}

View File

@ -1,5 +1,7 @@
package com.aizuda.easy.retry.server.support.strategy;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
@ -145,8 +147,7 @@ public class WaitStrategies {
private static final class DelayLevelWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
public LocalDateTime computeRetryTime(RetryContext context) {
RetryTask retryTask = context.getRetryTask();
DelayLevelEnum levelEnum = DelayLevelEnum.getDelayLevelByLevel(retryTask.getRetryCount());
return retryTask.getNextTriggerAt().plus(levelEnum.getTime(), levelEnum.getUnit());
@ -160,13 +161,20 @@ public class WaitStrategies {
@Override
public LocalDateTime computeRetryTime(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
RetryTask retryTask = context.getRetryTask();
ConfigAccess configAccess = SpringContext.CONTEXT.getBean("configAccessProcessor", ConfigAccess.class);
RetryTask retryTask = retryContext.getRetryTask();
int triggerInterval;
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
// 回调失败的默认15分钟执行一次重试
triggerInterval = SystemConstants.CALL_BACK.TRIGGER_INTERVAL;
} else {
ConfigAccess configAccess = SpringContext.CONTEXT.getBean("configAccessProcessor", ConfigAccess.class);
SceneConfig sceneConfig =
configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
triggerInterval = Integer.parseInt(sceneConfig.getTriggerInterval());
}
SceneConfig sceneConfig =
configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
return retryTask.getNextTriggerAt().plusSeconds(Integer.parseInt(sceneConfig.getTriggerInterval()));
return retryTask.getNextTriggerAt().plusSeconds(triggerInterval);
}
}
@ -176,8 +184,7 @@ public class WaitStrategies {
private static final class CronWaitStrategy implements WaitStrategy {
@Override
public LocalDateTime computeRetryTime(RetryContext retryContext) {
MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryContext;
public LocalDateTime computeRetryTime(RetryContext context) {
RetryTask retryTask = context.getRetryTask();
ConfigAccess configAccess = SpringContext.CONTEXT.getBean(ConfigAccess.class);
@ -185,7 +192,7 @@ public class WaitStrategies {
SceneConfig sceneConfig =
configAccess.getSceneConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName());
Date nextValidTime = null;
Date nextValidTime;
try {
ZonedDateTime zdt = retryTask.getNextTriggerAt().atZone(ZoneOffset.ofHours(8));
nextValidTime = new CronExpression(sceneConfig.getTriggerInterval()).getNextValidTimeAfter(Date.from(zdt.toInstant()));

View File

@ -11,10 +11,11 @@
<result column="executor_name" jdbcType="VARCHAR" property="executorName" />
<result column="args_str" jdbcType="VARCHAR" property="argsStr" />
<result column="ext_attrs" jdbcType="VARCHAR" property="extAttrs" />
<result column="task_type" jdbcType="TINYINT" property="taskType"/>
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt" />
</resultMap>
<sql id="Base_Column_List">
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, create_dt, task_type
</sql>
<insert id="insertBatch">
insert into retry_dead_letter_${partition} (id, unique_id, group_name, scene_name,

View File

@ -12,12 +12,13 @@
<result column="args_str" jdbcType="VARCHAR" property="argsStr"/>
<result column="ext_attrs" jdbcType="VARCHAR" property="extAttrs"/>
<result column="retry_status" jdbcType="TINYINT" property="retryStatus"/>
<result column="task_type" jdbcType="TINYINT" property="taskType"/>
<result column="error_message" jdbcType="VARCHAR" property="errorMessage"/>
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt"/>
</resultMap>
<sql id="Base_Column_List">
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, retry_status, error_message,
create_dt
create_dt, task_type
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
select

View File

@ -14,12 +14,13 @@
<result column="next_trigger_at" jdbcType="TIMESTAMP" property="nextTriggerAt" />
<result column="retry_count" jdbcType="TINYINT" property="retryCount" />
<result column="retry_status" jdbcType="TINYINT" property="retryStatus" />
<result column="task_type" jdbcType="TINYINT" property="taskType"/>
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt" />
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt" />
</resultMap>
<sql id="Base_Column_List">
id, unique_id, group_name, scene_name, idempotent_id, biz_no, executor_name, args_str, ext_attrs, next_trigger_at, retry_count, retry_status,
create_dt, update_dt
create_dt, update_dt, task_type
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
select