feat: 1.5.0

1. 完成回调通知持久化改造
This commit is contained in:
byteblogs168 2023-06-05 18:45:32 +08:00
parent 2b5803c65a
commit 78abf3fa1e
20 changed files with 346 additions and 316 deletions

View File

@ -13,6 +13,7 @@ public class SimpleRetryCompleteCallback implements RetryCompleteCallback {
@Override
public void doSuccessCallback(String sceneName, String executorName, Object[] params) {
}
@Override

View File

@ -1,18 +0,0 @@
package com.aizuda.easy.retry.common.core.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum TaskTypeEnum {
RETRY(1),
CALLBACK(2);
private final Integer type;
}

View File

@ -7,6 +7,8 @@ import com.aizuda.easy.retry.server.support.dispatch.actor.exec.ExecUnitActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.result.FailureActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.result.FinishActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.result.NoRetryActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.scan.AbstractScanGroup;
import com.aizuda.easy.retry.server.support.dispatch.actor.scan.ScanCallbackGroupActor;
import com.aizuda.easy.retry.server.support.dispatch.actor.scan.ScanGroupActor;
import com.aizuda.easy.retry.common.core.context.SpringContext;
@ -52,7 +54,7 @@ public class ActorGenerator {
*
* @return actor 引用
*/
public static ActorRef callbackRetryResultActor() {
public static ActorRef execCallbackUnitActor() {
return getDispatchResultActorSystem().actorOf(getSpringExtension().props(ExecCallbackUnitActor.BEAN_NAME));
}
@ -74,6 +76,15 @@ public class ActorGenerator {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(ScanGroupActor.BEAN_NAME));
}
/**
* 生成扫描重试数据的actor
*
* @return actor 引用
*/
public static ActorRef scanCallbackGroupActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(ScanCallbackGroupActor.BEAN_NAME));
}
public static SpringExtension getSpringExtension() {
return SpringContext.getBeanByType(SpringExtension.class);
}

View File

@ -0,0 +1,21 @@
package com.aizuda.easy.retry.server.enums;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum TaskTypeEnum {
RETRY(1, ActorGenerator.scanGroupActor()),
CALLBACK(2, ActorGenerator.scanCallbackGroupActor());
private final Integer type;
private final ActorRef actorRef;
}

View File

@ -37,7 +37,7 @@ public class MybatisRetryTaskAccess extends AbstractRetryTaskAccess {
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName)
.eq(RetryTask::getTaskType, taskType)
.ge(RetryTask::getCreateDt, lastAt)
.gt(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getCreateDt)).getRecords();
}

View File

@ -46,9 +46,13 @@ public interface RetryContext<V> {
*/
void setWaitStrategy(WaitStrategy waitStrategy);
WaitStrategy getWaitStrategy();
ServerNode getServerNode();
Set<String> getSceneBlacklist();
V getCallResult();
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.support.cache;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@ -28,8 +29,17 @@ public class CacheGroupScanActor implements Lifecycle {
*
* @return 缓存对象
*/
public static Cache<String, ActorRef> getAll() {
return CACHE;
public static ActorRef get(String groupName, TaskTypeEnum typeEnum) {
return CACHE.getIfPresent(groupName.concat(typeEnum.name()));
}
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) {
CACHE.put(groupName.concat(typeEnum.name()), actorRef);
}
@Override
@ -44,5 +54,6 @@ public class CacheGroupScanActor implements Lifecycle {
@Override
public void close() {
LogUtils.info(log, "CacheGroupScanActor stop");
CACHE.invalidateAll();
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.support.dispatch;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.SystemProperties;
@ -23,11 +24,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -50,14 +48,6 @@ public class DispatchService implements Lifecycle {
private final ScheduledExecutorService dispatchService = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService"));
/**
* 缓存待拉取数据的起点时间
* <p>
* MAX_ID_MAP[key] = group idHash MAX_ID_MAP[value] = retry_task的 create_at时间
*/
public static final Map<String, LocalDateTime> LAST_AT_MAP = new HashMap<>();
public static final Map<String, LocalDateTime> LAST_AT_CALL_BACK_MAP = new HashMap<>();
/**
* 调度时长
*/
@ -87,7 +77,9 @@ public class DispatchService implements Lifecycle {
List<GroupConfig> currentHostGroupList = getCurrentHostGroupList();
if (!CollectionUtils.isEmpty(currentHostGroupList)) {
for (GroupConfig groupConfigContext : currentHostGroupList) {
produceScanActorTask(groupConfigContext);
ScanTaskDTO scanTaskDTO = new ScanTaskDTO();
scanTaskDTO.setGroupName(groupConfigContext.getGroupName());
produceScanActorTask(scanTaskDTO);
}
}
@ -102,19 +94,23 @@ public class DispatchService implements Lifecycle {
/**
* 扫描任务生成器
*
* @param groupConfig {@link GroupConfig} 组上下文
* @param scanTaskDTO {@link GroupConfig} 组上下文
*/
private void produceScanActorTask(GroupConfig groupConfig) {
private void produceScanActorTask(ScanTaskDTO scanTaskDTO) {
String groupName = groupConfig.getGroupName();
ActorRef scanActorRef = cacheActorRef(groupName);
String groupName = scanTaskDTO.getGroupName();
// 缓存按照
cacheRateLimiter(groupName);
// rebalance group scan 流程合一
scanActorRef.tell(groupConfig, scanActorRef);
// 扫描重试数据
ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY);
scanRetryActorRef.tell(scanTaskDTO, scanRetryActorRef);
// 扫描回调数据
ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK);
scanCallbackActorRef.tell(scanTaskDTO, scanCallbackActorRef);
}
/**
@ -136,13 +132,12 @@ public class DispatchService implements Lifecycle {
/**
* 缓存Actor对象
*/
private ActorRef cacheActorRef(String groupName) {
Cache<String, ActorRef> actorRefCache = CacheGroupScanActor.getAll();
ActorRef scanActorRef = actorRefCache.getIfPresent(groupName);
private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) {
ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum);
if (Objects.isNull(scanActorRef)) {
scanActorRef = ActorGenerator.scanGroupActor();
scanActorRef = typeEnum.getActorRef();
// 缓存扫描器actor
actorRefCache.put(groupName, scanActorRef);
CacheGroupScanActor.put(groupName, typeEnum, scanActorRef);
}
return scanActorRef;
}

View File

@ -0,0 +1,13 @@
package com.aizuda.easy.retry.server.support.dispatch;
import lombok.Data;
/**
* @author: ww.byteblogs.com
* @date : 2023-06-05 16:30
*/
@Data
public class ScanTaskDTO {
String groupName;
}

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
@ -78,7 +79,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
retryTaskLog.setErrorMessage(context.getException().getMessage());
}
} else {
retryTaskLog.setErrorMessage("暂无可用的客户端POD");
retryTaskLog.setErrorMessage("There are currently no available client PODs.");
}
}catch (Exception e) {
@ -131,7 +132,7 @@ public class ExecCallbackUnitActor extends AbstractActor {
Result result = restTemplate.postForObject(format, requestEntity, Result.class);
LogUtils.info(log, "回调请求客户端 response:[{}}] ", JsonUtil.toJsonString(result));
if (1 != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) {
if (StatusEnum.YES.getStatus() != result.getStatus() && StringUtils.isNotBlank(result.getMessage())) {
retryTaskLog.setErrorMessage(result.getMessage());
} else {
DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class);

View File

@ -1,16 +1,14 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
@ -30,7 +28,6 @@ import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
/**
* 重试完成执行器
@ -81,12 +78,12 @@ public class FailureActor extends AbstractActor {
maxRetryCount = SystemConstants.CALL_BACK.MAX_RETRY_COUNT;
} else {
maxRetryCount = sceneConfig.getMaxRetryCount();
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}
if (maxRetryCount <= retryTask.getRetryCount()) {
retryTask.setRetryStatus(RetryStatusEnum.MAX_RETRY_COUNT.getStatus());
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}
retryTaskAccess.updateRetryTask(retryTask);

View File

@ -1,40 +1,30 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.service.convert.RetryTaskConverter;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 重试完成执行器
@ -75,7 +65,7 @@ public class FinishActor extends AbstractActor {
protected void doInTransactionWithoutResult(TransactionStatus status) {
retryTaskAccess.updateRetryTask(retryTask);
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
if (TaskTypeEnum.RETRY.getType().equals(retryTask.getTaskType())) {
// 创建一个回调任务
callbackRetryTaskHandler.create(retryTask);
}

View File

@ -4,8 +4,8 @@ import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -19,8 +19,9 @@ import org.springframework.stereotype.Component;
*
* @author: www.byteblogs.com
* @date : 2022-04-14 16:11
* @since 1.0.0
*/
@Component("NoRetryActor")
@Component(NoRetryActor.BEAN_NAME)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class NoRetryActor extends AbstractActor {
@ -35,7 +36,7 @@ public class NoRetryActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {
MaxAttemptsPersistenceRetryContext retryContext = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
RetryContext retryContext = retryExecutor.getRetryContext();
RetryTask retryTask = retryContext.getRetryTask();
WaitStrategy waitStrategy = retryContext.getWaitStrategy();
retryTask.setNextTriggerAt(waitStrategy.computeRetryTime(retryContext));

View File

@ -0,0 +1,132 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.dispatch.DispatchService;
import com.aizuda.easy.retry.server.support.dispatch.ScanTaskDTO;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
/**
* 数据扫描模板类
*
* @author: www.byteblogs.com
* @date : 2023-06-05 15:44
* @since 1.5.0
*/
@Slf4j
public abstract class AbstractScanGroup extends AbstractActor {
@Autowired
@Qualifier("retryTaskAccessProcessor")
protected RetryTaskAccess<RetryTask> retryTaskAccessProcessor;
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
protected IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
protected SystemProperties systemProperties;
@Autowired
@Qualifier("configAccessProcessor")
protected ConfigAccess configAccess;
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTaskDTO.class, config -> {
try {
doScan(config);
} catch (Exception e) {
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
}
}).build();
}
protected void doScan(final ScanTaskDTO scanTaskDTO) {
LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
String groupName = scanTaskDTO.getGroupName();
LocalDateTime lastAt = Optional.ofNullable(getLastAt(groupName)).orElse(defLastAt);
// 扫描当前Group 待重试的数据
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(),
getTaskType());
if (!CollectionUtils.isEmpty(list)) {
// 更新拉取的最大的创建时间
putLastAt(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getCreateDt());
for (RetryTask retryTask : list) {
// 重试次数累加
retryCountIncrement(retryTask);
RetryContext retryContext = builderRetryContext(groupName, retryTask);
RetryExecutor executor = builderResultRetryExecutor(retryContext);
if (!executor.filter()) {
continue;
}
productExecUnitActor(executor);
}
} else {
// 数据为空则休眠5s
try {
Thread.sleep((DispatchService.PERIOD / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
putLastAt(groupName, defLastAt);
}
}
protected abstract RetryContext builderRetryContext(String groupName, RetryTask retryTask);
protected abstract RetryExecutor builderResultRetryExecutor(RetryContext retryContext);
protected abstract Integer getTaskType();
protected abstract LocalDateTime getLastAt(String groupName);
protected abstract LocalDateTime putLastAt(String groupName, LocalDateTime LocalDateTime);
private void retryCountIncrement(RetryTask retryTask) {
Integer retryCount = retryTask.getRetryCount();
retryTask.setRetryCount(++retryCount);
}
private void productExecUnitActor(RetryExecutor retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
// 重试成功回调客户端
ActorRef actorRef = getActorRef();
actorRef.tell(retryExecutor, actorRef);
}
protected abstract ActorRef getActorRef();
}

View File

@ -1,38 +1,26 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.dispatch.DispatchService;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author www.byteblogs.com
@ -42,73 +30,30 @@ import java.util.List;
@Component(ScanCallbackGroupActor.BEAN_NAME)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanCallbackGroupActor extends AbstractActor {
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccessProcessor;
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private SystemProperties systemProperties;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
public class ScanCallbackGroupActor extends AbstractScanGroup {
public static final String BEAN_NAME = "ScanCallbackGroupActor";
@Override
public Receive createReceive() {
return receiveBuilder().match(GroupConfig.class, config -> {
try {
doScan(config);
} catch (Exception e) {
LogUtils.error(log, "数据扫描器处理异常 [{}]", config, e);
}
}).build();
}
/**
* 扫描数据
*
* @param groupConfig
* 缓存待拉取数据的起点时间
* <p>
* LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间
*/
private void doScan(GroupConfig groupConfig) {
public static final ConcurrentMap<String, LocalDateTime> LAST_AT_MAP = new ConcurrentHashMap<>();
LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
String groupName = groupConfig.getGroupName();
LocalDateTime lastAt = DispatchService.LAST_AT_CALL_BACK_MAP.getOrDefault(groupName, defLastAt);
// 扫描当前Group 待重试的数据
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(),
TaskTypeEnum.CALLBACK.getType());
if (!CollectionUtils.isEmpty(list)) {
DispatchService.LAST_AT_MAP.put(groupConfig.getGroupName(), list.get(list.size() - 1).getCreateDt());
for (RetryTask retryTask : list) {
retryCountIncrement(retryTask);
@Override
protected RetryContext builderRetryContext(final String groupName, final RetryTask retryTask) {
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
return retryContext;
}
RetryExecutor<Result> executor = RetryBuilder.<Result>newBuilder()
@Override
protected RetryExecutor builderResultRetryExecutor(RetryContext retryContext) {
return RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatus())
.withWaitStrategy(getWaitWaitStrategy())
@ -119,23 +64,21 @@ public class ScanCallbackGroupActor extends AbstractActor {
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
if (!executor.filter()) {
continue;
}
productExecUnitActor(executor);
}
} else {
// 数据为空则休眠5s
try {
Thread.sleep((DispatchService.PERIOD / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@Override
protected Integer getTaskType() {
return TaskTypeEnum.CALLBACK.getType();
}
DispatchService.LAST_AT_MAP.put(groupName, defLastAt);
@Override
protected LocalDateTime getLastAt(final String groupName) {
return LAST_AT_MAP.get(groupName);
}
@Override
protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) {
return LAST_AT_MAP.put(groupName, LocalDateTime);
}
private WaitStrategy getWaitWaitStrategy() {
@ -143,20 +86,9 @@ public class ScanCallbackGroupActor extends AbstractActor {
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
}
private void retryCountIncrement(RetryTask retryTask) {
Integer retryCount = retryTask.getRetryCount();
retryTask.setRetryCount(++retryCount);
}
private void productExecUnitActor(RetryExecutor<Result> retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
// 重试成功回调客户端
ActorRef actorRef = ActorGenerator.callbackRetryResultActor();
actorRef.tell(retryExecutor, actorRef);
@Override
protected ActorRef getActorRef() {
return ActorGenerator.execCallbackUnitActor();
}
}

View File

@ -1,116 +1,65 @@
package com.aizuda.easy.retry.server.support.dispatch.actor.scan;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.support.strategy.StopStrategies;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.dispatch.DispatchService;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author www.byteblogs.com
* @date 2021-10-30
* @since 2.0
*/
@Component("ScanGroupActor")
@Component(ScanGroupActor.BEAN_NAME)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanGroupActor extends AbstractActor {
@Autowired
@Qualifier("retryTaskAccessProcessor")
private RetryTaskAccess<RetryTask> retryTaskAccessProcessor;
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
private IdempotentStrategy<String, Integer> idempotentStrategy;
@Autowired
private SystemProperties systemProperties;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
public class ScanGroupActor extends AbstractScanGroup {
public static final String BEAN_NAME = "ScanGroupActor";
/**
* 缓存待拉取数据的起点时间
* <p>
* LAST_AT_MAP[key] = groupName LAST_AT_MAP[value] = retry_task的 create_at时间
*/
public static final ConcurrentMap<String, LocalDateTime> LAST_AT_MAP = new ConcurrentHashMap<>();
@Override
public Receive createReceive() {
return receiveBuilder().match(GroupConfig.class, config -> {
try {
doScan(config);
} catch (Exception e) {
LogUtils.error(log, "数据扫描器处理异常 [{}]", config, e);
}
}).build();
}
/**
* 扫描数据
*
* @param groupConfig
*/
private void doScan(GroupConfig groupConfig) {
LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
String groupName = groupConfig.getGroupName();
LocalDateTime lastAt = DispatchService.LAST_AT_MAP.getOrDefault(groupName, defLastAt);
// 扫描当前Group 待重试的数据
List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(), TaskTypeEnum.RETRY.getType());
if (!CollectionUtils.isEmpty(list)) {
DispatchService.LAST_AT_MAP.put(groupConfig.getGroupName(), list.get(list.size() - 1).getCreateDt());
for (RetryTask retryTask : list) {
retryCountIncrement(retryTask);
protected RetryContext<Result<DispatchRetryResultDTO>> builderRetryContext(final String groupName,
final RetryTask retryTask) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(configAccess.getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
return retryContext;
}
RetryExecutor<Result<DispatchRetryResultDTO>> executor = RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
@Override
protected RetryExecutor<Result<DispatchRetryResultDTO>> builderResultRetryExecutor(RetryContext retryContext) {
RetryTask retryTask = retryContext.getRetryTask();
return RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getWaitWaitStrategy(groupName, retryTask.getSceneName()))
.withWaitStrategy(getWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
.withFilterStrategy(FilterStrategies.delayLevelFilter())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.sceneBlackFilter())
@ -118,23 +67,21 @@ public class ScanGroupActor extends AbstractActor {
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
if (!executor.filter()) {
continue;
}
productExecUnitActor(executor);
}
} else {
// 数据为空则休眠5s
try {
Thread.sleep((DispatchService.PERIOD / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@Override
protected Integer getTaskType() {
return TaskTypeEnum.RETRY.getType();
}
DispatchService.LAST_AT_MAP.put(groupName, defLastAt);
@Override
protected LocalDateTime getLastAt(final String groupName) {
return LAST_AT_MAP.get(groupName);
}
@Override
protected LocalDateTime putLastAt(final String groupName, final LocalDateTime LocalDateTime) {
return LAST_AT_MAP.put(groupName, LocalDateTime);
}
private WaitStrategy getWaitWaitStrategy(String groupName, String sceneName) {
@ -145,20 +92,10 @@ public class ScanGroupActor extends AbstractActor {
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
}
private void retryCountIncrement(RetryTask retryTask) {
Integer retryCount = retryTask.getRetryCount();
retryTask.setRetryCount(++retryCount);
@Override
protected ActorRef getActorRef() {
return ActorGenerator.execUnitActor();
}
private void productExecUnitActor(RetryExecutor<Result<DispatchRetryResultDTO>> retryExecutor) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
ActorRef execUnitActor = ActorGenerator.execUnitActor();
// 将扫描到的数据tell 到执行单元中
execUnitActor.tell(retryExecutor, execUnitActor);
}
}

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.support.handler;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.support.RetryTaskAccess;
@ -35,13 +35,14 @@ public class CallbackRetryTaskHandler {
RetryTask callbackRetryTask = RetryTaskConverter.INSTANCE.toRetryTask(retryTask);
callbackRetryTask.setTaskType(TaskTypeEnum.CALLBACK.getType());
retryTask.setUniqueId(SystemConstants.CALL_BACK.CB_ + retryTask.getUniqueId());
retryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
retryTask.setRetryCount(0);
retryTask.setCreateDt(LocalDateTime.now());
retryTask.setUpdateDt(LocalDateTime.now());
callbackRetryTask.setId(null);
callbackRetryTask.setUniqueId(SystemConstants.CALL_BACK.CB_ + retryTask.getUniqueId());
callbackRetryTask.setRetryStatus(RetryStatusEnum.RUNNING.getStatus());
callbackRetryTask.setRetryCount(0);
callbackRetryTask.setCreateDt(LocalDateTime.now());
callbackRetryTask.setUpdateDt(LocalDateTime.now());
retryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
callbackRetryTask.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null));
Assert.isTrue(1 == retryTaskAccess.saveRetryTask(callbackRetryTask), () -> new EasyRetryServerException("failed to report data"));

View File

@ -8,6 +8,8 @@ import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* BitSet幂等校验器
@ -21,7 +23,7 @@ public class BitSetIdempotentStrategyHandler implements IdempotentStrategy<Strin
* BIT_SET_MAP[key] : group
* BIT_SET_MAP[value] : BitSet
*/
public static final Map<String, BitSet> BIT_SET_MAP = new HashMap<>();
public static final ConcurrentMap<String, BitSet> BIT_SET_MAP = new ConcurrentHashMap<>();
@Override
public boolean set(String groupId, Integer key) {

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.support.strategy;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.SceneConfig;
@ -12,7 +12,6 @@ import com.aizuda.easy.retry.common.core.util.CronExpression;
import com.google.common.base.Preconditions;
import com.aizuda.easy.retry.server.support.RetryContext;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import lombok.Data;
import lombok.Getter;