feat: 2.4.0

1. Restructure the code
This commit is contained in:
byteblogs168 2023-09-22 22:08:19 +08:00
parent 28b488135b
commit 9cef08d338
22 changed files with 456 additions and 49 deletions

View File

@ -29,6 +29,8 @@ public class SceneConfig implements Serializable {
private Long deadlineRequest;
private Integer bucketIndex;
private LocalDateTime createDt;
private LocalDateTime updateDt;

View File

@ -15,6 +15,7 @@ public class ActorGenerator {
public static final String SCAN_CALLBACK_GROUP_ACTOR = "ScanCallbackGroupActor";
public static final String SCAN_RETRY_GROUP_ACTOR = "ScanGroupActor";
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String SCAN_BUCKET_ACTOR = "ScanBucketActor";
public static final String FINISH_ACTOR = "FinishActor";
public static final String FAILURE_ACTOR = "FailureActor";
@ -82,7 +83,7 @@ public class ActorGenerator {
}
/**
* Generate the actor that scans retry data
* Generate the actor that scans callback data
*
* @return actor reference
*/
@ -90,6 +91,15 @@ public class ActorGenerator {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_CALLBACK_GROUP_ACTOR));
}
/**
* Generate the actor that scans JOB tasks
*
* @return actor reference
*/
public static ActorRef scanJobActor() {
return getDispatchRetryActorSystem().actorOf(getSpringExtension().props(SCAN_JOB_ACTOR));
}
/**
* Generate the actor that scans retry data
*

View File

@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentMap;
*/
@Component
@Slf4j
@Deprecated
public class CacheConsumerGroup implements Lifecycle {
private static Cache<String /*groupName*/, String/*groupName*/> CACHE;
@ -30,11 +29,29 @@ public class CacheConsumerGroup implements Lifecycle {
*
* @return cached consumer group names
*/
@Deprecated
public static Set<String> getAllConsumerGroupName() {
ConcurrentMap<String, String> concurrentMap = CACHE.asMap();
return new HashSet<>(concurrentMap.values());
}
/**
* Add the group when it is not cached, update it when it is
*
* @param groupName consumer group name
*/
public static synchronized void addOrUpdate(String groupName) {
LogUtils.info(log, "add consumer cache. groupName:[{}]", groupName);
CACHE.put(groupName, groupName);
}
public static void remove(String groupName) {
LogUtils.info(log, "Remove consumer cache. groupName:[{}]", groupName);
CACHE.invalidate(groupName);
}
public static void clear() {
CACHE.invalidateAll();
}
@Override

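For context, here is a minimal usage sketch (not part of this commit) of how this consumer-group cache is exercised by the other changes in this commit: ConsumerBucketActor rebuilds it on each dispatch round and ClientRegister reads it back to refresh the local register table. The class name, group name, and main method are illustrative, and the sketch assumes CacheConsumerGroup#start() has already initialised the underlying cache.

import java.util.Set;

import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;

// Illustrative only: assumes CacheConsumerGroup#start() has run.
public class ConsumerGroupCacheSketch {
    public static void main(String[] args) {
        // Dispatcher side (see ConsumerBucketActor below): rebuild the cache for each dispatch round.
        CacheConsumerGroup.clear();
        CacheConsumerGroup.addOrUpdate("example_group"); // groups are resolved from SceneConfig by bucket

        // Registry side (see ClientRegister below): read back the groups this POD consumes.
        Set<String> groups = CacheConsumerGroup.getAllConsumerGroupName();
        System.out.println("groups consumed by this POD: " + groups);
    }
}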
View File

@ -0,0 +1,59 @@
package com.aizuda.easy.retry.server.common.cache;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Cache of group scan actors
*
* @author www.byteblogs.com
* @date 2021-10-30
* @since 1.0.0
*/
@Component
@Data
@Slf4j
public class CacheGroupScanActor implements Lifecycle {
private static Cache<String, ActorRef> CACHE;
/**
* Get the cached scan actor for a group and task type
*
* @return actor reference, or null if absent
*/
public static ActorRef get(String groupName, TaskTypeEnum typeEnum) {
return CACHE.getIfPresent(groupName.concat(typeEnum.name()));
}
/**
* Cache the scan actor for a group and task type
*
* @param actorRef the actor reference to cache
*/
public static void put(String groupName, TaskTypeEnum typeEnum, ActorRef actorRef) {
CACHE.put(groupName.concat(typeEnum.name()), actorRef);
}
@Override
public void start() {
LogUtils.info(log, "CacheGroupScanActor start");
CACHE = CacheBuilder.newBuilder()
// Set the concurrency level to the number of CPU cores
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
@Override
public void close() {
LogUtils.info(log, "CacheGroupScanActor stop");
CACHE.invalidateAll();
}
}
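A short usage sketch of this cache, mirroring the cacheActorRef helper added to ConsumerBucketActor later in this commit; the wrapper class and method name here are illustrative, not part of the commit:

import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;

import java.util.Objects;

// Illustrative helper, not part of this commit.
public class GroupScanActorSketch {
    public static ActorRef resolve(String groupName, TaskTypeEnum typeEnum) {
        // Keys are groupName + task type, so each group gets one actor per scan type.
        ActorRef actorRef = CacheGroupScanActor.get(groupName, typeEnum);
        if (Objects.isNull(actorRef)) {
            actorRef = typeEnum.getActorRef().get();
            CacheGroupScanActor.put(groupName, typeEnum, actorRef);
        }
        return actorRef;
    }
}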

View File

@ -1,7 +1,9 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch;
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import java.util.Set;
/**
* Scan task model
*
@ -12,5 +14,7 @@ import lombok.Data;
@Data
public class ScanTask {
String groupName;
private String groupName;
private Set<Integer> buckets;
}

View File

@ -16,7 +16,10 @@ import java.util.function.Supplier;
@Getter
public enum TaskTypeEnum {
RETRY(1, ActorGenerator::scanGroupActor),
CALLBACK(2, ActorGenerator::scanCallbackGroupActor);
CALLBACK(2, ActorGenerator::scanCallbackGroupActor),
JOB(3, ActorGenerator::scanJobActor),
;
private final Integer type;
private final Supplier<ActorRef> actorRef;

View File

@ -95,7 +95,7 @@ public class ServerNodeBalance implements Lifecycle, Runnable {
int bucketTotal = systemProperties.getBucketTotal();
bucketList = new ArrayList<>(bucketTotal);
for (int i = 1; i <= bucketTotal; i++) {
for (int i = 0; i < bucketTotal; i++) {
bucketList.add(i);
}
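The switch from 1..bucketTotal to 0..bucketTotal-1 matters if, as assumed in this sketch (the hash function is an assumption, not taken from this commit), bucketIndex values are produced by a modulo hash, which only yields values in [0, bucketTotal):

// Illustrative only: the hashing scheme is assumed, not shown in this commit.
public class BucketIndexSketch {
    static int bucketIndex(String groupName, int bucketTotal) {
        // A modulo hash produces 0..bucketTotal-1, so the bucket list must start at 0.
        return Math.floorMod(groupName.hashCode(), bucketTotal);
    }

    public static void main(String[] args) {
        int bucketTotal = 32;
        System.out.println("bucket for example_group: " + bucketIndex("example_group", bucketTotal));
    }
}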

View File

@ -81,6 +81,18 @@ public class ClientRegister extends AbstractRegister implements Runnable {
refreshExpireAt(serverNode);
}
// Sync node info for the groups consumed by the current POD.
// A netty client only registers with one server; if a group is assigned to a POD other than the one
// its clients are connected to, that POD would otherwise have no registration info for those clients.
Set<String> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
if (!CollectionUtils.isEmpty(allConsumerGroupName)) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(
new LambdaQueryWrapper<ServerNode>().in(ServerNode::getGroupName, allConsumerGroupName));
for (final ServerNode node : serverNodes) {
// Refresh the full local cache
CacheRegisterTable.addOrUpdate(node.getGroupName(), node);
}
}
} catch (InterruptedException e) {
LogUtils.info(log, "[{}] thread stop.", Thread.currentThread().getName());
} catch (Exception e) {

View File

@ -18,7 +18,45 @@
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-common-server-api</artifactId>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-datasource-template</artifactId>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-common-client-api</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>

View File

@ -0,0 +1,16 @@
package com.aizuda.easy.retry.server.job.task.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* Job task module
*
* @author: www.byteblogs.com
* @date : 2023-09-19 09:21
*/
@Configuration
@ComponentScan("com.aizuda.easy.retry.server.job.task.*")
public class EasyRetryServerRetryJobAutoConfiguration {
}

View File

@ -0,0 +1,41 @@
package com.aizuda.easy.retry.server.job.task.scan;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* JOB task scanning
*
* @author: www.byteblogs.com
* @date : 2023-09-22 09:08
* @since 2.4.0
*/
@Component(ActorGenerator.SCAN_JOB_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanJobTaskActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
try {
doScan(config);
} catch (Exception e) {
LogUtils.error(log, "Data scanner processing exception. [{}]", config, e);
}
}).build();
}
private void doScan(final ScanTask scanTask) {
}
}
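doScan is still a stub in this commit; the actor is only wired into the message flow. A sketch of the producing side (mirroring the ConsumerBucketActor change later in this commit; the wrapper class and method name are illustrative) looks roughly like this:

import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;

import java.util.Set;

// Illustrative only: shows how a ScanTask reaches ScanJobTaskActor.
public class JobScanDispatchSketch {
    public static void dispatch(Set<Integer> buckets) {
        ScanTask scanTask = new ScanTask();
        scanTask.setBuckets(buckets); // JOB scans are bucket-scoped; no group name is set in this commit
        ActorRef jobActor = TaskTypeEnum.JOB.getActorRef().get();
        jobActor.tell(scanTask, jobActor);
    }
}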

View File

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.aizuda.easy.retry.server.job.task.config.EasyRetryServerRetryJobAutoConfiguration

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:09
*/
@Slf4j
public class CallbackTimerTask implements TimerTask {
@Override
public void run(final Timeout timeout) throws Exception {
log.info("Callback task executed");
}
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:09
*/
@Data
@Slf4j
public class RetryTimerTask implements TimerTask {
private RetryExecutor executor;
@Override
public void run(final Timeout timeout) throws Exception {
log.info("Retry task executed");
// RetryContext retryContext = executor.getRetryContext();
// RetryTask retryTask = retryContext.getRetryTask();
}
}

View File

@ -0,0 +1,45 @@
package com.aizuda.easy.retry.server.retry.task.support.dispatch.actor;
import com.aizuda.easy.retry.server.common.Lifecycle;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:03
*/
@Component
public class TimerWheelHandler implements Lifecycle {
private static HashedWheelTimer hashedWheelTimer = null;
public static ConcurrentHashMap<String, Timeout> taskConcurrentHashMap = new ConcurrentHashMap<>();
@Override
public void start() {
hashedWheelTimer = new HashedWheelTimer(
new CustomizableThreadFactory("retry-task-timer-wheel"), 10, TimeUnit.MILLISECONDS, 16);
hashedWheelTimer.start();
}
public static void register(String groupName, String uniqueId, TimerTask task, long delay, TimeUnit unit) {
Timeout timeout = hashedWheelTimer.newTimeout(task, delay, unit);
taskConcurrentHashMap.put(groupName.concat("_").concat(uniqueId), timeout);
}
public static boolean cancel(String groupName, String uniqueId) {
// The map is keyed by groupName_uniqueId, so look up with the same key and guard against misses
Timeout timeout = taskConcurrentHashMap.get(groupName.concat("_").concat(uniqueId));
return timeout != null && timeout.cancel();
}
@Override
public void close() {
hashedWheelTimer.stop();
}
}
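A usage sketch of the timing wheel that mirrors the FailureActor change below; the wrapper class and method name are illustrative, and it assumes register is only called after the Lifecycle start has built the wheel. It converts the absolute next trigger time into the relative delay the HashedWheelTimer expects:

import io.netty.util.TimerTask;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;

// Illustrative only, not part of this commit.
public class TimerWheelUsageSketch {
    public static void schedule(String groupName, String uniqueId, LocalDateTime nextTriggerAt) {
        // The wheel works with relative delays, so subtract the current time from the trigger time.
        long delayMillis = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
                - System.currentTimeMillis();
        TimerTask task = new RetryTimerTask();
        TimerWheelHandler.register(groupName, uniqueId, task, Math.max(delayMillis, 0L), TimeUnit.MILLISECONDS);
    }
}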

View File

@ -9,11 +9,15 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.CallbackTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -24,6 +28,8 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
/**
* Retry completion executor: 1. update the retry task 2. record the retry log
@ -60,11 +66,14 @@ public class FailureActor extends AbstractActor {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
TimerTask timerTask = null;
Integer maxRetryCount;
if (TaskTypeEnum.CALLBACK.getType().equals(retryTask.getTaskType())) {
maxRetryCount = systemProperties.getCallback().getMaxCount();
timerTask = new CallbackTimerTask();
} else {
maxRetryCount = sceneConfig.getMaxRetryCount();
timerTask = new RetryTimerTask();
}
if (maxRetryCount <= retryTask.getRetryCount()) {
@ -73,6 +82,11 @@ public class FailureActor extends AbstractActor {
callbackRetryTaskHandler.create(retryTask);
}
// TODO: only tasks that meet the trigger criteria should enter the timing wheel
LocalDateTime nextTriggerAt = retryTask.getNextTriggerAt();
// HashedWheelTimer expects a relative delay rather than an absolute epoch timestamp
long delay = nextTriggerAt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - System.currentTimeMillis();
TimerWheelHandler.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask, delay, TimeUnit.MILLISECONDS);
retryTask.setUpdateDt(LocalDateTime.now());
Assert.isTrue(1 == accessTemplate.getRetryTaskAccess()
.updateById(retryTask.getGroupName(), retryTask),

View File

@ -8,21 +8,29 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.ScanTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.RetryTimerTask;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.TimerWheelHandler;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* Data scanning template class
@ -44,6 +52,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
@Autowired
protected ClientNodeAllocateHandler clientNodeAllocateHandler;
@Override
public Receive createReceive() {
return receiveBuilder().match(ScanTask.class, config -> {
@ -89,12 +98,15 @@ public abstract class AbstractScanGroup extends AbstractActor {
continue;
}
Timeout timeout = TimerWheelHandler.taskConcurrentHashMap.get(retryTask.getGroupName().concat("_").concat(retryTask.getUniqueId()));
if (Objects.isNull(timeout)) {
productExecUnitActor(executor);
}
}
} else {
// No data to scan, sleep for 5s
try {
Thread.sleep((DispatchService.PERIOD / 2) * 1000);
Thread.sleep((10 / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

View File

@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentMap;
@Component(ActorGenerator.SCAN_CALLBACK_GROUP_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanCallbackGroupActor extends AbstractScanGroup {
public class ScanCallbackTaskActor extends AbstractScanGroup {
public static final String BEAN_NAME = "ScanCallbackGroupActor";
public static final String BEAN_NAME = "ScanCallbackTaskActor";
/**
* Caches the starting id for the next data pull

View File

@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentMap;
@Component(ActorGenerator.SCAN_RETRY_GROUP_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ScanGroupActor extends AbstractScanGroup {
public class ScanRetryTaskActor extends AbstractScanGroup {
/**
* Caches the starting id for the next data pull

View File

@ -2,6 +2,8 @@ package com.aizuda.easy.retry.server.dispatch;
import lombok.Data;
import java.util.Set;
/**
* @author www.byteblogs.com
* @date 2023-09-21 23:30:22
@ -10,5 +12,5 @@ import lombok.Data;
@Data
public class ConsumerBucket {
private Integer bucket;
private Set<Integer> buckets;
}

View File

@ -1,12 +1,34 @@
package com.aizuda.easy.retry.server.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Consume the buckets assigned to the current node and generate scan tasks
* <p>
@ -19,8 +41,101 @@ import org.springframework.stereotype.Component;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ConsumerBucketActor extends AbstractActor {
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
protected ServerNodeMapper serverNodeMapper;
@Autowired
protected SystemProperties systemProperties;
@Override
public Receive createReceive() {
return null;
return receiveBuilder().match(ConsumerBucket.class, consumerBucket -> {
try {
doDispatch(consumerBucket);
} catch (Exception e) {
LogUtils.error(log, "Data dispatcher processing exception. [{}]", consumerBucket, e);
}
}).build();
}
private void doDispatch(final ConsumerBucket consumerBucket) {
// Query the groups that belong to these buckets
Set<String> groupNameSet = accessTemplate.getSceneConfigAccess().list(
new LambdaQueryWrapper<SceneConfig>().select(SceneConfig::getGroupName)
.eq(SceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.in(SceneConfig::getBucketIndex, consumerBucket.getBuckets())
.groupBy(SceneConfig::getGroupName)).stream().map(SceneConfig::getGroupName).collect(Collectors.toSet());
CacheConsumerGroup.clear();
// TODO: filter groupNameSet by status; only enabled groups should be scheduled
for (final String groupName : groupNameSet) {
CacheConsumerGroup.addOrUpdate(groupName);
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
scanTask.setGroupName(groupName);
produceScanActorTask(scanTask);
}
// Dispatch the JOB scan task for these buckets
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
ActorRef jobActor = TaskTypeEnum.JOB.getActorRef().get();
jobActor.tell(scanTask, jobActor);
}
/**
* Scan task producer
*
* @param scanTask {@link ScanTask} group context
*/
private void produceScanActorTask(ScanTask scanTask) {
String groupName = scanTask.getGroupName();
// Cache rate limiters for the group's client nodes
cacheRateLimiter(groupName);
// Scan retry data
ActorRef scanRetryActorRef = cacheActorRef(groupName, TaskTypeEnum.RETRY);
scanRetryActorRef.tell(scanTask, scanRetryActorRef);
// Scan callback data
ActorRef scanCallbackActorRef = cacheActorRef(groupName, TaskTypeEnum.CALLBACK);
scanCallbackActorRef.tell(scanTask, scanCallbackActorRef);
}
/**
* Cache rate limiter objects
*/
private void cacheRateLimiter(String groupName) {
List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getGroupName, groupName));
Cache<String, RateLimiter> rateLimiterCache = CacheGroupRateLimiter.getAll();
for (ServerNode serverNode : serverNodes) {
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(serverNode.getHostId());
if (Objects.isNull(rateLimiter)) {
rateLimiterCache.put(serverNode.getHostId(), RateLimiter.create(systemProperties.getLimiter()));
}
}
}
/**
* Cache the actor reference
*/
private ActorRef cacheActorRef(String groupName, TaskTypeEnum typeEnum) {
ActorRef scanActorRef = CacheGroupScanActor.get(groupName, typeEnum);
if (Objects.isNull(scanActorRef)) {
scanActorRef = typeEnum.getActorRef().get();
// Cache the scan actor
CacheGroupScanActor.put(groupName, typeEnum, scanActorRef);
}
return scanActorRef;
}
}

View File

@ -47,6 +47,9 @@ public class DispatchService implements Lifecycle {
@Override
public void start() {
// TODO: needs optimization
ActorRef actorRef = ActorGenerator.scanBucketActor();
dispatchService.scheduleAtFixedRate(() -> {
try {
@ -59,11 +62,9 @@ public class DispatchService implements Lifecycle {
Set<Integer> currentConsumerBuckets = getConsumerBucket();
LogUtils.info(log, "Buckets assigned to the current node: [{}]", currentConsumerBuckets);
if (!CollectionUtils.isEmpty(currentConsumerBuckets)) {
for (Integer bucket : currentConsumerBuckets) {
ConsumerBucket scanTaskDTO = new ConsumerBucket();
scanTaskDTO.setBucket(bucket);
produceConsumerBucketActorTask(scanTaskDTO);
}
scanTaskDTO.setBuckets(currentConsumerBuckets);
actorRef.tell(scanTaskDTO, actorRef);
}
} catch (Exception e) {
@ -75,22 +76,6 @@ public class DispatchService implements Lifecycle {
}
/**
* Generate the actor for the bucket
*
* @param consumerBucket {@link ConsumerBucket} context of the consumed bucket
*/
private void produceConsumerBucketActorTask(ConsumerBucket consumerBucket) {
Integer bucket = consumerBucket.getBucket();
// 缓存bucket对应的 Actor
ActorRef actorRef = cacheActorRef(bucket);
actorRef.tell(consumerBucket, actorRef);
}
/**
* Allocate the buckets this POD is responsible for consuming
*
@ -100,19 +85,6 @@ public class DispatchService implements Lifecycle {
return DistributeInstance.INSTANCE.getConsumerBucket();
}
/**
* Cache the actor reference
*/
private ActorRef cacheActorRef(Integer bucket) {
ActorRef scanActorRef = CacheBucketActor.get(bucket);
if (Objects.isNull(scanActorRef)) {
scanActorRef = ActorGenerator.scanBucketActor();
// Cache the scan actor
CacheBucketActor.put(bucket, scanActorRef);
}
return scanActorRef;
}
@Override
public void close() {