From 805e617161268ddd0e86950581225185fcbf153d Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 21 Jul 2023 18:48:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.1.0=201.=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=94=81=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/strategy/LocalRetryStrategies.java | 15 ++- .../common/core/constant/SystemConstants.java | 6 + .../retry/server/config/ShedlockConfig.java | 23 ++-- .../easy/retry/server/dto/LockConfig.java | 53 +++++++++ .../server/enums/DatabaseProductEnum.java | 23 ++++ .../server/persistence/support/Access.java | 2 +- .../persistence/support/LockAccess.java | 9 +- .../access/lock/AbstractLockAccess.java | 43 ++++++- .../support/access/lock/JdbcLockAccess.java | 106 +++++++++++++++++ .../easy/retry/server/support/Schedule.java | 12 ++ .../server/support/cache/CacheLockRecord.java | 9 ++ .../handler/DistributedLockHandler.java | 15 --- .../support/schedule/AbstractSchedule.java | 62 ++++++++++ .../schedule/AlarmNotifyThreadSchedule.java | 9 +- .../support/schedule/ClearLogSchedule.java | 73 ++++++++++++ .../support/schedule/ClearThreadSchedule.java | 13 +- .../support/schedule/OfflineNodeSchedule.java | 85 ++++++++++++++ .../RetryErrorMoreThresholdAlarmSchedule.java | 111 ++++++++++++++++++ .../RetryTaskMoreThresholdAlarmSchedule.java | 107 +++++++++++++++++ .../support/schedule/RetryTaskSchedule.java | 70 +++++++++++ 20 files changed, 801 insertions(+), 45 deletions(-) create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/LockConfig.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Schedule.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearLogSchedule.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/OfflineNodeSchedule.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java create mode 100644 easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskSchedule.java diff --git a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java index bc9ff4bb5..9aa563372 100644 --- a/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java +++ b/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/strategy/LocalRetryStrategies.java @@ -151,7 +151,20 @@ public class LocalRetryStrategies extends AbstractRetryStrategies { return Collections.singletonList(new RetryListener() { @Override public void onRetry(Attempt attempt) { - LogUtils.info(log,"easy-retry 本地重试,第[{}]次调度", attempt.getAttemptNumber()); + + RetryType retryType = retryerInfo.getRetryType(); + switch (retryType) { + case ONLY_LOCAL: + case LOCAL_REMOTE: + LogUtils.info(log,"[{}]执行本地重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber()); + break; + case ONLY_REMOTE: + LogUtils.info(log,"[{}]执行远程重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber()); + break; + default: + throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name()); + + } } }); } diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index 7a2b758a8..1331ce662 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -1,5 +1,7 @@ package com.aizuda.easy.retry.common.core.constant; +import java.time.format.DateTimeFormatter; + /** * 系统通用常量 * @@ -72,4 +74,8 @@ public interface SystemConstants { " |___\\__,_/__/\\_, | |_|_\\___|\\__|_| \\_, |\n" + " |__/ |__/ \n" + " :: Easy Retry :: (v{}) \n"; + + interface DATE_FORMAT { + DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + } } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java index 8567aa61f..4f61ddd55 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/config/ShedlockConfig.java @@ -18,21 +18,22 @@ import java.util.TimeZone; * @date : 2021-11-22 10:56 */ @Configuration -@EnableScheduling -@EnableSchedulerLock(defaultLockAtMostFor = "PT30S") +//@EnableScheduling +//@EnableSchedulerLock(defaultLockAtMostFor = "PT30S") +@Deprecated public class ShedlockConfig { // @Bean - public LockProvider lockProvider(DataSource dataSource) { - return new JdbcTemplateLockProvider( - JdbcTemplateLockProvider.Configuration.builder() - .withJdbcTemplate(new JdbcTemplate(dataSource)) - .withTimeZone(TimeZone.getTimeZone("Asia/Shanghai")) - .build() - ); - } +// public LockProvider lockProvider(DataSource dataSource) { +// return new JdbcTemplateLockProvider( +// JdbcTemplateLockProvider.Configuration.builder() +// .withJdbcTemplate(new JdbcTemplate(dataSource)) +// .withTimeZone(TimeZone.getTimeZone("Asia/Shanghai")) +// .build() +// ); +// } -// @Bean + @Bean public TaskScheduler scheduledExecutorService() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(2); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/LockConfig.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/LockConfig.java new file mode 100644 index 000000000..695a5d62d --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/dto/LockConfig.java @@ -0,0 +1,53 @@ +package com.aizuda.easy.retry.server.dto; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import lombok.Getter; + +import java.time.Duration; +import java.time.LocalDateTime; + +/** + * @author: www.byteblogs.com + * @date : 2023-07-21 08:43 + * @since 2.1.0 + */ +public class LockConfig { + + private final LocalDateTime createDt; + + private final String lockName; + + private final Duration lockAtMost; + + private final Duration lockAtLeast; + + public LockConfig(final LocalDateTime createDt, final String lockName, final Duration lockAtMost, final Duration lockAtLeast) { + this.lockName = lockName; + this.lockAtMost = lockAtMost; + this.lockAtLeast = lockAtLeast; + this.createDt = createDt; + Assert.notNull(createDt, () -> new EasyRetryServerException("createDt can not be null.")); + Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null.")); + Assert.notNull(lockAtMost, () -> new EasyRetryServerException("lockAtMost can not be null. lockName:[{}]", lockName)); + Assert.isFalse(lockAtMost.isNegative(), () -> new EasyRetryServerException("lockAtMost is negative. lockName:[{}]", lockName)); + Assert.notNull(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null. lockName:[{}]", lockName)); + Assert.isFalse(lockAtLeast.compareTo(lockAtMost) > 0, () -> new EasyRetryServerException("lockAtLeast is longer than lockAtMost for lock. lockName:[{}]", lockName)); + } + + public LocalDateTime getCreateDt() { + return createDt; + } + + public String getLockName() { + return lockName; + } + + public LocalDateTime getLockAtMost() { + return createDt.plus(lockAtMost); + } + + public LocalDateTime getLockAtLeast() { + return createDt.plus(lockAtLeast); + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java new file mode 100644 index 000000000..d250b0bf1 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/enums/DatabaseProductEnum.java @@ -0,0 +1,23 @@ +package com.aizuda.easy.retry.server.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 锁的存储介质 + * + * @author www.byteblogs.com + * @date 2023-06-04 + * @since 2.0 + */ +@AllArgsConstructor +@Getter +public enum DatabaseProductEnum { + MYSQL("mysql", "MySql数据库"), + MARIADB("mariadb", "MariaDB数据库"), + POSTGRE_SQL("postgresql", "Postgre数据库"), + OTHER("other", "其他数据库"); + + private final String db; + private final String desc; +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/Access.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/Access.java index 591d1d9e2..9b0b44761 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/Access.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/Access.java @@ -7,5 +7,5 @@ package com.aizuda.easy.retry.server.persistence.support; */ public interface Access { - boolean supports(int type); + boolean supports(String storageMedium); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/LockAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/LockAccess.java index 715ae9f28..4d689ea06 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/LockAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/LockAccess.java @@ -1,7 +1,6 @@ package com.aizuda.easy.retry.server.persistence.support; -import net.javacrumbs.shedlock.core.LockConfiguration; -import org.jetbrains.annotations.NotNull; +import com.aizuda.easy.retry.server.dto.LockConfig; /** * @author www.byteblogs.com @@ -10,10 +9,8 @@ import org.jetbrains.annotations.NotNull; */ public interface LockAccess extends Access { - boolean insertRecord(); + boolean lock(LockConfig lockConfig); - boolean updateRecord(); - - void unlock(); + boolean unlock(LockConfig lockConfig); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/AbstractLockAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/AbstractLockAccess.java index ddcbc1248..508ce4a9c 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/AbstractLockAccess.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/AbstractLockAccess.java @@ -1,11 +1,52 @@ package com.aizuda.easy.retry.server.persistence.support.access.lock; +import com.aizuda.easy.retry.server.dto.LockConfig; import com.aizuda.easy.retry.server.persistence.support.LockAccess; +import com.aizuda.easy.retry.server.support.cache.CacheLockRecord; +import org.jetbrains.annotations.NotNull; /** * @author www.byteblogs.com * @date 2023-07-20 22:46:14 - * @since + * @since 2.1.0 */ public abstract class AbstractLockAccess implements LockAccess { + + @Override + public boolean lock(final LockConfig lockConfig) { + + String lockName = lockConfig.getLockName(); + + boolean tryToCreateLockRecord = CacheLockRecord.lockRecordRecentlyCreated(lockName); + if (!tryToCreateLockRecord) { + if (doLock(lockConfig)) { + CacheLockRecord.addLockRecord(lockName); + return true; + } + + CacheLockRecord.addLockRecord(lockName); + } + + try { + return doLockAfter(lockConfig); + } catch (Exception e) { + if (tryToCreateLockRecord) { + CacheLockRecord.remove(lockName); + } + + throw e; + } + } + + protected boolean doLockAfter(LockConfig lockConfig) { + return updateRecord(lockConfig); + } + + protected boolean doLock(@NotNull final LockConfig lockConfig) { + return insertRecord(lockConfig); + } + + protected abstract boolean insertRecord(@NotNull final LockConfig lockConfig); + + protected abstract boolean updateRecord(@NotNull final LockConfig lockConfig); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java new file mode 100644 index 000000000..0f36cb3b3 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/persistence/support/access/lock/JdbcLockAccess.java @@ -0,0 +1,106 @@ +package com.aizuda.easy.retry.server.persistence.support.access.lock; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.dto.LockConfig; +import com.aizuda.easy.retry.server.enums.DatabaseProductEnum; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.DistributedLockMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.DistributedLock; +import com.aizuda.easy.retry.server.support.register.ServerRegister; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.dao.ConcurrencyFailureException; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.jdbc.BadSqlGrammarException; +import org.springframework.jdbc.UncategorizedSQLException; +import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionSystemException; + +import java.time.LocalDateTime; +import java.util.Arrays; + +/** + * 基于DB实现的分布式锁 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 08:34 + * @since 2.1.0 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class JdbcLockAccess extends AbstractLockAccess { + + private final DistributedLockMapper distributedLockMapper; + + @Override + public boolean supports(final String storageMedium) { + return Arrays.asList( + DatabaseProductEnum.MYSQL.getDb(), + DatabaseProductEnum.MARIADB.getDb(), + DatabaseProductEnum.POSTGRE_SQL.getDb() + ).contains(storageMedium); + } + + @Override + public boolean unlock(final LockConfig lockConfig) { + LocalDateTime now = lockConfig.getCreateDt(); + DistributedLock distributedLock = new DistributedLock(); + distributedLock.setLockedBy(ServerRegister.CURRENT_CID); + distributedLock.setLockedAt(now); + LocalDateTime lockAtLeast = lockConfig.getLockAtLeast(); + distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now); + + for (int i = 0; i < 10; i++) { + try { + return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper() + .eq(DistributedLock::getName, lockConfig.getLockName())) > 0; + } catch (Exception e) { + LogUtils.error(log, "unlock error. retrying attempt [{}] ", i, e); + } + } + + return false; + } + + @Override + protected boolean insertRecord(@NotNull final LockConfig lockConfig) { + + try { + LocalDateTime now = lockConfig.getCreateDt(); + DistributedLock distributedLock = new DistributedLock(); + distributedLock.setName(lockConfig.getLockName()); + distributedLock.setLockedBy(ServerRegister.CURRENT_CID); + distributedLock.setLockedAt(now); + distributedLock.setLockUntil(lockConfig.getLockAtMost()); + distributedLock.setCreateDt(now); + distributedLock.setUpdateDt(now); + return distributedLockMapper.insert(distributedLock) > 0; + } catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) { + LogUtils.warn(log,"Duplicate key. lockName:[{}]", lockConfig.getLockName()); + return false; + } catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) { + LogUtils.error(log,"Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e); + return false; + } + + + } + + @Override + protected boolean updateRecord(@NotNull final LockConfig lockConfig) { + LocalDateTime now = lockConfig.getCreateDt(); + DistributedLock distributedLock = new DistributedLock(); + distributedLock.setLockedBy(ServerRegister.CURRENT_CID); + distributedLock.setLockedAt(now); + distributedLock.setLockUntil(lockConfig.getLockAtMost()); + return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper() + .eq(DistributedLock::getName, lockConfig.getLockName()) + .le(DistributedLock::getLockUntil, now)) > 0; + } + + + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Schedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Schedule.java new file mode 100644 index 000000000..219f25040 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/Schedule.java @@ -0,0 +1,12 @@ +package com.aizuda.easy.retry.server.support; + +/** + * @author: www.byteblogs.com + * @date : 2023-07-21 13:02 + * @since 2.1.0 + */ +public interface Schedule { + + void execute(); + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java index 911da62d9..31f118cde 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/cache/CacheLockRecord.java @@ -6,6 +6,9 @@ import com.aizuda.easy.retry.server.support.Lifecycle; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; import java.util.Collections; import java.util.Set; @@ -17,6 +20,8 @@ import java.util.WeakHashMap; * @since 2.1.0 */ @Slf4j +@Component +@Order(Ordered.HIGHEST_PRECEDENCE) public class CacheLockRecord implements Lifecycle { private static Cache CACHE; @@ -32,6 +37,10 @@ public class CacheLockRecord implements Lifecycle { return CACHE.size(); } + public static void remove(String lockName) { + CACHE.invalidate(lockName); + } + public static void clear() { CACHE.invalidateAll(); } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java index b5a0a9114..eafc18383 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/handler/DistributedLockHandler.java @@ -1,6 +1,5 @@ package com.aizuda.easy.retry.server.support.handler; -import com.aizuda.easy.retry.server.persistence.mybatis.mapper.DistributedLockMapper; import com.aizuda.easy.retry.server.persistence.support.LockAccess; import com.aizuda.easy.retry.server.support.cache.CacheLockRecord; import org.springframework.beans.factory.annotation.Autowired; @@ -16,19 +15,5 @@ public class DistributedLockHandler { @Autowired private LockAccess lockAccess; - public boolean lock(String lockName) { - // 先本地缓存锁定信息 - if (!CacheLockRecord.lockRecordRecentlyCreated(lockName)) { - if (lockAccess.insertRecord()) { - CacheLockRecord.addLockRecord(lockName); - return true; - } - // we were not to create the record, it already exists, let's put it to the cache so we do not try again - CacheLockRecord.addLockRecord(lockName); - } - - return lockAccess.updateRecord(); - } - } diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java new file mode 100644 index 000000000..14e8b89f9 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AbstractSchedule.java @@ -0,0 +1,62 @@ +package com.aizuda.easy.retry.server.support.schedule; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.dto.LockConfig; +import com.aizuda.easy.retry.server.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.persistence.support.LockAccess; +import com.aizuda.easy.retry.server.support.Schedule; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.TaskScheduler; + +import java.time.Duration; +import java.time.LocalDateTime; + +/** + * @author: www.byteblogs.com + * @date : 2023-07-21 13:04 + * @since 2.1.0 + */ +@Slf4j +public abstract class AbstractSchedule implements Schedule { + + @Autowired + @Qualifier("scheduledExecutorService") + protected TaskScheduler taskScheduler; + @Autowired + private LockAccess lockAccess; + + @Override + public void execute() { + + String lockName = lockName(); + String lockAtMost = lockAtMost(); + String lockAtLeast = lockAtLeast(); + Assert.notBlank(lockAtMost, () -> new EasyRetryServerException("lockAtLeast can not be null.")); + Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null.")); + Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null.")); + + LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), Duration.parse(lockAtLeast)); + try { + if (lockAccess.lock(lockConfig)) { + doExecute(); + } + } catch (Exception e) { + LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e); + } finally { + lockAccess.unlock(lockConfig); + } + + } + + protected abstract void doExecute(); + + abstract String lockName(); + + abstract String lockAtMost(); + + abstract String lockAtLeast(); + +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java index f32eeb95f..bbb39c640 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/AlarmNotifyThreadSchedule.java @@ -31,6 +31,7 @@ import java.util.List; */ @Component @Slf4j +@Deprecated public class AlarmNotifyThreadSchedule { private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @@ -60,8 +61,8 @@ public class AlarmNotifyThreadSchedule { /** * 监控重试表中数据总量是否到达阈值 */ - @Scheduled(cron = "0 0/10 * * * ?") - @SchedulerLock(name = "retryTaskMoreThreshold", lockAtMostFor = "PT10M", lockAtLeastFor = "PT10M") +// @Scheduled(cron = "0 0/10 * * * ?") +// @SchedulerLock(name = "retryTaskMoreThreshold", lockAtMostFor = "PT10M", lockAtLeastFor = "PT10M") public void retryTaskMoreThreshold() { LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); @@ -94,8 +95,8 @@ public class AlarmNotifyThreadSchedule { /** * 监控重试失败数据总量是否到达阈值 */ - @Scheduled(cron = "0 0/11 * * * ?") - @SchedulerLock(name = "retryErrorMoreThreshold", lockAtMostFor = "PT11M", lockAtLeastFor = "PT11M") +// @Scheduled(cron = "0 0/11 * * * ?") +// @SchedulerLock(name = "retryErrorMoreThreshold", lockAtMostFor = "PT11M", lockAtLeastFor = "PT11M") public void retryErrorMoreThreshold() { LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearLogSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearLogSchedule.java new file mode 100644 index 000000000..22827299d --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearLogSchedule.java @@ -0,0 +1,73 @@ +package com.aizuda.easy.retry.server.support.schedule; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.config.SystemProperties; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMessageMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog; +import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLogMessage; +import com.aizuda.easy.retry.server.support.Lifecycle; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; + +/** + * 清理日志 一小时运行一次 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 13:32 + * @since 2.1.0 + */ +@Component +@Slf4j +public class ClearLogSchedule extends AbstractSchedule implements Lifecycle { + + @Autowired + private RetryTaskLogMapper retryTaskLogMapper; + @Autowired + private SystemProperties systemProperties; + @Autowired + private RetryTaskLogMessageMapper retryTaskLogMessageMapper; + + @Override + public String lockName() { + return "clearLog"; + } + + @Override + public String lockAtMost() { + return "PT1H"; + } + + @Override + public String lockAtLeast() { + return "PT1M"; + } + + @Override + protected void doExecute() { + try { + LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); + retryTaskLogMapper.delete(new LambdaUpdateWrapper().le(RetryTaskLog::getCreateDt, endTime)); + retryTaskLogMessageMapper.delete(new LambdaUpdateWrapper().le(RetryTaskLogMessage::getCreateDt, endTime)); + } catch (Exception e) { + LogUtils.error(log, "clear log error", e); + } + } + + @Override + public void start() { + taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H")); + } + + @Override + public void close() { + + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java index 526d6aa42..f58e44c12 100644 --- a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/ClearThreadSchedule.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; */ @Component @Slf4j +@Deprecated public class ClearThreadSchedule { @Autowired @@ -55,8 +56,8 @@ public class ClearThreadSchedule { /** * 删除过期下线机器 */ - @Scheduled(fixedRate = 5000) - @SchedulerLock(name = "clearOfflineNode", lockAtMostFor = "PT10s", lockAtLeastFor = "PT5s") +// @Scheduled(fixedRate = 5000) +// @SchedulerLock(name = "clearOfflineNode", lockAtMostFor = "PT10s", lockAtLeastFor = "PT5s") public void clearOfflineNode() { try { @@ -86,8 +87,8 @@ public class ClearThreadSchedule { /** * 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表 */ - @Scheduled(cron = "0 0 0/1 * * ?") - @SchedulerLock(name = "clearFinishAndMoveDeadLetterRetryTask", lockAtMostFor = "PT60s", lockAtLeastFor = "PT60s") +// @Scheduled(cron = "0 0 0/1 * * ?") +// @SchedulerLock(name = "clearFinishAndMoveDeadLetterRetryTask", lockAtMostFor = "PT60s", lockAtLeastFor = "PT60s") public void clearFinishAndMoveDeadLetterRetryTask() { try { @@ -106,8 +107,8 @@ public class ClearThreadSchedule { /** * 清理日志 一小时运行一次 */ - @Scheduled(cron = "0 0 0/1 * * ? ") - @SchedulerLock(name = "clearLog", lockAtMostFor = "PT1H", lockAtLeastFor = "PT1H") +// @Scheduled(cron = "0 0 0/1 * * ? ") +// @SchedulerLock(name = "clearLog", lockAtMostFor = "PT1H", lockAtLeastFor = "PT1H") public void clearLog() { try { LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage()); diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/OfflineNodeSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/OfflineNodeSchedule.java new file mode 100644 index 000000000..ec43b99c1 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/OfflineNodeSchedule.java @@ -0,0 +1,85 @@ +package com.aizuda.easy.retry.server.support.schedule; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper; +import com.aizuda.easy.retry.server.support.Lifecycle; +import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.support.register.ServerRegister; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * 删除过期下线机器 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 14:59 + * @since 2.1.0 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class OfflineNodeSchedule extends AbstractSchedule implements Lifecycle { + private final ServerNodeMapper serverNodeMapper; + + @Override + protected void doExecute() { + + try { + // 删除内存缓存的待下线的机器 + LocalDateTime endTime = LocalDateTime.now().minusSeconds( + ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3)); + + // 先删除DB中需要下线的机器 + serverNodeMapper.deleteByExpireAt(endTime); + + Set allPods = CacheRegisterTable.getAllPods(); + Set waitOffline = allPods.stream().filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(endTime)).collect( + Collectors.toSet()); + Set podIds = waitOffline.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); + if (CollectionUtils.isEmpty(podIds)) { + return; + } + + for (final RegisterNodeInfo registerNodeInfo : waitOffline) { + CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId()); + } + + } catch (Exception e) { + LogUtils.error(log, "clearOfflineNode 失败", e); + } + } + + @Override + String lockName() { + return "clearOfflineNode"; + } + + @Override + String lockAtMost() { + return "PT10S"; + } + + @Override + String lockAtLeast() { + return "PT5S"; + } + + @Override + public void start() { + taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S")); + } + + @Override + public void close() { + + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java new file mode 100644 index 000000000..4e5fb5615 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java @@ -0,0 +1,111 @@ +package com.aizuda.easy.retry.server.support.schedule; + +import com.aizuda.easy.retry.common.core.alarm.Alarm; +import com.aizuda.easy.retry.common.core.alarm.AlarmContext; +import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; +import com.aizuda.easy.retry.common.core.constant.SystemConstants.DATE_FORMAT; +import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; +import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryDeadLetterMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; +import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig; +import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.support.Lifecycle; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.List; + +/** + * 监控重试失败数据总量是否到达阈值 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 17:25 + * @since 2.1.0 + */ +@Component +@Slf4j +public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle { + private static String retryErrorMoreThresholdTextMessageFormatter = + "{}环境 重试失败数据监控 \n" + + "> 名称:{} \n" + + "> 时间窗口:{} ~ {} \n" + + "> **共计:{}** \n"; + + @Autowired + private RetryDeadLetterMapper retryDeadLetterMapper; + @Autowired + private EasyRetryAlarmFactory easyRetryAlarmFactory; + @Autowired + @Qualifier("configAccessProcessor") + private ConfigAccess configAccess; + + @Override + public void start() { + taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M")); + } + + @Override + public void close() { + + } + + @Override + protected void doExecute() { + LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); + + for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) { + List notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()); + if (CollectionUtils.isEmpty(notifyConfigs)) { + continue; + } + + // x分钟内进入死信队列的数据量 + LocalDateTime now = LocalDateTime.now(); + int count = retryDeadLetterMapper.countRetryDeadLetterByCreateAt(now.minusMinutes(30), now, groupConfig.getGroupPartition()); + + for (NotifyConfig notifyConfig : notifyConfigs) { + if (count > notifyConfig.getNotifyThreshold()) { + // 预警 + AlarmContext context = AlarmContext.build() + .text(retryErrorMoreThresholdTextMessageFormatter, + EnvironmentUtils.getActiveProfile(), + groupConfig.getGroupName(), + now.minusMinutes(30).format(DATE_FORMAT.YYYYMMDDHHMMSS), + now.format(DATE_FORMAT.YYYYMMDDHHMMSS), + count) + .title("组:[{}] 环境重试失败数据监控", groupConfig.getGroupName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); + + Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); + alarmType.asyncSendMessage(context); + } + } + } + } + + @Override + String lockName() { + return "retryErrorMoreThreshold"; + } + + @Override + String lockAtMost() { + return "PT10M"; + } + + @Override + String lockAtLeast() { + return "PT1M"; + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java new file mode 100644 index 000000000..79c5fe1bd --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java @@ -0,0 +1,107 @@ +package com.aizuda.easy.retry.server.support.schedule; + +import com.aizuda.easy.retry.common.core.alarm.Alarm; +import com.aizuda.easy.retry.common.core.alarm.AlarmContext; +import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; +import com.aizuda.easy.retry.common.core.constant.SystemConstants.DATE_FORMAT; +import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; +import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; +import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper; +import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig; +import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig; +import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.support.Lifecycle; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; + +/** + * 监控重试表中数据总量是否到达阈值 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 17:25 + * @since 2.1.0 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle { + private static String retryTaskMoreThresholdTextMessageFormatter = + "{}环境 重试数据监控 \n" + + "> 名称:{} \n" + + "> 时间:{} \n" + + "> **共计:{}** \n"; + + private final RetryTaskMapper retryTaskMapper; + private final EasyRetryAlarmFactory easyRetryAlarmFactory; + @Autowired + @Qualifier("configAccessProcessor") + private ConfigAccess configAccess; + + @Override + public void start() { + taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M")); + } + + @Override + public void close() { + + } + + @Override + protected void doExecute() { + LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); + + for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) { + List notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY.getNotifyScene()); + if (CollectionUtils.isEmpty(notifyConfigs)) { + continue; + } + + int count = retryTaskMapper.countAllRetryTaskByRetryStatus(groupConfig.getGroupPartition(), RetryStatusEnum.RUNNING.getStatus()); + for (NotifyConfig notifyConfig : notifyConfigs) { + if (count > notifyConfig.getNotifyThreshold()) { + // 预警 + AlarmContext context = AlarmContext.build() + .text(retryTaskMoreThresholdTextMessageFormatter, + EnvironmentUtils.getActiveProfile(), + groupConfig.getGroupName(), + LocalDateTime.now().format(DATE_FORMAT.YYYYMMDDHHMMSS), + count) + .title("组:[{}])重试数据过多", groupConfig.getGroupName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); + + Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); + alarmType.asyncSendMessage(context); + } + } + } + } + + @Override + String lockName() { + return "retryTaskMoreThreshold"; + } + + @Override + String lockAtMost() { + return "PT10M"; + } + + @Override + String lockAtLeast() { + return "PT1M"; + } +} diff --git a/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskSchedule.java b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskSchedule.java new file mode 100644 index 000000000..364e036d9 --- /dev/null +++ b/easy-retry-server/src/main/java/com/aizuda/easy/retry/server/support/schedule/RetryTaskSchedule.java @@ -0,0 +1,70 @@ +package com.aizuda.easy.retry.server.support.schedule; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.persistence.support.ConfigAccess; +import com.aizuda.easy.retry.server.service.RetryService; +import com.aizuda.easy.retry.server.support.Lifecycle; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.Instant; +import java.util.Set; + +/** + * 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表 + * + * @author: www.byteblogs.com + * @date : 2023-07-21 17:19 + * @since 2.1.0 + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class RetryTaskSchedule extends AbstractSchedule implements Lifecycle { + private final RetryService retryService; + @Autowired + @Qualifier("configAccessProcessor") + private ConfigAccess configAccess; + @Override + public void start() { + taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT1H")); + } + + @Override + public void close() { + + } + + @Override + protected void doExecute() { + try { + Set groupNameList = configAccess.getGroupNameList(); + + for (String groupName : groupNameList) { + retryService.moveDeadLetterAndDelFinish(groupName); + } + + } catch (Exception e) { + LogUtils.error(log, "clearFinishAndMoveDeadLetterRetryTask 失败", e); + } + } + + @Override + String lockName() { + return "clearFinishAndMoveDeadLetterRetryTask"; + } + + @Override + String lockAtMost() { + return "PT60s"; + } + + @Override + String lockAtLeast() { + return "PT60s"; + } +}