feat:2.4.0

1. 新增系统模式
This commit is contained in:
byteblogs168 2023-10-19 23:54:29 +08:00
parent daa2cae39d
commit a5ddbab360
18 changed files with 176 additions and 148 deletions

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.common.config;
import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
import com.aizuda.easy.retry.template.datasource.enums.DbTypeEnum;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -71,6 +72,14 @@ public class SystemProperties {
*/
private Callback callback = new Callback();
/**
* 系统模式:
* RETRY: 分布式重试重
* JOB: 分布式定时任务
* ALL: 分布式重试重 && 分布式定时任务
*/
private SystemModeEnum mode = SystemModeEnum.ALL;
/**
* 回调配置
*/

View File

@ -0,0 +1,15 @@
package com.aizuda.easy.retry.server.common.enums;
/**
* 系统模式: 分布式重试重试分布式定时任务
*
* @author www.byteblogs.com
* @date 2023-10-19 22:04:38
* @since 2.4.0
*/
public enum SystemModeEnum {
RETRY,
JOB,
ALL
}

View File

@ -7,7 +7,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheelHandler;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import lombok.extern.slf4j.Slf4j;
@ -58,7 +58,7 @@ public class JobTaskBatchGenerator {
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
jobTimerTaskDTO.setGroupName(context.getGroupName());
jobTimerTaskDTO.setJobId(context.getJobId());
JobTimerWheelHandler.register(context.getGroupName(), jobTaskBatch.getId(),
JobTimerWheel.register(jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.server.job.task.support.idempotent;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import java.util.HashSet;
import java.util.Set;
/**
* @author www.byteblogs.com
* @date 2023-10-19 21:54:57
* @since 2.4.0
*/
public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
private static final Set<Long> cache = new HashSet<>();
@Override
public boolean set(Long key, Long value) {
return cache.add(key);
}
@Override
public Long get(Long s) {
throw new UnsupportedOperationException("不支持此操作");
}
@Override
public boolean isExist(Long key, Long value) {
return cache.contains(key);
}
@Override
public boolean clear(Long key, Long value) {
cache.clear();
return Boolean.TRUE;
}
}

View File

@ -3,7 +3,7 @@ package com.aizuda.easy.retry.server.job.task.support.prepare;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheelHandler;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -32,7 +32,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheelHandler.isExisted(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId())) {
if (!JobTimerWheel.isExisted(jobPrepareDTO.getTaskBatchId())) {
// 进入时间轮
long delay = jobPrepareDTO.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
@ -42,7 +42,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());
jobTimerTaskDTO.setGroupName(jobPrepareDTO.getGroupName());
JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId(),
JobTimerWheel.register(jobPrepareDTO.getTaskBatchId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -49,7 +49,7 @@ public class JobTimerTask implements TimerTask {
try {
// 清除时间轮的缓存
JobTimerWheelHandler.clearCache(jobTimerTaskDTO.getGroupName(), jobTimerTaskDTO.getTaskBatchId());
JobTimerWheel.clearCache(jobTimerTaskDTO.getTaskBatchId());
JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class);
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()

View File

@ -0,0 +1,65 @@
package com.aizuda.easy.retry.server.job.task.support.timer;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.job.task.support.idempotent.TimerIdempotent;
import io.netty.util.HashedWheelTimer;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:03
* @since : 2.4.0
*/
@Component
@Slf4j
public class JobTimerWheel implements Lifecycle {
private static final int TICK_DURATION = 100;
private static final String THREAD_NAME_PREFIX = "job-task-timer-wheel-";
private static HashedWheelTimer timer = null;
private static final TimerIdempotent idempotent = new TimerIdempotent();
@Override
public void start() {
timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_PREFIX), TICK_DURATION,
TimeUnit.MILLISECONDS);
timer.start();
}
public static void register(Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
if (!isExisted(uniqueId)) {
delay = delay < 0 ? 0 : delay;
log.info("加入时间轮. delay:[{}ms] uniqueId:[{}]", delay, uniqueId);
try {
timer.newTimeout(task, delay, unit);
idempotent.set(uniqueId, uniqueId);
} catch (Exception e) {
LogUtils.error(log, "加入时间轮失败. uniqueId:[{}]", uniqueId, e);
}
}
}
public static boolean isExisted(Long uniqueId) {
return idempotent.isExist(uniqueId, uniqueId);
}
public static void clearCache(Long uniqueId) {
idempotent.clear(uniqueId, uniqueId);
}
@Override
public void close() {
timer.stop();
}
}

View File

@ -1,101 +0,0 @@
package com.aizuda.easy.retry.server.job.task.support.timer;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author: www.byteblogs.com
* @date : 2023-09-22 17:03
*/
@Component
@Slf4j
public class JobTimerWheelHandler implements Lifecycle {
private static final int TICK_DURATION = 100;
private static final String THREAD_NAME_REFIX = "job-task-timer-wheel-";
private static HashedWheelTimer timer = null;
private static Cache<String, Timeout> cache;
@Override
public void start() {
timer = new HashedWheelTimer(
new CustomizableThreadFactory(THREAD_NAME_REFIX), TICK_DURATION,
TimeUnit.MILLISECONDS);
timer.start();
cache = CacheBuilder.newBuilder()
// 设置并发级别为cpu核心数
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
.build();
}
public static void register(String groupName, Long taskBatchId, TimerTask task, long delay, TimeUnit unit) {
if (delay < 0) {
delay = 0;
}
Timeout timeout = getTimeout(groupName, taskBatchId);
if (Objects.isNull(timeout)) {
try {
log.info("加入时间轮. delay:[{}ms] taskId:[{}]", delay, taskBatchId);
timeout = timer.newTimeout(task, delay, unit);
cache.put(getKey(groupName, taskBatchId), timeout);
} catch (Exception e) {
LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]",
groupName, taskBatchId, e);
}
}
}
private static String getKey(String groupName, Long uniqueId) {
return groupName.concat(StrUtil.UNDERLINE).concat(uniqueId.toString());
}
public static Timeout getTimeout(String groupName, Long uniqueId) {
return cache.getIfPresent(getKey(groupName, uniqueId));
}
public static boolean isExisted(String groupName, Long uniqueId) {
return Objects.nonNull(cache.getIfPresent(getKey(groupName, uniqueId)));
}
public static boolean cancel(String groupName, Long uniqueId) {
String key = getKey(groupName, uniqueId);
Timeout timeout = cache.getIfPresent(key);
if (Objects.isNull(timeout)) {
return false;
}
cache.invalidate(key);
return timeout.cancel();
}
public static void clearCache(String groupName, Long uniqueId) {
cache.invalidate(getKey(groupName, uniqueId));
}
@Override
public void close() {
timer.stop();
cache.invalidateAll();
}
}

View File

@ -14,7 +14,7 @@ import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;

View File

@ -5,7 +5,6 @@ import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -16,7 +15,7 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
@ -27,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import java.util.Objects;

View File

@ -4,7 +4,7 @@ import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;

View File

@ -4,7 +4,7 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.retry.RetryExecutor;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.support.idempotent;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import org.springframework.stereotype.Component;
import java.util.BitSet;

View File

@ -6,7 +6,7 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.dto.DistributeInstance;
import com.aizuda.easy.retry.server.retry.task.support.FilterStrategy;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.RetryContext;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;

View File

@ -8,13 +8,13 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.cache.Cache;
@ -70,31 +70,36 @@ public class ConsumerBucketActor extends AbstractActor {
return;
}
// 查询桶对应组信息
List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess().list(
new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
);
if (systemProperties.getMode() == SystemModeEnum.ALL || systemProperties.getMode() == SystemModeEnum.RETRY) {
// 查询桶对应组信息
List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess().list(
new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.in(GroupConfig::getBucketIndex, consumerBucket.getBuckets())
);
if (!CollectionUtils.isEmpty(groupConfigs)) {
for (final GroupConfig groupConfig : groupConfigs) {
CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName());
ScanTask scanTask = new ScanTask();
scanTask.setGroupName(groupConfig.getGroupName());
scanTask.setBuckets(consumerBucket.getBuckets());
produceScanActorTask(scanTask);
if (!CollectionUtils.isEmpty(groupConfigs)) {
for (final GroupConfig groupConfig : groupConfigs) {
CacheConsumerGroup.addOrUpdate(groupConfig.getGroupName());
ScanTask scanTask = new ScanTask();
scanTask.setGroupName(groupConfig.getGroupName());
scanTask.setBuckets(consumerBucket.getBuckets());
produceScanActorTask(scanTask);
}
}
}
// 扫描回调数据
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
scanTask.setSize(1000);
scanTask.setStartId(0);
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
if (systemProperties.getMode() == SystemModeEnum.ALL || systemProperties.getMode() == SystemModeEnum.JOB) {
// 扫描回调数据
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
scanTask.setSize(1000);
scanTask.setStartId(0);
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
}
}
/**

View File

@ -48,6 +48,7 @@ easy-retry:
max-count: 288 #回调最大执行次数
trigger-interval: 900 #间隔时间
db-type: mysql #当前使用的数据库
mode: retry

View File

@ -1,6 +1,5 @@
package com.aizuda.easy.retry.server;
import com.aizuda.easy.retry.server.retry.task.support.handler.ConfigVersionSyncHandler;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,14 +12,14 @@ import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class ConfigVersionSyncHandlerTest {
@Autowired
private ConfigVersionSyncHandler configVersionSyncHandler;
@SneakyThrows
@Test
public void syncVersion() {
configVersionSyncHandler.addSyncTask( "example_group", 0);
}
// @Autowired
// private ConfigVersionSyncHandler configVersionSyncHandler;
//
// @SneakyThrows
// @Test
// public void syncVersion() {
// configVersionSyncHandler.addSyncTask( "example_group", 0);
//
// }
}