From aede45eda9ba14804f45dadd91d02bd03005b992 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Mon, 24 Jul 2023 00:01:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.1.0=201.=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E4=BD=BF=E7=94=A8=E6=A8=AA=E5=90=91=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E7=BB=93=E6=9E=84=E5=AE=9E=E7=8E=B0=E6=BB=91=E5=8A=A8?= =?UTF-8?q?=E7=AA=97=E5=8F=A3=E6=9B=BF=E6=8D=A2=E7=8E=AF=E5=BD=A2=E6=BB=91?= =?UTF-8?q?=E5=8A=A8=E7=AA=97=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../retry/client/core/report/AsyncReport.java | 25 +- .../client/core/report/ReportListener.java | 4 +- .../core/strategy/LocalRetryStrategies.java | 38 +- .../client/core/window/SlidingWindow.java | 436 ++++++++++++++++++ easy-retry-server/pom.xml | 11 - .../server/EasyRetryServerApplication.java | 11 + .../server/config/MyBatisPlusConfig.java | 7 +- .../retry/server/config/ShedlockConfig.java | 43 -- .../retry/server/config/SystemProperties.java | 6 + .../server/enums/DatabaseProductEnum.java | 23 - .../easy/retry/server/enums/DbTypeEnum.java | 35 ++ .../support/access/lock/JdbcLockAccess.java | 9 +- .../server/support/cache/CacheLockRecord.java | 7 +- .../handler/DistributedLockHandler.java | 19 - .../support/schedule/AbstractSchedule.java | 17 +- .../schedule/AlarmNotifyThreadSchedule.java | 132 ------ .../support/schedule/ClearThreadSchedule.java | 122 ----- .../src/main/resources/application.yml | 1 + 18 files changed, 562 insertions(+), 384 deletions(-) create mode 100644 easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DbTypeEnum.java delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java delete mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java index f2d9b0f8d..323214edb 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/AsyncReport.java @@ -3,10 +3,12 @@ package com.aizuda.easy.retry.client.core.report; import com.aizuda.easy.retry.client.core.Lifecycle; import com.aizuda.easy.retry.client.core.retryer.RetryerInfo; import com.aizuda.easy.retry.client.core.window.RetryLeapArray; +import com.aizuda.easy.retry.client.core.window.SlidingWindow; import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.time.temporal.ChronoUnit; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -21,10 +23,11 @@ import java.util.concurrent.TimeUnit; @Component @Slf4j public class AsyncReport extends AbstractReport implements Lifecycle { + private static SlidingWindow slidingWindow; private static ScheduledExecutorService dispatchService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService")); - public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener()); +// public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener()); @Override public boolean supports(boolean async) { @@ -43,23 +46,31 @@ public class AsyncReport extends AbstractReport implements Lifecycle { public Boolean syncReport(String scene, String targetClassName, Object[] args, long timeout, TimeUnit unit) { RetryTaskDTO retryTaskDTO = buildRetryTaskDTO(scene, targetClassName, args); - slidingWindow.currentWindow().value().add(retryTaskDTO); - + slidingWindow.add(retryTaskDTO); return Boolean.TRUE; } @Override public void start() { - dispatchService.scheduleAtFixedRate(() -> { - slidingWindow.currentWindow(); - }, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS); + + slidingWindow = SlidingWindow + .Builder + .newBuilder() + .withTotalThreshold(50) + .withDuration(5, ChronoUnit.SECONDS) + .withListener(new ReportListener()) + .build(); + slidingWindow.start(); +// dispatchService.scheduleAtFixedRate(() -> { +// slidingWindow.currentWindow(); +// }, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS); } @Override public void close() { log.info("AsyncReport about to shutdown"); - slidingWindow.currentWindow(); + slidingWindow.end(); log.info("AsyncReport has been shutdown"); } } diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/ReportListener.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/ReportListener.java index 25b2f90c4..c72f0031e 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/ReportListener.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/report/ReportListener.java @@ -68,7 +68,7 @@ public class ReportListener implements Listener { sendMessage(throwable); }, o -> LogUtils.info(log,"Data report successful retry:<|>{}<|>", JsonUtil.toJsonString(list))); } catch (Exception e) { - e.printStackTrace(); + LogUtils.error(log,"Data report failed. <|>{}<|>", JsonUtil.toJsonString(list), e); } } @@ -92,7 +92,7 @@ public class ReportListener implements Listener { public void onRetry(Attempt attempt) { if (attempt.hasException()) { - LogUtils.error(log,"easy-retry 上报失败,第[{}]次调度 ", attempt.getAttemptNumber(), attempt.getExceptionCause()); + LogUtils.error(log,"easy-retry 上报服务端失败,第[{}]次尝试上报 ", attempt.getAttemptNumber(), attempt.getExceptionCause()); } } diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java index 9aa563372..e9d1eb6c6 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java @@ -151,20 +151,36 @@ public class LocalRetryStrategies extends AbstractRetryStrategies { return Collections.singletonList(new RetryListener() { @Override public void onRetry(Attempt attempt) { + if (attempt.hasException()) { + RetryType retryType = retryerInfo.getRetryType(); + switch (retryType) { + case ONLY_LOCAL: + case LOCAL_REMOTE: + LogUtils.error(log,"[{}] 执行本地重试失败,第[{}]次重试", retryerInfo.getScene(), attempt.getAttemptNumber()); + break; + case ONLY_REMOTE: + LogUtils.error(log,"[{}] 执行远程重试失败,第[{}]次重试", retryerInfo.getScene(), attempt.getAttemptNumber()); + break; + default: + throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name()); - RetryType retryType = retryerInfo.getRetryType(); - switch (retryType) { - case ONLY_LOCAL: - case LOCAL_REMOTE: - LogUtils.info(log,"[{}]执行本地重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber()); - break; - case ONLY_REMOTE: - LogUtils.info(log,"[{}]执行远程重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber()); - break; - default: - throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name()); + } + } else { + RetryType retryType = retryerInfo.getRetryType(); + switch (retryType) { + case ONLY_LOCAL: + case LOCAL_REMOTE: + LogUtils.info(log,"[{}] 执行本地重试成功.", retryerInfo.getScene(), attempt.getAttemptNumber()); + break; + case ONLY_REMOTE: + LogUtils.info(log,"[{}] 执行远程成功.", retryerInfo.getScene(), attempt.getAttemptNumber()); + break; + default: + throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name()); + } } + } }); } diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java new file mode 100644 index 000000000..95cf577fa --- /dev/null +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java @@ -0,0 +1,436 @@ +package com.aizuda.easy.retry.client.core.window; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.JsonUtil; +import com.aizuda.easy.retry.common.core.window.Listener; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 滑动窗口组件 + * + * @author: www.byteblogs.com + * @date : 2023-07-23 13:38 + */ +@Slf4j +@SuppressWarnings({"squid:S1319"}) +public class SlidingWindow { + + /** + * 滑动窗口存储数据 + */ + public final TreeMap> saveData = new TreeMap<>(); + + /** + * 总量窗口期阈值 + */ + private final Integer totalThreshold; + + /** + * 开启的窗口数据预警 + */ + private final Integer windowTotalThreshold; + + /** + * 监听器 + */ + private final List> listeners; + + /** + * 窗前期线程 + */ + private final ScheduledExecutorService threadPoolExecutor; + + /** + * 窗口期时间长度 + */ + private final long duration; + + /** + * 窗口期单位 + */ + private final ChronoUnit chronoUnit; + + public static final ReentrantLock lock = new ReentrantLock();//创建锁对象 + + public SlidingWindow(int totalThreshold, + int windowTotalThreshold, + List> listeners, + ScheduledExecutorService threadPoolExecutor, + long duration, + ChronoUnit chronoUnit) { + this.totalThreshold = totalThreshold; + this.listeners = listeners; + this.windowTotalThreshold = windowTotalThreshold; + this.threadPoolExecutor = threadPoolExecutor; + this.duration = duration; + this.chronoUnit = chronoUnit; + } + + /** + * 添加数据 + * + * @param data 需要保存到窗口期内的数据 + */ + public void add(T data) { + + LocalDateTime now = LocalDateTime.now(); + if (isOpenNewWindow(now)) { + + lock.lock(); + LocalDateTime windowPeriod = now.plus(duration, chronoUnit); + try { + + // 防止开启两个间隔时间小于窗口期的窗口 + if (isOpenNewWindow(now)) { + ConcurrentLinkedQueue list = new ConcurrentLinkedQueue<>(); + list.add(data); + + LogUtils + .info(log, "添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size()); + saveData.put(windowPeriod, list); + + // 扫描n-1个窗口,是否过期,过期则删除 + removeInvalidWindow(); + + // 超过窗口阈值预警 + alarmWindowTotal(); + + } else { + oldWindowAdd(data); + } + + } finally { + lock.unlock(); + } + + } else { + oldWindowAdd(data); + } + + } + + /** + * 超过窗口阈值预警 + */ + private void alarmWindowTotal() { + if (saveData.size() > windowTotalThreshold) { + log.warn("当前存活的窗口数量过多 总量:[{}] > 阈值:[{}] ", saveData.size(), windowTotalThreshold); + } + } + + /** + * 扫描n-1个窗口,是否过期,过期则删除 过期条件为窗口期内无数据 + */ + private void removeInvalidWindow() { + + for (int i = 0; i < saveData.size() - 1; i++) { + Map.Entry> firstEntry = saveData.firstEntry(); + if (CollectionUtils.isEmpty(firstEntry.getValue())) { + saveData.remove(firstEntry.getKey()); + } + } + } + + /** + * 往已存在的窗口期内添加数据 + * + * @param data 数据 + */ + private void oldWindowAdd(T data) { + + LocalDateTime windowPeriod = getNewWindowPeriod(); + + ConcurrentLinkedQueue list = saveData.get(windowPeriod); + list.add(data); + + if (list.size() >= totalThreshold) { + doHandlerListener(windowPeriod); + } + + } + + /** + * 处理通知 + * + * @param windowPeriod 窗口期时间 + */ + private void doHandlerListener(LocalDateTime windowPeriod) { + + lock.lock(); + + try { + + ConcurrentLinkedQueue list = saveData.get(windowPeriod); + if (CollectionUtils.isEmpty(list)) { + return; + } + + // 深拷贝 + ConcurrentLinkedQueue deepCopy = new ConcurrentLinkedQueue<>(list); + clear(windowPeriod, deepCopy); + + if (CollectionUtils.isEmpty(deepCopy)) { + return; + } + + for (Listener listener : listeners) { + listener.handler(new ArrayList<>(deepCopy)); + } + + } catch (Exception e) { + log.error("到达总量窗口期通知异常", e); + } finally { + lock.unlock(); + } + + } + + /** + * 删除2倍窗口期之前无效窗口 + * + * @param windowPeriod 当前最老窗口期 + */ + private void removeInvalidWindow(LocalDateTime windowPeriod) { + + LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit); + if (windowPeriod.isBefore(currentTime)) { + LogUtils.info(log, "删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime); + saveData.remove(windowPeriod); + } + + } + + /** + * 获取窗口期 + * + * @return 窗口期时间 + */ + private LocalDateTime getOldWindowPeriod() { + return saveData.firstKey(); + } + + /** + * 获取窗口期 + * + * @return 窗口期时间 + */ + private LocalDateTime getNewWindowPeriod() { + return saveData.lastKey(); + } + + /** + * 是否开启新窗口期 + * + * @return true- 开启 false- 关闭 + */ + private boolean isOpenNewWindow(LocalDateTime now) { + + if (saveData.size() == 0) { + return true; + } + + LocalDateTime windowPeriod = getNewWindowPeriod(); + return windowPeriod.isBefore(now); + } + + /** + * 提取存储的第一个数据进行判断是否到达窗口期 + * + * @param condition 当前时间 + */ + private void extract(LocalDateTime condition) { + + if (saveData.size() == 0) { + return; + } + + LocalDateTime windowPeriod = getOldWindowPeriod(); + + // 删除过期窗口期数据 + removeInvalidWindow(windowPeriod); + + if (windowPeriod.isBefore(condition)) { + LogUtils.info(log, "到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData)); + doHandlerListener(windowPeriod); + } + } + + /** + * 清除已到达窗口期的数据 + * + * @param windowPeriod 窗口期时间 + */ + private void clear(LocalDateTime windowPeriod, ConcurrentLinkedQueue list) { + saveData.get(windowPeriod).removeAll(list); + } + + /** + * 滑动窗口启动 + */ + public void start() { + + threadPoolExecutor.scheduleAtFixedRate(() -> { + try { + extract(LocalDateTime.now().minus(duration, chronoUnit)); + } catch (Exception e) { + log.error("滑动窗口异常", e); + } + }, 1, 1, TimeUnit.SECONDS); + } + + /** + * 滑动窗口关闭 + */ + public void end() { + ConcurrentLinkedQueue list = saveData.get(LocalDateTime.now()); + if (CollectionUtils.isEmpty(list)) { + return; + } + for (Listener listener : listeners) { + listener.handler(new ArrayList<>(list)); + } + } + + /** + * 滑动窗口构建器 + * + * @param + */ + public static class Builder { + + /** + * 总量窗口期阈值 + */ + private Integer totalThreshold = 10; + + /** + * 窗口数量预警 + */ + private Integer windowTotalThreshold = 5; + + /** + * 监听器 + */ + private List> listeners; + + /** + * 窗前期线程 + */ + private ScheduledExecutorService threadPoolExecutor; + + /** + * 窗口期时间长度 + */ + private long duration = 10; + + /** + * 窗口期单位 + */ + private ChronoUnit chronoUnit = ChronoUnit.SECONDS; + + /** + * 创建一个新的构建器 + * + * @param + * @return this + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * 总量窗口期阈值 + * + * @param totalThreshold 总量窗口期阈值 + * @return this + */ + public Builder withTotalThreshold(int totalThreshold) { + Assert.isTrue(totalThreshold > 0, "总量窗口期阈值不能小于0"); + this.totalThreshold = totalThreshold; + return this; + } + + /** + * 窗口数量预警 + * + * @param windowTotalThreshold 窗口数量阈值 + * @return this + */ + public Builder withWindowTotalThreshold(int windowTotalThreshold) { + Assert.isTrue(windowTotalThreshold > 0, "窗口数量阈值不能小于0"); + this.windowTotalThreshold = windowTotalThreshold; + return this; + } + + /** + * 添加监听器 + * + * @param listener 监听器 + * @return this + */ + public Builder withListener(Listener listener) { + + if (CollectionUtils.isEmpty(listeners)) { + listeners = new ArrayList<>(); + } + + listeners.add(listener); + return this; + } + + /** + * 添加窗口期时间 + * + * @param duration 时长 + * @param chronoUnit 单位 + * @return this + */ + public Builder withDuration(long duration, ChronoUnit chronoUnit) { + this.duration = duration; + this.chronoUnit = chronoUnit; + return this; + } + + /** + * 添加定时调度线程池 + * + * @param threadPoolExecutor 线程池对象 + * @return this + */ + public Builder withScheduledExecutorServiced(ScheduledExecutorService threadPoolExecutor) { + this.threadPoolExecutor = threadPoolExecutor; + return this; + } + + /** + * 构建滑动窗口对象 + * + * @return {@link SlidingWindow} 滑动窗口对象 + */ + public SlidingWindow build() { + if (Objects.isNull(threadPoolExecutor)) { + threadPoolExecutor = Executors + .newSingleThreadScheduledExecutor(r -> new Thread(r, "SlidingWindowThread")); + } + + if (CollectionUtils.isEmpty(listeners)) { + listeners = Collections.EMPTY_LIST; + } + + return new SlidingWindow<>(totalThreshold, windowTotalThreshold, listeners, threadPoolExecutor, duration, + chronoUnit); + } + + } +} diff --git a/easy-retry-server/pom.xml b/easy-retry-server/pom.xml index 9e55529df..f6aea1b08 100644 --- a/easy-retry-server/pom.xml +++ b/easy-retry-server/pom.xml @@ -20,7 +20,6 @@ 1.5.3.Final 2.6.21 4.4.0 - 4.0.1 5.0.0-alpha.11 2.6 0.9.16 @@ -62,16 +61,6 @@ lombok true - - net.javacrumbs.shedlock - shedlock-provider-jdbc-template - ${shedlock.version} - - - net.javacrumbs.shedlock - shedlock-spring - ${shedlock.version} - mysql mysql-connector-java diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/EasyRetryServerApplication.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/EasyRetryServerApplication.java index 6727263d4..46cf1a311 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/EasyRetryServerApplication.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/EasyRetryServerApplication.java @@ -3,6 +3,9 @@ package com.aizuda.easy.retry.server; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.transaction.annotation.EnableTransactionManagement; import java.util.TimeZone; @@ -12,6 +15,14 @@ import java.util.TimeZone; @EnableTransactionManagement(proxyTargetClass = true) public class EasyRetryServerApplication { + @Bean + public TaskScheduler scheduledExecutorService() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(2); + scheduler.setThreadNamePrefix("easy-retry-scheduled-thread-"); + return scheduler; + } + public static void main(String[] args) { TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); SpringApplication.run(EasyRetryServerApplication.class, args); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/MyBatisPlusConfig.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/MyBatisPlusConfig.java index bdfa6da84..9806bcb12 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/MyBatisPlusConfig.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/MyBatisPlusConfig.java @@ -1,6 +1,6 @@ package com.aizuda.easy.retry.server.config; -import com.baomidou.mybatisplus.annotation.DbType; +import com.aizuda.easy.retry.server.enums.DbTypeEnum; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; @@ -23,10 +23,11 @@ public class MyBatisPlusConfig { private final static List TABLES = Arrays.asList("retry_task", "retry_dead_letter"); @Bean - public MybatisPlusInterceptor mybatisPlusInterceptor() { + public MybatisPlusInterceptor mybatisPlusInterceptor(SystemProperties systemProperties) { MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(dynamicTableNameInnerInterceptor()); - interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); + DbTypeEnum dbTypeEnum = systemProperties.getDbType(); + interceptor.addInnerInterceptor(new PaginationInnerInterceptor(dbTypeEnum.getMpDbType())); return interceptor; } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java deleted file mode 100644 index 4f61ddd55..000000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.aizuda.easy.retry.server.config; - -import net.javacrumbs.shedlock.core.LockProvider; -import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider; -import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; - -import javax.sql.DataSource; -import java.util.TimeZone; - -/** - * @author: www.byteblogs.com - * @date : 2021-11-22 10:56 - */ -@Configuration -//@EnableScheduling -//@EnableSchedulerLock(defaultLockAtMostFor = "PT30S") -@Deprecated -public class ShedlockConfig { - -// @Bean -// public LockProvider lockProvider(DataSource dataSource) { -// return new JdbcTemplateLockProvider( -// JdbcTemplateLockProvider.Configuration.builder() -// .withJdbcTemplate(new JdbcTemplate(dataSource)) -// .withTimeZone(TimeZone.getTimeZone("Asia/Shanghai")) -// .build() -// ); -// } - - @Bean - public TaskScheduler scheduledExecutorService() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - scheduler.setPoolSize(2); - scheduler.setThreadNamePrefix("easy-retry-scheduled-thread-"); - return scheduler; - } -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java index 33d523189..3d44502c5 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/SystemProperties.java @@ -1,5 +1,6 @@ package com.aizuda.easy.retry.server.config; +import com.aizuda.easy.retry.server.enums.DbTypeEnum; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -51,6 +52,11 @@ public class SystemProperties { */ private int logStorage = 90; + /** + * 数据库类型 + */ + private DbTypeEnum dbType = DbTypeEnum.MYSQL; + /** * 回调配置 */ diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java deleted file mode 100644 index d250b0bf1..000000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.aizuda.easy.retry.server.enums; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * 锁的存储介质 - * - * @author www.byteblogs.com - * @date 2023-06-04 - * @since 2.0 - */ -@AllArgsConstructor -@Getter -public enum DatabaseProductEnum { - MYSQL("mysql", "MySql数据库"), - MARIADB("mariadb", "MariaDB数据库"), - POSTGRE_SQL("postgresql", "Postgre数据库"), - OTHER("other", "其他数据库"); - - private final String db; - private final String desc; -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DbTypeEnum.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DbTypeEnum.java new file mode 100644 index 000000000..00066e628 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DbTypeEnum.java @@ -0,0 +1,35 @@ +package com.aizuda.easy.retry.server.enums; + +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.baomidou.mybatisplus.annotation.DbType; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 锁的存储介质 + * + * @author www.byteblogs.com + * @date 2023-06-04 + * @since 2.0 + */ +@AllArgsConstructor +@Getter +public enum DbTypeEnum { + MYSQL("mysql", "MySql数据库", DbType.MYSQL), + MARIADB("mariadb", "MariaDB数据库", DbType.MARIADB), + POSTGRE_SQL("postgresql", "Postgre数据库", DbType.POSTGRE_SQL); + + private final String db; + private final String desc; + private final DbType mpDbType; + + public static DbTypeEnum modeOf(String db) { + for (DbTypeEnum value : DbTypeEnum.values()) { + if (value.getDb() == db) { + return value; + } + } + + throw new EasyRetryServerException("暂不支持此数据库 [{}]", db); + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java index 0f36cb3b3..14828960d 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java @@ -2,7 +2,7 @@ package com.aizuda.easy.retry.server.persistence.support.access.lock; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.dto.LockConfig; -import com.aizuda.easy.retry.server.enums.DatabaseProductEnum; +import com.aizuda.easy.retry.server.enums.DbTypeEnum; import com.aizuda.easy.retry.server.persistence.mybatis.mapper.DistributedLockMapper; import com.aizuda.easy.retry.server.persistence.mybatis.po.DistributedLock; import com.aizuda.easy.retry.server.support.register.ServerRegister; @@ -38,13 +38,14 @@ public class JdbcLockAccess extends AbstractLockAccess { @Override public boolean supports(final String storageMedium) { return Arrays.asList( - DatabaseProductEnum.MYSQL.getDb(), - DatabaseProductEnum.MARIADB.getDb(), - DatabaseProductEnum.POSTGRE_SQL.getDb() + DbTypeEnum.MYSQL.getDb(), + DbTypeEnum.MARIADB.getDb(), + DbTypeEnum.POSTGRE_SQL.getDb() ).contains(storageMedium); } @Override + public boolean unlock(final LockConfig lockConfig) { LocalDateTime now = lockConfig.getCreateDt(); DistributedLock distributedLock = new DistributedLock(); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java index 31f118cde..7e5f0730b 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java @@ -1,6 +1,5 @@ package com.aizuda.easy.retry.server.support.cache; -import akka.actor.ActorRef; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.support.Lifecycle; import com.google.common.cache.Cache; @@ -10,11 +9,9 @@ import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import java.util.Collections; -import java.util.Set; -import java.util.WeakHashMap; - /** + * 缓存本地的分布式锁的名称 + * * @author www.byteblogs.com * @date 2023-07-20 22:53:21 * @since 2.1.0 diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java deleted file mode 100644 index eafc18383..000000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.aizuda.easy.retry.server.support.handler; - -import com.aizuda.easy.retry.server.persistence.support.LockAccess; -import com.aizuda.easy.retry.server.support.cache.CacheLockRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * @author www.byteblogs.com - * @date 2023-07-20 22:28:35 - * @since 2.1.0 - */ -@Component -public class DistributedLockHandler { - @Autowired - private LockAccess lockAccess; - - -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java index 14e8b89f9..743c515bc 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java @@ -2,9 +2,11 @@ package com.aizuda.easy.retry.server.support.schedule; import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.config.SystemProperties; import com.aizuda.easy.retry.server.dto.LockConfig; import com.aizuda.easy.retry.server.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.persistence.support.LockAccess; +import com.aizuda.easy.retry.server.support.Lifecycle; import com.aizuda.easy.retry.server.support.Schedule; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -13,6 +15,7 @@ import org.springframework.scheduling.TaskScheduler; import java.time.Duration; import java.time.LocalDateTime; +import java.util.List; /** * @author: www.byteblogs.com @@ -26,7 +29,9 @@ public abstract class AbstractSchedule implements Schedule { @Qualifier("scheduledExecutorService") protected TaskScheduler taskScheduler; @Autowired - private LockAccess lockAccess; + private List lockAccesses; + @Autowired + private SystemProperties systemProperties; @Override public void execute() { @@ -39,6 +44,8 @@ public abstract class AbstractSchedule implements Schedule { Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null.")); LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), Duration.parse(lockAtLeast)); + + LockAccess lockAccess = getLockAccess(); try { if (lockAccess.lock(lockConfig)) { doExecute(); @@ -57,6 +64,12 @@ public abstract class AbstractSchedule implements Schedule { abstract String lockAtMost(); - abstract String lockAtLeast(); + abstract String lockAtLeast(); + + private LockAccess getLockAccess() { + return lockAccesses.stream() + .filter(lockAccess -> lockAccess.supports(systemProperties.getDbType().getDb())) + .findFirst().orElseThrow(() -> new EasyRetryServerException("未找到合适锁处理器")); + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java deleted file mode 100644 index bbb39c640..000000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.aizuda.easy.retry.server.support.schedule; - -import com.aizuda.easy.retry.common.core.alarm.Alarm; -import com.aizuda.easy.retry.common.core.alarm.AlarmContext; -import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; -import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; -import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; -import com.aizuda.easy.retry.common.core.util.HostUtils; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryDeadLetterMapper; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper; -import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; -import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig; -import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; -import lombok.extern.slf4j.Slf4j; -import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.List; - -/** - * @author: www.byteblogs.com - * @date : 2021-11-24 14:58 - */ -@Component -@Slf4j -@Deprecated -public class AlarmNotifyThreadSchedule { - - private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - private static String retryErrorMoreThresholdTextMessageFormatter = - "{}环境 重试失败数据监控 \n" + - "> 名称:{} \n" + - "> 时间窗口:{} ~ {} \n" + - "> **共计:{}** \n"; - - private static String retryTaskMoreThresholdTextMessageFormatter = - "{}环境 重试数据监控 \n" + - "> 名称:{} \n" + - "> 时间:{} \n" + - "> **共计:{}** \n"; - - @Autowired - private RetryDeadLetterMapper retryDeadLetterMapper; - @Autowired - private RetryTaskMapper retryTaskMapper; - @Autowired - private EasyRetryAlarmFactory easyRetryAlarmFactory; - @Autowired - @Qualifier("configAccessProcessor") - private ConfigAccess configAccess; - - /** - * 监控重试表中数据总量是否到达阈值 - */ -// @Scheduled(cron = "0 0/10 * * * ?") -// @SchedulerLock(name = "retryTaskMoreThreshold", lockAtMostFor = "PT10M", lockAtLeastFor = "PT10M") - public void retryTaskMoreThreshold() { - LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); - - for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) { - List notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY.getNotifyScene()); - if (CollectionUtils.isEmpty(notifyConfigs)) { - continue; - } - - int count = retryTaskMapper.countAllRetryTaskByRetryStatus(groupConfig.getGroupPartition(), RetryStatusEnum.RUNNING.getStatus()); - for (NotifyConfig notifyConfig : notifyConfigs) { - if (count > notifyConfig.getNotifyThreshold()) { - // 预警 - AlarmContext context = AlarmContext.build() - .text(retryTaskMoreThresholdTextMessageFormatter, - EnvironmentUtils.getActiveProfile(), - groupConfig.getGroupName(), - LocalDateTime.now().format(formatter), - count) - .title("组:[{}])重试数据过多", groupConfig.getGroupName()) - .notifyAttribute(notifyConfig.getNotifyAttribute()); - - Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); - alarmType.asyncSendMessage(context); - } - } - } - } - - /** - * 监控重试失败数据总量是否到达阈值 - */ -// @Scheduled(cron = "0 0/11 * * * ?") -// @SchedulerLock(name = "retryErrorMoreThreshold", lockAtMostFor = "PT11M", lockAtLeastFor = "PT11M") - public void retryErrorMoreThreshold() { - LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); - - for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) { - List notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()); - if (CollectionUtils.isEmpty(notifyConfigs)) { - continue; - } - - // x分钟内进入死信队列的数据量 - LocalDateTime now = LocalDateTime.now(); - int count = retryDeadLetterMapper.countRetryDeadLetterByCreateAt(now.minusMinutes(30), now, groupConfig.getGroupPartition()); - - for (NotifyConfig notifyConfig : notifyConfigs) { - if (count > notifyConfig.getNotifyThreshold()) { - // 预警 - AlarmContext context = AlarmContext.build() - .text(retryErrorMoreThresholdTextMessageFormatter, - EnvironmentUtils.getActiveProfile(), - groupConfig.getGroupName(), - now.minusMinutes(30).format(formatter), - now.format(formatter), - count) - .title("组:[{}] 环境重试失败数据监控", groupConfig.getGroupName()) - .notifyAttribute(notifyConfig.getNotifyAttribute()); - - Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); - alarmType.asyncSendMessage(context); - } - } - } - } -} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java deleted file mode 100644 index f58e44c12..000000000 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java +++ /dev/null @@ -1,122 +0,0 @@ -package com.aizuda.easy.retry.server.support.schedule; - -import com.aizuda.easy.retry.common.core.log.LogUtils; -import com.aizuda.easy.retry.server.config.SystemProperties; -import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMessageMapper; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; -import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; -import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLogMessage; -import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; -import com.aizuda.easy.retry.server.service.RetryService; -import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; -import com.aizuda.easy.retry.server.support.register.ServerRegister; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import lombok.extern.slf4j.Slf4j; -import net.javacrumbs.shedlock.spring.annotation.SchedulerLock; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.time.LocalDateTime; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * 清除数据线程调度器 - * - * @author: www.byteblogs.com - * @date : 2021-11-22 11:00 - */ -@Component -@Slf4j -@Deprecated -public class ClearThreadSchedule { - - @Autowired - private ServerNodeMapper serverNodeMapper; - - @Autowired - private RetryService retryService; - - @Autowired - @Qualifier("configAccessProcessor") - private ConfigAccess configAccess; - - @Autowired - private RetryTaskLogMessageMapper retryTaskLogMessageMapper; - @Autowired - private RetryTaskLogMapper retryTaskLogMapper; - @Autowired - private SystemProperties systemProperties; - - /** - * 删除过期下线机器 - */ -// @Scheduled(fixedRate = 5000) -// @SchedulerLock(name = "clearOfflineNode", lockAtMostFor = "PT10s", lockAtLeastFor = "PT5s") - public void clearOfflineNode() { - - try { - // 删除内存缓存的待下线的机器 - LocalDateTime endTime = LocalDateTime.now().minusSeconds(ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3)); - - // 先删除DB中需要下线的机器 - serverNodeMapper.deleteByExpireAt(endTime); - - Set allPods = CacheRegisterTable.getAllPods(); - Set waitOffline = allPods.stream().filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(endTime)).collect(Collectors.toSet()); - Set podIds = waitOffline.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); - if (CollectionUtils.isEmpty(podIds)) { - return; - } - - for (final RegisterNodeInfo registerNodeInfo : waitOffline) { - CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); - } - - } catch (Exception e) { - LogUtils.error(log, "clearOfflineNode 失败", e); - } - - } - - /** - * 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表 - */ -// @Scheduled(cron = "0 0 0/1 * * ?") -// @SchedulerLock(name = "clearFinishAndMoveDeadLetterRetryTask", lockAtMostFor = "PT60s", lockAtLeastFor = "PT60s") - public void clearFinishAndMoveDeadLetterRetryTask() { - - try { - Set groupNameList = configAccess.getGroupNameList(); - - for (String groupName : groupNameList) { - retryService.moveDeadLetterAndDelFinish(groupName); - } - - } catch (Exception e) { - LogUtils.error(log, "clearFinishAndMoveDeadLetterRetryTask 失败", e); - } - - } - - /** - * 清理日志 一小时运行一次 - */ -// @Scheduled(cron = "0 0 0/1 * * ? ") -// @SchedulerLock(name = "clearLog", lockAtMostFor = "PT1H", lockAtLeastFor = "PT1H") - public void clearLog() { - try { - LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); - retryTaskLogMapper.delete(new LambdaUpdateWrapper().le(RetryTaskLog::getCreateDt, endTime)); - retryTaskLogMessageMapper.delete(new LambdaUpdateWrapper().le(RetryTaskLogMessage::getCreateDt, endTime)); - } catch (Exception e) { - LogUtils.error(log, "clear log error", e); - } - } - -} diff --git a/easy-retry-server/src/main/resources/application.yml b/easy-retry-server/src/main/resources/application.yml index bdada8599..e1433efba 100644 --- a/easy-retry-server/src/main/resources/application.yml +++ b/easy-retry-server/src/main/resources/application.yml @@ -48,6 +48,7 @@ easy-retry: callback: # 回调配置 max-count: 288 #回调最大执行次数 trigger-interval: 900 #间隔时间 + db-type: mysql