feat: 2.1.0

1. 数据上报使用横向数据结构实现滑动窗口替换环形滑动窗口
This commit is contained in:
byteblogs168 2023-07-24 00:01:06 +08:00
parent 805e617161
commit aede45eda9
18 changed files with 562 additions and 384 deletions

View File

@ -3,10 +3,12 @@ package com.aizuda.easy.retry.client.core.report;
import com.aizuda.easy.retry.client.core.Lifecycle;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.window.RetryLeapArray;
import com.aizuda.easy.retry.client.core.window.SlidingWindow;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -21,10 +23,11 @@ import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class AsyncReport extends AbstractReport implements Lifecycle {
private static SlidingWindow<RetryTaskDTO> slidingWindow;
private static ScheduledExecutorService dispatchService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService"));
public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener());
// public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener());
@Override
public boolean supports(boolean async) {
@ -43,23 +46,31 @@ public class AsyncReport extends AbstractReport implements Lifecycle {
public Boolean syncReport(String scene, String targetClassName, Object[] args, long timeout, TimeUnit unit) {
RetryTaskDTO retryTaskDTO = buildRetryTaskDTO(scene, targetClassName, args);
slidingWindow.currentWindow().value().add(retryTaskDTO);
slidingWindow.add(retryTaskDTO);
return Boolean.TRUE;
}
@Override
public void start() {
dispatchService.scheduleAtFixedRate(() -> {
slidingWindow.currentWindow();
}, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS);
slidingWindow = SlidingWindow
.Builder
.<RetryTaskDTO>newBuilder()
.withTotalThreshold(50)
.withDuration(5, ChronoUnit.SECONDS)
.withListener(new ReportListener())
.build();
slidingWindow.start();
// dispatchService.scheduleAtFixedRate(() -> {
// slidingWindow.currentWindow();
// }, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS);
}
@Override
public void close() {
log.info("AsyncReport about to shutdown");
slidingWindow.currentWindow();
slidingWindow.end();
log.info("AsyncReport has been shutdown");
}
}

View File

@ -68,7 +68,7 @@ public class ReportListener implements Listener<RetryTaskDTO> {
sendMessage(throwable);
}, o -> LogUtils.info(log,"Data report successful retry<|>{}<|>", JsonUtil.toJsonString(list)));
} catch (Exception e) {
e.printStackTrace();
LogUtils.error(log,"Data report failed. <|>{}<|>", JsonUtil.toJsonString(list), e);
}
}
@ -92,7 +92,7 @@ public class ReportListener implements Listener<RetryTaskDTO> {
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
LogUtils.error(log,"easy-retry 上报失败,第[{}]次调度 ", attempt.getAttemptNumber(), attempt.getExceptionCause());
LogUtils.error(log,"easy-retry 上报服务端失败,第[{}]次尝试上报 ", attempt.getAttemptNumber(), attempt.getExceptionCause());
}
}

View File

@ -151,20 +151,36 @@ public class LocalRetryStrategies extends AbstractRetryStrategies {
return Collections.singletonList(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
RetryType retryType = retryerInfo.getRetryType();
switch (retryType) {
case ONLY_LOCAL:
case LOCAL_REMOTE:
LogUtils.error(log,"[{}] 执行本地重试失败,第[{}]次重试", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
case ONLY_REMOTE:
LogUtils.error(log,"[{}] 执行远程重试失败,第[{}]次重试", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
default:
throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name());
RetryType retryType = retryerInfo.getRetryType();
switch (retryType) {
case ONLY_LOCAL:
case LOCAL_REMOTE:
LogUtils.info(log,"[{}]执行本地重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
case ONLY_REMOTE:
LogUtils.info(log,"[{}]执行远程重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
default:
throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name());
}
} else {
RetryType retryType = retryerInfo.getRetryType();
switch (retryType) {
case ONLY_LOCAL:
case LOCAL_REMOTE:
LogUtils.info(log,"[{}] 执行本地重试成功.", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
case ONLY_REMOTE:
LogUtils.info(log,"[{}] 执行远程成功.", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
default:
throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name());
}
}
}
});
}

View File

@ -0,0 +1,436 @@
package com.aizuda.easy.retry.client.core.window;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.core.window.Listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 滑动窗口组件
*
* @author: www.byteblogs.com
* @date : 2023-07-23 13:38
*/
@Slf4j
@SuppressWarnings({"squid:S1319"})
public class SlidingWindow<T> {
/**
* 滑动窗口存储数据
*/
public final TreeMap<LocalDateTime, ConcurrentLinkedQueue<T>> saveData = new TreeMap<>();
/**
* 总量窗口期阈值
*/
private final Integer totalThreshold;
/**
* 开启的窗口数据预警
*/
private final Integer windowTotalThreshold;
/**
* 监听器
*/
private final List<Listener<T>> listeners;
/**
* 窗前期线程
*/
private final ScheduledExecutorService threadPoolExecutor;
/**
* 窗口期时间长度
*/
private final long duration;
/**
* 窗口期单位
*/
private final ChronoUnit chronoUnit;
public static final ReentrantLock lock = new ReentrantLock();//创建锁对象
public SlidingWindow(int totalThreshold,
int windowTotalThreshold,
List<Listener<T>> listeners,
ScheduledExecutorService threadPoolExecutor,
long duration,
ChronoUnit chronoUnit) {
this.totalThreshold = totalThreshold;
this.listeners = listeners;
this.windowTotalThreshold = windowTotalThreshold;
this.threadPoolExecutor = threadPoolExecutor;
this.duration = duration;
this.chronoUnit = chronoUnit;
}
/**
* 添加数据
*
* @param data 需要保存到窗口期内的数据
*/
public void add(T data) {
LocalDateTime now = LocalDateTime.now();
if (isOpenNewWindow(now)) {
lock.lock();
LocalDateTime windowPeriod = now.plus(duration, chronoUnit);
try {
// 防止开启两个间隔时间小于窗口期的窗口
if (isOpenNewWindow(now)) {
ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<>();
list.add(data);
LogUtils
.info(log, "添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size());
saveData.put(windowPeriod, list);
// 扫描n-1个窗口是否过期过期则删除
removeInvalidWindow();
// 超过窗口阈值预警
alarmWindowTotal();
} else {
oldWindowAdd(data);
}
} finally {
lock.unlock();
}
} else {
oldWindowAdd(data);
}
}
/**
* 超过窗口阈值预警
*/
private void alarmWindowTotal() {
if (saveData.size() > windowTotalThreshold) {
log.warn("当前存活的窗口数量过多 总量:[{}] > 阈值:[{}] ", saveData.size(), windowTotalThreshold);
}
}
/**
* 扫描n-1个窗口是否过期过期则删除 过期条件为窗口期内无数据
*/
private void removeInvalidWindow() {
for (int i = 0; i < saveData.size() - 1; i++) {
Map.Entry<LocalDateTime, ConcurrentLinkedQueue<T>> firstEntry = saveData.firstEntry();
if (CollectionUtils.isEmpty(firstEntry.getValue())) {
saveData.remove(firstEntry.getKey());
}
}
}
/**
* 往已存在的窗口期内添加数据
*
* @param data 数据
*/
private void oldWindowAdd(T data) {
LocalDateTime windowPeriod = getNewWindowPeriod();
ConcurrentLinkedQueue<T> list = saveData.get(windowPeriod);
list.add(data);
if (list.size() >= totalThreshold) {
doHandlerListener(windowPeriod);
}
}
/**
* 处理通知
*
* @param windowPeriod 窗口期时间
*/
private void doHandlerListener(LocalDateTime windowPeriod) {
lock.lock();
try {
ConcurrentLinkedQueue<T> list = saveData.get(windowPeriod);
if (CollectionUtils.isEmpty(list)) {
return;
}
// 深拷贝
ConcurrentLinkedQueue<T> deepCopy = new ConcurrentLinkedQueue<>(list);
clear(windowPeriod, deepCopy);
if (CollectionUtils.isEmpty(deepCopy)) {
return;
}
for (Listener<T> listener : listeners) {
listener.handler(new ArrayList<>(deepCopy));
}
} catch (Exception e) {
log.error("到达总量窗口期通知异常", e);
} finally {
lock.unlock();
}
}
/**
* 删除2倍窗口期之前无效窗口
*
* @param windowPeriod 当前最老窗口期
*/
private void removeInvalidWindow(LocalDateTime windowPeriod) {
LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit);
if (windowPeriod.isBefore(currentTime)) {
LogUtils.info(log, "删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime);
saveData.remove(windowPeriod);
}
}
/**
* 获取窗口期
*
* @return 窗口期时间
*/
private LocalDateTime getOldWindowPeriod() {
return saveData.firstKey();
}
/**
* 获取窗口期
*
* @return 窗口期时间
*/
private LocalDateTime getNewWindowPeriod() {
return saveData.lastKey();
}
/**
* 是否开启新窗口期
*
* @return true- 开启 false- 关闭
*/
private boolean isOpenNewWindow(LocalDateTime now) {
if (saveData.size() == 0) {
return true;
}
LocalDateTime windowPeriod = getNewWindowPeriod();
return windowPeriod.isBefore(now);
}
/**
* 提取存储的第一个数据进行判断是否到达窗口期
*
* @param condition 当前时间
*/
private void extract(LocalDateTime condition) {
if (saveData.size() == 0) {
return;
}
LocalDateTime windowPeriod = getOldWindowPeriod();
// 删除过期窗口期数据
removeInvalidWindow(windowPeriod);
if (windowPeriod.isBefore(condition)) {
LogUtils.info(log, "到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData));
doHandlerListener(windowPeriod);
}
}
/**
* 清除已到达窗口期的数据
*
* @param windowPeriod 窗口期时间
*/
private void clear(LocalDateTime windowPeriod, ConcurrentLinkedQueue<T> list) {
saveData.get(windowPeriod).removeAll(list);
}
/**
* 滑动窗口启动
*/
public void start() {
threadPoolExecutor.scheduleAtFixedRate(() -> {
try {
extract(LocalDateTime.now().minus(duration, chronoUnit));
} catch (Exception e) {
log.error("滑动窗口异常", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
/**
* 滑动窗口关闭
*/
public void end() {
ConcurrentLinkedQueue<T> list = saveData.get(LocalDateTime.now());
if (CollectionUtils.isEmpty(list)) {
return;
}
for (Listener<T> listener : listeners) {
listener.handler(new ArrayList<>(list));
}
}
/**
* 滑动窗口构建器
*
* @param <T>
*/
public static class Builder<T> {
/**
* 总量窗口期阈值
*/
private Integer totalThreshold = 10;
/**
* 窗口数量预警
*/
private Integer windowTotalThreshold = 5;
/**
* 监听器
*/
private List<Listener<T>> listeners;
/**
* 窗前期线程
*/
private ScheduledExecutorService threadPoolExecutor;
/**
* 窗口期时间长度
*/
private long duration = 10;
/**
* 窗口期单位
*/
private ChronoUnit chronoUnit = ChronoUnit.SECONDS;
/**
* 创建一个新的构建器
*
* @param <T>
* @return this
*/
public static <T> Builder<T> newBuilder() {
return new Builder<T>();
}
/**
* 总量窗口期阈值
*
* @param totalThreshold 总量窗口期阈值
* @return this
*/
public Builder<T> withTotalThreshold(int totalThreshold) {
Assert.isTrue(totalThreshold > 0, "总量窗口期阈值不能小于0");
this.totalThreshold = totalThreshold;
return this;
}
/**
* 窗口数量预警
*
* @param windowTotalThreshold 窗口数量阈值
* @return this
*/
public Builder<T> withWindowTotalThreshold(int windowTotalThreshold) {
Assert.isTrue(windowTotalThreshold > 0, "窗口数量阈值不能小于0");
this.windowTotalThreshold = windowTotalThreshold;
return this;
}
/**
* 添加监听器
*
* @param listener 监听器
* @return this
*/
public Builder<T> withListener(Listener<T> listener) {
if (CollectionUtils.isEmpty(listeners)) {
listeners = new ArrayList<>();
}
listeners.add(listener);
return this;
}
/**
* 添加窗口期时间
*
* @param duration 时长
* @param chronoUnit 单位
* @return this
*/
public Builder<T> withDuration(long duration, ChronoUnit chronoUnit) {
this.duration = duration;
this.chronoUnit = chronoUnit;
return this;
}
/**
* 添加定时调度线程池
*
* @param threadPoolExecutor 线程池对象
* @return this
*/
public Builder<T> withScheduledExecutorServiced(ScheduledExecutorService threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
return this;
}
/**
* 构建滑动窗口对象
*
* @return {@link SlidingWindow} 滑动窗口对象
*/
public SlidingWindow<T> build() {
if (Objects.isNull(threadPoolExecutor)) {
threadPoolExecutor = Executors
.newSingleThreadScheduledExecutor(r -> new Thread(r, "SlidingWindowThread"));
}
if (CollectionUtils.isEmpty(listeners)) {
listeners = Collections.EMPTY_LIST;
}
return new SlidingWindow<>(totalThreshold, windowTotalThreshold, listeners, threadPoolExecutor, duration,
chronoUnit);
}
}
}

View File

@ -20,7 +20,6 @@
<org.mapstruct.version>1.5.3.Final</org.mapstruct.version>
<akka.version>2.6.21</akka.version>
<java-jwt.version>4.4.0</java-jwt.version>
<shedlock.version>4.0.1</shedlock.version>
<okhttp.version>5.0.0-alpha.11</okhttp.version>
<commons-lang.version>2.6</commons-lang.version>
<perf4j.version>0.9.16</perf4j.version>
@ -62,16 +61,6 @@
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jdbc-template</artifactId>
<version>${shedlock.version}</version>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
<version>${shedlock.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>

View File

@ -3,6 +3,9 @@ package com.aizuda.easy.retry.server;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import java.util.TimeZone;
@ -12,6 +15,14 @@ import java.util.TimeZone;
@EnableTransactionManagement(proxyTargetClass = true)
public class EasyRetryServerApplication {
@Bean
public TaskScheduler scheduledExecutorService() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(2);
scheduler.setThreadNamePrefix("easy-retry-scheduled-thread-");
return scheduler;
}
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication.run(EasyRetryServerApplication.class, args);

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.server.config;
import com.baomidou.mybatisplus.annotation.DbType;
import com.aizuda.easy.retry.server.enums.DbTypeEnum;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
@ -23,10 +23,11 @@ public class MyBatisPlusConfig {
private final static List<String> TABLES = Arrays.asList("retry_task", "retry_dead_letter");
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
public MybatisPlusInterceptor mybatisPlusInterceptor(SystemProperties systemProperties) {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(dynamicTableNameInnerInterceptor());
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
DbTypeEnum dbTypeEnum = systemProperties.getDbType();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(dbTypeEnum.getMpDbType()));
return interceptor;
}

View File

@ -1,43 +0,0 @@
package com.aizuda.easy.retry.server.config;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import javax.sql.DataSource;
import java.util.TimeZone;
/**
* @author: www.byteblogs.com
* @date : 2021-11-22 10:56
*/
@Configuration
//@EnableScheduling
//@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
@Deprecated
public class ShedlockConfig {
// @Bean
// public LockProvider lockProvider(DataSource dataSource) {
// return new JdbcTemplateLockProvider(
// JdbcTemplateLockProvider.Configuration.builder()
// .withJdbcTemplate(new JdbcTemplate(dataSource))
// .withTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
// .build()
// );
// }
@Bean
public TaskScheduler scheduledExecutorService() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(2);
scheduler.setThreadNamePrefix("easy-retry-scheduled-thread-");
return scheduler;
}
}

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.config;
import com.aizuda.easy.retry.server.enums.DbTypeEnum;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ -51,6 +52,11 @@ public class SystemProperties {
*/
private int logStorage = 90;
/**
* 数据库类型
*/
private DbTypeEnum dbType = DbTypeEnum.MYSQL;
/**
* 回调配置
*/

View File

@ -1,23 +0,0 @@
package com.aizuda.easy.retry.server.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 锁的存储介质
*
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum DatabaseProductEnum {
MYSQL("mysql", "MySql数据库"),
MARIADB("mariadb", "MariaDB数据库"),
POSTGRE_SQL("postgresql", "Postgre数据库"),
OTHER("other", "其他数据库");
private final String db;
private final String desc;
}

View File

@ -0,0 +1,35 @@
package com.aizuda.easy.retry.server.enums;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.baomidou.mybatisplus.annotation.DbType;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 锁的存储介质
*
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum DbTypeEnum {
MYSQL("mysql", "MySql数据库", DbType.MYSQL),
MARIADB("mariadb", "MariaDB数据库", DbType.MARIADB),
POSTGRE_SQL("postgresql", "Postgre数据库", DbType.POSTGRE_SQL);
private final String db;
private final String desc;
private final DbType mpDbType;
public static DbTypeEnum modeOf(String db) {
for (DbTypeEnum value : DbTypeEnum.values()) {
if (value.getDb() == db) {
return value;
}
}
throw new EasyRetryServerException("暂不支持此数据库 [{}]", db);
}
}

View File

@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.persistence.support.access.lock;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.dto.LockConfig;
import com.aizuda.easy.retry.server.enums.DatabaseProductEnum;
import com.aizuda.easy.retry.server.enums.DbTypeEnum;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.DistributedLockMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.DistributedLock;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
@ -38,13 +38,14 @@ public class JdbcLockAccess extends AbstractLockAccess {
@Override
public boolean supports(final String storageMedium) {
return Arrays.asList(
DatabaseProductEnum.MYSQL.getDb(),
DatabaseProductEnum.MARIADB.getDb(),
DatabaseProductEnum.POSTGRE_SQL.getDb()
DbTypeEnum.MYSQL.getDb(),
DbTypeEnum.MARIADB.getDb(),
DbTypeEnum.POSTGRE_SQL.getDb()
).contains(storageMedium);
}
@Override
public boolean unlock(final LockConfig lockConfig) {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();

View File

@ -1,6 +1,5 @@
package com.aizuda.easy.retry.server.support.cache;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.google.common.cache.Cache;
@ -10,11 +9,9 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
/**
* 缓存本地的分布式锁的名称
*
* @author www.byteblogs.com
* @date 2023-07-20 22:53:21
* @since 2.1.0

View File

@ -1,19 +0,0 @@
package com.aizuda.easy.retry.server.support.handler;
import com.aizuda.easy.retry.server.persistence.support.LockAccess;
import com.aizuda.easy.retry.server.support.cache.CacheLockRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author www.byteblogs.com
* @date 2023-07-20 22:28:35
* @since 2.1.0
*/
@Component
public class DistributedLockHandler {
@Autowired
private LockAccess lockAccess;
}

View File

@ -2,9 +2,11 @@ package com.aizuda.easy.retry.server.support.schedule;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.dto.LockConfig;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.support.LockAccess;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.Schedule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,6 +15,7 @@ import org.springframework.scheduling.TaskScheduler;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author: www.byteblogs.com
@ -26,7 +29,9 @@ public abstract class AbstractSchedule implements Schedule {
@Qualifier("scheduledExecutorService")
protected TaskScheduler taskScheduler;
@Autowired
private LockAccess lockAccess;
private List<LockAccess> lockAccesses;
@Autowired
private SystemProperties systemProperties;
@Override
public void execute() {
@ -39,6 +44,8 @@ public abstract class AbstractSchedule implements Schedule {
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), Duration.parse(lockAtLeast));
LockAccess lockAccess = getLockAccess();
try {
if (lockAccess.lock(lockConfig)) {
doExecute();
@ -57,6 +64,12 @@ public abstract class AbstractSchedule implements Schedule {
abstract String lockAtMost();
abstract String lockAtLeast();
abstract String lockAtLeast();
private LockAccess getLockAccess() {
return lockAccesses.stream()
.filter(lockAccess -> lockAccess.supports(systemProperties.getDbType().getDb()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("未找到合适锁处理器"));
}
}

View File

@ -1,132 +0,0 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryDeadLetterMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2021-11-24 14:58
*/
@Component
@Slf4j
@Deprecated
public class AlarmNotifyThreadSchedule {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static String retryErrorMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试失败数据监控</font> \n" +
"> 名称:{} \n" +
"> 时间窗口:{} ~ {} \n" +
"> **共计:{}** \n";
private static String retryTaskMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试数据监控</font> \n" +
"> 名称:{} \n" +
"> 时间:{} \n" +
"> **共计:{}** \n";
@Autowired
private RetryDeadLetterMapper retryDeadLetterMapper;
@Autowired
private RetryTaskMapper retryTaskMapper;
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
/**
* 监控重试表中数据总量是否到达阈值
*/
// @Scheduled(cron = "0 0/10 * * * ?")
// @SchedulerLock(name = "retryTaskMoreThreshold", lockAtMostFor = "PT10M", lockAtLeastFor = "PT10M")
public void retryTaskMoreThreshold() {
LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) {
List<NotifyConfig> notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY.getNotifyScene());
if (CollectionUtils.isEmpty(notifyConfigs)) {
continue;
}
int count = retryTaskMapper.countAllRetryTaskByRetryStatus(groupConfig.getGroupPartition(), RetryStatusEnum.RUNNING.getStatus());
for (NotifyConfig notifyConfig : notifyConfigs) {
if (count > notifyConfig.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
groupConfig.getGroupName(),
LocalDateTime.now().format(formatter),
count)
.title("组:[{}])重试数据过多", groupConfig.getGroupName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
}
/**
* 监控重试失败数据总量是否到达阈值
*/
// @Scheduled(cron = "0 0/11 * * * ?")
// @SchedulerLock(name = "retryErrorMoreThreshold", lockAtMostFor = "PT11M", lockAtLeastFor = "PT11M")
public void retryErrorMoreThreshold() {
LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) {
List<NotifyConfig> notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene());
if (CollectionUtils.isEmpty(notifyConfigs)) {
continue;
}
// x分钟内进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
int count = retryDeadLetterMapper.countRetryDeadLetterByCreateAt(now.minusMinutes(30), now, groupConfig.getGroupPartition());
for (NotifyConfig notifyConfig : notifyConfigs) {
if (count > notifyConfig.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
groupConfig.getGroupName(),
now.minusMinutes(30).format(formatter),
now.format(formatter),
count)
.title("组:[{}] 环境重试失败数据监控", groupConfig.getGroupName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
}
}

View File

@ -1,122 +0,0 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLogMessage;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 清除数据线程调度器
*
* @author: www.byteblogs.com
* @date : 2021-11-22 11:00
*/
@Component
@Slf4j
@Deprecated
public class ClearThreadSchedule {
@Autowired
private ServerNodeMapper serverNodeMapper;
@Autowired
private RetryService retryService;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private SystemProperties systemProperties;
/**
* 删除过期下线机器
*/
// @Scheduled(fixedRate = 5000)
// @SchedulerLock(name = "clearOfflineNode", lockAtMostFor = "PT10s", lockAtLeastFor = "PT5s")
public void clearOfflineNode() {
try {
// 删除内存缓存的待下线的机器
LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3));
// 先删除DB中需要下线的机器
serverNodeMapper.deleteByExpireAt(endTime);
Set<RegisterNodeInfo> allPods = CacheRegisterTable.getAllPods();
Set<RegisterNodeInfo> waitOffline = allPods.stream().filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(endTime)).collect(Collectors.toSet());
Set<String> podIds = waitOffline.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());
if (CollectionUtils.isEmpty(podIds)) {
return;
}
for (final RegisterNodeInfo registerNodeInfo : waitOffline) {
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
}
} catch (Exception e) {
LogUtils.error(log, "clearOfflineNode 失败", e);
}
}
/**
* 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表
*/
// @Scheduled(cron = "0 0 0/1 * * ?")
// @SchedulerLock(name = "clearFinishAndMoveDeadLetterRetryTask", lockAtMostFor = "PT60s", lockAtLeastFor = "PT60s")
public void clearFinishAndMoveDeadLetterRetryTask() {
try {
Set<String> groupNameList = configAccess.getGroupNameList();
for (String groupName : groupNameList) {
retryService.moveDeadLetterAndDelFinish(groupName);
}
} catch (Exception e) {
LogUtils.error(log, "clearFinishAndMoveDeadLetterRetryTask 失败", e);
}
}
/**
* 清理日志 一小时运行一次
*/
// @Scheduled(cron = "0 0 0/1 * * ? ")
// @SchedulerLock(name = "clearLog", lockAtMostFor = "PT1H", lockAtLeastFor = "PT1H")
public void clearLog() {
try {
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
retryTaskLogMapper.delete(new LambdaUpdateWrapper<RetryTaskLog>().le(RetryTaskLog::getCreateDt, endTime));
retryTaskLogMessageMapper.delete(new LambdaUpdateWrapper<RetryTaskLogMessage>().le(RetryTaskLogMessage::getCreateDt, endTime));
} catch (Exception e) {
LogUtils.error(log, "clear log error", e);
}
}
}

View File

@ -48,6 +48,7 @@ easy-retry:
callback: # 回调配置
max-count: 288 #回调最大执行次数
trigger-interval: 900 #间隔时间
db-type: mysql