feat: 2.4.0
1. 调整代码结构
This commit is contained in:
parent
76ed394e67
commit
4db063cc17
@ -29,6 +29,8 @@ public class SceneConfig implements Serializable {
|
|||||||
|
|
||||||
private Long deadlineRequest;
|
private Long deadlineRequest;
|
||||||
|
|
||||||
|
private Integer bucketIndex;
|
||||||
|
|
||||||
private LocalDateTime createDt;
|
private LocalDateTime createDt;
|
||||||
|
|
||||||
private LocalDateTime updateDt;
|
private LocalDateTime updateDt;
|
||||||
|
@ -15,6 +15,7 @@ public class ActorGenerator {
|
|||||||
|
|
||||||
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
|
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
|
||||||
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
|
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
|
||||||
|
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
|
||||||
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
|
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
|
||||||
public static final String FINISH_ACTOR = "FinishActor";
|
public static final String FINISH_ACTOR = "FinishActor";
|
||||||
public static final String FAILURE_ACTOR = "FailureActor";
|
public static final String FAILURE_ACTOR = "FailureActor";
|
||||||
@ -82,7 +83,7 @@ public class ActorGenerator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 生成扫描重试数据的actor
|
* 生成扫描回调数据的actor
|
||||||
*
|
*
|
||||||
* @return actor 引用
|
* @return actor 引用
|
||||||
*/
|
*/
|
||||||
@ -90,6 +91,15 @@ public class ActorGenerator {
|
|||||||
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR));
|
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 生成扫描JOB任务的actor
|
||||||
|
*
|
||||||
|
* @return actor 引用
|
||||||
|
*/
|
||||||
|
public static ActorRef scanJobActor() {
|
||||||
|
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_JOB_ACTOR));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 生成扫描重试数据的actor
|
* 生成扫描重试数据的actor
|
||||||
*
|
*
|
||||||
|
@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Deprecated
|
|
||||||
public class CacheConsumerGroup implements Lifecycle {
|
public class CacheConsumerGroup implements Lifecycle {
|
||||||
|
|
||||||
private static Cache<String /*groupName*/, String/*groupName*/> CACHE;
|
private static Cache<String /*groupName*/, String/*groupName*/> CACHE;
|
||||||
@ -30,11 +29,29 @@ public class CacheConsumerGroup implements Lifecycle {
|
|||||||
*
|
*
|
||||||
* @return 缓存对象
|
* @return 缓存对象
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
public static Set<String> getAllConsumerGroupName() {
|
public static Set<String> getAllConsumerGroupName() {
|
||||||
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
|
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
|
||||||
return new HashSet<>(concurrentMap.values());
|
return new HashSet<>(concurrentMap.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 无缓存时添加
|
||||||
|
* 有缓存时更新
|
||||||
|
*
|
||||||
|
* @return 缓存对象
|
||||||
|
*/
|
||||||
|
public static synchronized void addOrUpdate(String groupName) {
|
||||||
|
LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
|
||||||
|
CACHE.put(groupName, groupName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void remove(String groupName) {
|
||||||
|
LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName);
|
||||||
|
CACHE.invalidate(groupName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void clear() {
|
||||||
|
CACHE.invalidateAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -0,0 +1,59 @@
|
|||||||
|
package com.aizuda.easy.retry.server.common.cache;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存组扫描Actor
|
||||||
|
*
|
||||||
|
* @author www.byteblogs.com
|
||||||
|
* @date 2021-10-30
|
||||||
|
* @since 1.0.0
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Data
|
||||||
|
@Slf4j
|
||||||
|
public class CacheGroupScanActor implements Lifecycle {
|
||||||
|
|
||||||
|
private static Cache<String, ActorRef> CACHE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有缓存
|
||||||
|
*
|
||||||
|
* @return 缓存对象
|
||||||
|
*/
|
||||||
|
public static ActorRef get(String groupName, TaskTypeEnum typeEnum) {
|
||||||
|
return CACHE.getIfPresent(groupName.concat(typeEnum.name()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有缓存
|
||||||
|
*
|
||||||
|
* @return 缓存对象
|
||||||
|
*/
|
||||||
|
public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) {
|
||||||
|
CACHE.put(groupName.concat(typeEnum.name()), actorRef);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
LogUtils.info(log, "CacheGroupScanActor start");
|
||||||
|
CACHE = CacheBuilder.newBuilder()
|
||||||
|
// 设置并发级别为cpu核心数
|
||||||
|
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
LogUtils.info(log, "CacheGroupScanActor stop");
|
||||||
|
CACHE.invalidateAll();
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,9 @@
|
|||||||
package com.aizuda.easy.retry.server.retry.task.support.dispatch;
|
package com.aizuda.easy.retry.server.common.dto;
|
||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扫描任务模型
|
* 扫描任务模型
|
||||||
*
|
*
|
||||||
@ -12,5 +14,7 @@ import lombok.Data;
|
|||||||
@Data
|
@Data
|
||||||
public class ScanTask {
|
public class ScanTask {
|
||||||
|
|
||||||
String groupName;
|
private String groupName;
|
||||||
|
|
||||||
|
private Set<Integer> buckets;
|
||||||
}
|
}
|
@ -16,7 +16,10 @@ import java.util.function.Supplier;
|
|||||||
@Getter
|
@Getter
|
||||||
public enum TaskTypeEnum {
|
public enum TaskTypeEnum {
|
||||||
RETRY(1, ActorGenerator::scanGroupActor),
|
RETRY(1, ActorGenerator::scanGroupActor),
|
||||||
CALLBACK(2, ActorGenerator::scanCallbackGroupActor);
|
CALLBACK(2, ActorGenerator::scanCallbackGroupActor),
|
||||||
|
JOB(3, ActorGenerator::scanJobActor),
|
||||||
|
;
|
||||||
|
|
||||||
|
|
||||||
private final Integer type;
|
private final Integer type;
|
||||||
private final Supplier<ActorRef> actorRef;
|
private final Supplier<ActorRef> actorRef;
|
||||||
|
@ -95,7 +95,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
|
|||||||
|
|
||||||
int bucketTotal = systemProperties.getBucketTotal();
|
int bucketTotal = systemProperties.getBucketTotal();
|
||||||
bucketList = new ArrayList<>(bucketTotal);
|
bucketList = new ArrayList<>(bucketTotal);
|
||||||
for (int i = 1; i <= bucketTotal; i++) {
|
for (int i = 0; i < bucketTotal; i++) {
|
||||||
bucketList.add(i);
|
bucketList.add(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +81,18 @@ public class ClientRegister extends AbstractRegister implements Runnable {
|
|||||||
refreshExpireAt(serverNode);
|
refreshExpireAt(serverNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 同步当前POD消费的组的节点信息
|
||||||
|
// netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息
|
||||||
|
Set<String> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
|
||||||
|
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
|
||||||
|
List<ServerNode> serverNodes = serverNodeMapper.selectList(
|
||||||
|
new LambdaQueryWrapper<ServerNode>().in(ServerNode::getGroupName, allConsumerGroupName));
|
||||||
|
for (final ServerNode node : serverNodes) {
|
||||||
|
// 刷新全量本地缓存
|
||||||
|
CacheRegisterTable.addOrUpdate(node.getGroupName(), node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}catch (InterruptedException e) {
|
}catch (InterruptedException e) {
|
||||||
LogUtils.info(log, "[{}] thread stop.", Thread.currentThread().getName());
|
LogUtils.info(log, "[{}] thread stop.", Thread.currentThread().getName());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -18,7 +18,45 @@
|
|||||||
<maven.compiler.target>8</maven.compiler.target>
|
<maven.compiler.target>8</maven.compiler.target>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
</properties>
|
</properties>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aizuda</groupId>
|
||||||
|
<artifactId>easy-retry-server-common</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aizuda</groupId>
|
||||||
|
<artifactId>easy-retry-common-server-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aizuda</groupId>
|
||||||
|
<artifactId>easy-retry-datasource-template</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aizuda</groupId>
|
||||||
|
<artifactId>easy-retry-common-client-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mapstruct</groupId>
|
||||||
|
<artifactId>mapstruct</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mapstruct</groupId>
|
||||||
|
<artifactId>mapstruct-processor</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.rholder</groupId>
|
||||||
|
<artifactId>guava-retrying</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
|
@ -0,0 +1,16 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* web访问模块
|
||||||
|
*
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-19 09:21
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
@ComponentScan("com.aizuda.easy.retry.server.job.task.*")
|
||||||
|
public class EasyRetryServerRetryJobAutoConfiguration {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,41 @@
|
|||||||
|
package com.aizuda.easy.retry.server.job.task.scan;
|
||||||
|
|
||||||
|
import akka.actor.AbstractActor;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
|
import org.springframework.context.annotation.Scope;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JOB任务扫描
|
||||||
|
*
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-22 09:08
|
||||||
|
* @since 2.4.0
|
||||||
|
*/
|
||||||
|
@Component(ActorGenerator.SCAN_JOB_ACTOR)
|
||||||
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
|
@Slf4j
|
||||||
|
public class ScanJobTaskActor extends AbstractActor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Receive createReceive() {
|
||||||
|
return receiveBuilder().match(ScanTask.class, config -> {
|
||||||
|
|
||||||
|
try {
|
||||||
|
doScan(config);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}).build();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doScan(final ScanTask scanTask) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,2 @@
|
|||||||
|
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||||
|
com.aizuda.easy.retry.server.job.task.config.EasyRetryServerRetryJobAutoConfiguration
|
@ -0,0 +1,18 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-22 17:09
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class CallbackTimerTask implements TimerTask {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(final Timeout timeout) throws Exception {
|
||||||
|
log.info("回调任务执行");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-22 17:09
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Slf4j
|
||||||
|
public class RetryTimerTask implements TimerTask {
|
||||||
|
|
||||||
|
private RetryExecutor executor;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(final Timeout timeout) throws Exception {
|
||||||
|
log.info("重试任务执行");
|
||||||
|
// RetryContext retryContext = executor.getRetryContext();
|
||||||
|
// RetryTask retryTask = retryContext.getRetryTask();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,45 @@
|
|||||||
|
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
|
||||||
|
|
||||||
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author: www.byteblogs.com
|
||||||
|
* @date : 2023-09-22 17:03
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class TimerWheelHandler implements Lifecycle {
|
||||||
|
|
||||||
|
private static HashedWheelTimer hashedWheelTimer = null;
|
||||||
|
|
||||||
|
public static ConcurrentHashMap<String, Timeout> taskConcurrentHashMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
hashedWheelTimer = new HashedWheelTimer(
|
||||||
|
new CustomizableThreadFactory("retry-task-timer-wheel"), 10, TimeUnit.MILLISECONDS, 16);
|
||||||
|
hashedWheelTimer.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
||||||
|
Timeout timeout = hashedWheelTimer.newTimeout(task, delay, unit);
|
||||||
|
taskConcurrentHashMap.put(groupName.concat("_").concat(uniqueId), timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean cancel(Long taskId) {
|
||||||
|
Timeout timeout = taskConcurrentHashMap.get(taskId);
|
||||||
|
return timeout.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
hashedWheelTimer.stop();
|
||||||
|
}
|
||||||
|
}
|
@ -9,11 +9,15 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
|||||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.CallbackTimerTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
||||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
@ -24,6 +28,8 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
|||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 重试完成执行器 1、更新重试任务 2、记录重试日志
|
* 重试完成执行器 1、更新重试任务 2、记录重试日志
|
||||||
@ -60,11 +66,14 @@ public class FailureActor extends AbstractActor {
|
|||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
|
|
||||||
|
TimerTask timerTask = null;
|
||||||
Integer maxRetryCount;
|
Integer maxRetryCount;
|
||||||
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
|
||||||
maxRetryCount = systemProperties.getCallback().getMaxCount();
|
maxRetryCount = systemProperties.getCallback().getMaxCount();
|
||||||
|
timerTask = new CallbackTimerTask();
|
||||||
} else {
|
} else {
|
||||||
maxRetryCount = sceneConfig.getMaxRetryCount();
|
maxRetryCount = sceneConfig.getMaxRetryCount();
|
||||||
|
timerTask = new RetryTimerTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (maxRetryCount <= retryTask.getRetryCount()) {
|
if (maxRetryCount <= retryTask.getRetryCount()) {
|
||||||
@ -73,6 +82,11 @@ public class FailureActor extends AbstractActor {
|
|||||||
callbackRetryTaskHandler.create(retryTask);
|
callbackRetryTaskHandler.create(retryTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO 计算延迟的时间 此处需要判断符合条件的才会进入时间轮
|
||||||
|
LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt();
|
||||||
|
long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
||||||
|
TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
retryTask.setUpdateDt(LocalDateTime.now());
|
retryTask.setUpdateDt(LocalDateTime.now());
|
||||||
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
|
||||||
.updateById(retryTask.getGroupName(), retryTask),
|
.updateById(retryTask.getGroupName(), retryTask),
|
||||||
|
@ -8,21 +8,29 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
|
|||||||
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
|
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.ScanTask;
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
|
||||||
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
|
||||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||||
|
import io.netty.util.HashedWheelTimer;
|
||||||
|
import io.netty.util.Timeout;
|
||||||
|
import io.netty.util.TimerTask;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 数据扫描模板类
|
* 数据扫描模板类
|
||||||
@ -44,6 +52,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
@Autowired
|
@Autowired
|
||||||
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return receiveBuilder().match(ScanTask.class, config -> {
|
return receiveBuilder().match(ScanTask.class, config -> {
|
||||||
@ -89,12 +98,15 @@ public abstract class AbstractScanGroup extends AbstractActor {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
productExecUnitActor(executor);
|
Timeout timeout = TimerWheelHandler.taskConcurrentHashMap.get(retryTask.getGroupName().concat("_").concat(retryTask.getUniqueId()));
|
||||||
|
if (Objects.isNull(timeout)) {
|
||||||
|
productExecUnitActor(executor);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 数据为空则休眠5s
|
// 数据为空则休眠5s
|
||||||
try {
|
try {
|
||||||
Thread.sleep((DispatchService.PERIOD / 2) * 1000);
|
Thread.sleep((10 / 2) * 1000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
@Component(ActorGenerator.SCAN_CALLBACK_GROUP_ACTOR)
|
@Component(ActorGenerator.SCAN_CALLBACK_GROUP_ACTOR)
|
||||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ScanCallbackGroupActor extends AbstractScanGroup {
|
public class ScanCallbackTaskActor extends AbstractScanGroup {
|
||||||
|
|
||||||
public static final String BEAN_NAME = "ScanCallbackGroupActor";
|
public static final String BEAN_NAME = "ScanCallbackTaskActor";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 缓存待拉取数据的起点id
|
* 缓存待拉取数据的起点id
|
@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
@Component(ActorGenerator.SCAN_RETRY_GROUP_ACTOR)
|
@Component(ActorGenerator.SCAN_RETRY_GROUP_ACTOR)
|
||||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ScanGroupActor extends AbstractScanGroup {
|
public class ScanRetryTaskActor extends AbstractScanGroup {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 缓存待拉取数据的起点id
|
* 缓存待拉取数据的起点id
|
@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.dispatch;
|
|||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author www.byteblogs.com
|
* @author www.byteblogs.com
|
||||||
* @date 2023-09-21 23:30:22
|
* @date 2023-09-21 23:30:22
|
||||||
@ -10,5 +12,5 @@ import lombok.Data;
|
|||||||
@Data
|
@Data
|
||||||
public class ConsumerBucket {
|
public class ConsumerBucket {
|
||||||
|
|
||||||
private Integer bucket;
|
private Set<Integer> buckets;
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,34 @@
|
|||||||
package com.aizuda.easy.retry.server.dispatch;
|
package com.aizuda.easy.retry.server.dispatch;
|
||||||
|
|
||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||||
|
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
|
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
|
||||||
|
import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor;
|
||||||
|
import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||||
|
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||||
|
import com.aizuda.easy.retry.server.common.dto.ScanTask;
|
||||||
|
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.util.concurrent.RateLimiter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||||
import org.springframework.context.annotation.Scope;
|
import org.springframework.context.annotation.Scope;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消费当前节点分配的bucket并生成扫描任务
|
* 消费当前节点分配的bucket并生成扫描任务
|
||||||
* <p>
|
* <p>
|
||||||
@ -19,8 +41,101 @@ import org.springframework.stereotype.Component;
|
|||||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ConsumerBucketActor extends AbstractActor {
|
public class ConsumerBucketActor extends AbstractActor {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
protected AccessTemplate accessTemplate;
|
||||||
|
@Autowired
|
||||||
|
protected ServerNodeMapper serverNodeMapper;
|
||||||
|
@Autowired
|
||||||
|
protected SystemProperties systemProperties;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Receive createReceive() {
|
public Receive createReceive() {
|
||||||
return null;
|
return receiveBuilder().match(ConsumerBucket.class, consumerBucket -> {
|
||||||
|
|
||||||
|
try {
|
||||||
|
doDispatch(consumerBucket);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LogUtils.error(log, "Data dispatcher processing exception. [{}]", consumerBucket, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doDispatch(final ConsumerBucket consumerBucket) {
|
||||||
|
|
||||||
|
// 查询桶对应组信息
|
||||||
|
Set<String> groupNameSet = accessTemplate.getSceneConfigAccess().list(
|
||||||
|
new LambdaQueryWrapper<SceneConfig>().select(SceneConfig::getGroupName)
|
||||||
|
.eq(SceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
|
||||||
|
.in(SceneConfig::getBucketIndex, consumerBucket.getBuckets())
|
||||||
|
.groupBy(SceneConfig::getGroupName)).stream().map(SceneConfig::getGroupName).collect(Collectors.toSet());
|
||||||
|
|
||||||
|
CacheConsumerGroup.clear();
|
||||||
|
// todo 需要对groupNameSet进行状态过滤只有开启才进行任务调度
|
||||||
|
for (final String groupName : groupNameSet) {
|
||||||
|
CacheConsumerGroup.addOrUpdate(groupName);
|
||||||
|
ScanTask scanTask = new ScanTask();
|
||||||
|
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||||
|
scanTask.setGroupName(groupName);
|
||||||
|
produceScanActorTask(scanTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
// job
|
||||||
|
ScanTask scanTask = new ScanTask();
|
||||||
|
scanTask.setBuckets(consumerBucket.getBuckets());
|
||||||
|
ActorRef jobActor = TaskTypeEnum.JOB.getActorRef().get();
|
||||||
|
jobActor.tell(scanTask, jobActor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 扫描任务生成器
|
||||||
|
*
|
||||||
|
* @param scanTask {@link ScanTask} 组上下文
|
||||||
|
*/
|
||||||
|
private void produceScanActorTask(ScanTask scanTask) {
|
||||||
|
|
||||||
|
String groupName = scanTask.getGroupName();
|
||||||
|
|
||||||
|
// 缓存按照
|
||||||
|
cacheRateLimiter(groupName);
|
||||||
|
|
||||||
|
// 扫描重试数据
|
||||||
|
ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY);
|
||||||
|
scanRetryActorRef.tell(scanTask, scanRetryActorRef);
|
||||||
|
|
||||||
|
// 扫描回调数据
|
||||||
|
ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK);
|
||||||
|
scanCallbackActorRef.tell(scanTask, scanCallbackActorRef);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存限流对象
|
||||||
|
*/
|
||||||
|
private void cacheRateLimiter(String groupName) {
|
||||||
|
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>()
|
||||||
|
.eq(ServerNode::getGroupName, groupName));
|
||||||
|
Cache<String, RateLimiter> rateLimiterCache = CacheGroupRateLimiter.getAll();
|
||||||
|
for (ServerNode serverNode : serverNodes) {
|
||||||
|
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId());
|
||||||
|
if (Objects.isNull(rateLimiter)) {
|
||||||
|
rateLimiterCache.put(serverNode.getHostId(), RateLimiter.create(systemProperties.getLimiter()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 缓存Actor对象
|
||||||
|
*/
|
||||||
|
private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) {
|
||||||
|
ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum);
|
||||||
|
if (Objects.isNull(scanActorRef)) {
|
||||||
|
scanActorRef = typeEnum.getActorRef().get();
|
||||||
|
// 缓存扫描器actor
|
||||||
|
CacheGroupScanActor.put(groupName, typeEnum, scanActorRef);
|
||||||
|
}
|
||||||
|
return scanActorRef;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,9 @@ public class DispatchService implements Lifecycle {
|
|||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
|
||||||
|
// TODO待优化
|
||||||
|
ActorRef actorRef = ActorGenerator.scanBucketActor();
|
||||||
|
|
||||||
dispatchService.scheduleAtFixedRate(() -> {
|
dispatchService.scheduleAtFixedRate(() -> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -59,11 +62,9 @@ public class DispatchService implements Lifecycle {
|
|||||||
Set<Integer> currentConsumerBuckets = getConsumerBucket();
|
Set<Integer> currentConsumerBuckets = getConsumerBucket();
|
||||||
LogUtils.info(log, "当前节点分配的桶:[{}]", currentConsumerBuckets);
|
LogUtils.info(log, "当前节点分配的桶:[{}]", currentConsumerBuckets);
|
||||||
if (!CollectionUtils.isEmpty(currentConsumerBuckets)) {
|
if (!CollectionUtils.isEmpty(currentConsumerBuckets)) {
|
||||||
for (Integer bucket : currentConsumerBuckets) {
|
ConsumerBucket scanTaskDTO = new ConsumerBucket();
|
||||||
ConsumerBucket scanTaskDTO = new ConsumerBucket();
|
scanTaskDTO.setBuckets(currentConsumerBuckets);
|
||||||
scanTaskDTO.setBucket(bucket);
|
actorRef.tell(scanTaskDTO, actorRef);
|
||||||
produceConsumerBucketActorTask(scanTaskDTO);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -75,22 +76,6 @@ public class DispatchService implements Lifecycle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 生成bucket对应的Actor
|
|
||||||
*
|
|
||||||
* @param consumerBucket {@link ConsumerBucket} 消费的桶的上下文
|
|
||||||
*/
|
|
||||||
private void produceConsumerBucketActorTask(ConsumerBucket consumerBucket) {
|
|
||||||
|
|
||||||
Integer bucket = consumerBucket.getBucket();
|
|
||||||
|
|
||||||
// 缓存bucket对应的 Actor
|
|
||||||
ActorRef actorRef = cacheActorRef(bucket);
|
|
||||||
actorRef.tell(consumerBucket, actorRef);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 分配当前POD负责消费的桶
|
* 分配当前POD负责消费的桶
|
||||||
*
|
*
|
||||||
@ -100,19 +85,6 @@ public class DispatchService implements Lifecycle {
|
|||||||
return DistributeInstance.INSTANCE.getConsumerBucket();
|
return DistributeInstance.INSTANCE.getConsumerBucket();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 缓存Actor对象
|
|
||||||
*/
|
|
||||||
private ActorRef cacheActorRef(Integer bucket) {
|
|
||||||
ActorRef scanActorRef = CacheBucketActor.get(bucket);
|
|
||||||
if (Objects.isNull(scanActorRef)) {
|
|
||||||
scanActorRef = ActorGenerator.scanBucketActor();
|
|
||||||
// 缓存扫描器actor
|
|
||||||
CacheBucketActor.put(bucket, scanActorRef);
|
|
||||||
}
|
|
||||||
return scanActorRef;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user