feat: 2.1.0

1.分布式锁实现
This commit is contained in:
byteblogs168 2023-07-21 18:48:03 +08:00
parent 82259df805
commit 805e617161
20 changed files with 801 additions and 45 deletions

View File

@ -151,7 +151,20 @@ public class LocalRetryStrategies extends AbstractRetryStrategies {
return Collections.singletonList(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
LogUtils.info(log,"easy-retry 本地重试,第[{}]次调度", attempt.getAttemptNumber());
RetryType retryType = retryerInfo.getRetryType();
switch (retryType) {
case ONLY_LOCAL:
case LOCAL_REMOTE:
LogUtils.info(log,"[{}]执行本地重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
case ONLY_REMOTE:
LogUtils.info(log,"[{}]执行远程重试,第[{}]次调度", retryerInfo.getScene(), attempt.getAttemptNumber());
break;
default:
throw new EasyRetryClientException("异常重试模式 [{}]", retryType.name());
}
}
});
}

View File

@ -1,5 +1,7 @@
package com.aizuda.easy.retry.common.core.constant;
import java.time.format.DateTimeFormatter;
/**
* 系统通用常量
*
@ -72,4 +74,8 @@ public interface SystemConstants {
" |___\\__,_/__/\\_, | |_|_\\___|\\__|_| \\_, |\n" +
" |__/ |__/ \n" +
" :: Easy Retry :: (v{}) \n";
interface DATE_FORMAT {
DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
}
}

View File

@ -18,21 +18,22 @@ import java.util.TimeZone;
* @date : 2021-11-22 10:56
*/
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
//@EnableScheduling
//@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
@Deprecated
public class ShedlockConfig {
// @Bean
public LockProvider lockProvider(DataSource dataSource) {
return new JdbcTemplateLockProvider(
JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(new JdbcTemplate(dataSource))
.withTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
.build()
);
}
// public LockProvider lockProvider(DataSource dataSource) {
// return new JdbcTemplateLockProvider(
// JdbcTemplateLockProvider.Configuration.builder()
// .withJdbcTemplate(new JdbcTemplate(dataSource))
// .withTimeZone(TimeZone.getTimeZone("Asia/Shanghai"))
// .build()
// );
// }
// @Bean
@Bean
public TaskScheduler scheduledExecutorService() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(2);

View File

@ -0,0 +1,53 @@
package com.aizuda.easy.retry.server.dto;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import lombok.Getter;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* @author: www.byteblogs.com
* @date : 2023-07-21 08:43
* @since 2.1.0
*/
public class LockConfig {
private final LocalDateTime createDt;
private final String lockName;
private final Duration lockAtMost;
private final Duration lockAtLeast;
public LockConfig(final LocalDateTime createDt, final String lockName, final Duration lockAtMost, final Duration lockAtLeast) {
this.lockName = lockName;
this.lockAtMost = lockAtMost;
this.lockAtLeast = lockAtLeast;
this.createDt = createDt;
Assert.notNull(createDt, () -> new EasyRetryServerException("createDt can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
Assert.notNull(lockAtMost, () -> new EasyRetryServerException("lockAtMost can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtMost.isNegative(), () -> new EasyRetryServerException("lockAtMost is negative. lockName:[{}]", lockName));
Assert.notNull(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtLeast.compareTo(lockAtMost) > 0, () -> new EasyRetryServerException("lockAtLeast is longer than lockAtMost for lock. lockName:[{}]", lockName));
}
public LocalDateTime getCreateDt() {
return createDt;
}
public String getLockName() {
return lockName;
}
public LocalDateTime getLockAtMost() {
return createDt.plus(lockAtMost);
}
public LocalDateTime getLockAtLeast() {
return createDt.plus(lockAtLeast);
}
}

View File

@ -0,0 +1,23 @@
package com.aizuda.easy.retry.server.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 锁的存储介质
*
* @author www.byteblogs.com
* @date 2023-06-04
* @since 2.0
*/
@AllArgsConstructor
@Getter
public enum DatabaseProductEnum {
MYSQL("mysql", "MySql数据库"),
MARIADB("mariadb", "MariaDB数据库"),
POSTGRE_SQL("postgresql", "Postgre数据库"),
OTHER("other", "其他数据库");
private final String db;
private final String desc;
}

View File

@ -7,5 +7,5 @@ package com.aizuda.easy.retry.server.persistence.support;
*/
public interface Access {
boolean supports(int type);
boolean supports(String storageMedium);
}

View File

@ -1,7 +1,6 @@
package com.aizuda.easy.retry.server.persistence.support;
import net.javacrumbs.shedlock.core.LockConfiguration;
import org.jetbrains.annotations.NotNull;
import com.aizuda.easy.retry.server.dto.LockConfig;
/**
* @author www.byteblogs.com
@ -10,10 +9,8 @@ import org.jetbrains.annotations.NotNull;
*/
public interface LockAccess extends Access {
boolean insertRecord();
boolean lock(LockConfig lockConfig);
boolean updateRecord();
void unlock();
boolean unlock(LockConfig lockConfig);
}

View File

@ -1,11 +1,52 @@
package com.aizuda.easy.retry.server.persistence.support.access.lock;
import com.aizuda.easy.retry.server.dto.LockConfig;
import com.aizuda.easy.retry.server.persistence.support.LockAccess;
import com.aizuda.easy.retry.server.support.cache.CacheLockRecord;
import org.jetbrains.annotations.NotNull;
/**
* @author www.byteblogs.com
* @date 2023-07-20 22:46:14
* @since
* @since 2.1.0
*/
public abstract class AbstractLockAccess implements LockAccess {
@Override
public boolean lock(final LockConfig lockConfig) {
String lockName = lockConfig.getLockName();
boolean tryToCreateLockRecord = CacheLockRecord.lockRecordRecentlyCreated(lockName);
if (!tryToCreateLockRecord) {
if (doLock(lockConfig)) {
CacheLockRecord.addLockRecord(lockName);
return true;
}
CacheLockRecord.addLockRecord(lockName);
}
try {
return doLockAfter(lockConfig);
} catch (Exception e) {
if (tryToCreateLockRecord) {
CacheLockRecord.remove(lockName);
}
throw e;
}
}
protected boolean doLockAfter(LockConfig lockConfig) {
return updateRecord(lockConfig);
}
protected boolean doLock(@NotNull final LockConfig lockConfig) {
return insertRecord(lockConfig);
}
protected abstract boolean insertRecord(@NotNull final LockConfig lockConfig);
protected abstract boolean updateRecord(@NotNull final LockConfig lockConfig);
}

View File

@ -0,0 +1,106 @@
package com.aizuda.easy.retry.server.persistence.support.access.lock;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.dto.LockConfig;
import com.aizuda.easy.retry.server.enums.DatabaseProductEnum;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.DistributedLockMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.DistributedLock;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.UncategorizedSQLException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException;
import java.time.LocalDateTime;
import java.util.Arrays;
/**
* 基于DB实现的分布式锁
*
* @author: www.byteblogs.com
* @date : 2023-07-21 08:34
* @since 2.1.0
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class JdbcLockAccess extends AbstractLockAccess {
private final DistributedLockMapper distributedLockMapper;
@Override
public boolean supports(final String storageMedium) {
return Arrays.asList(
DatabaseProductEnum.MYSQL.getDb(),
DatabaseProductEnum.MARIADB.getDb(),
DatabaseProductEnum.POSTGRE_SQL.getDb()
).contains(storageMedium);
}
@Override
public boolean unlock(final LockConfig lockConfig) {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
LocalDateTime lockAtLeast = lockConfig.getLockAtLeast();
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
for (int i = 0; i < 10; i++) {
try {
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
} catch (Exception e) {
LogUtils.error(log, "unlock error. retrying attempt [{}] ", i, e);
}
}
return false;
}
@Override
protected boolean insertRecord(@NotNull final LockConfig lockConfig) {
try {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setName(lockConfig.getLockName());
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
distributedLock.setLockUntil(lockConfig.getLockAtMost());
distributedLock.setCreateDt(now);
distributedLock.setUpdateDt(now);
return distributedLockMapper.insert(distributedLock) > 0;
} catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) {
LogUtils.warn(log,"Duplicate key. lockName:[{}]", lockConfig.getLockName());
return false;
} catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) {
LogUtils.error(log,"Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e);
return false;
}
}
@Override
protected boolean updateRecord(@NotNull final LockConfig lockConfig) {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
distributedLock.setLockUntil(lockConfig.getLockAtMost());
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())
.le(DistributedLock::getLockUntil, now)) > 0;
}
}

View File

@ -0,0 +1,12 @@
package com.aizuda.easy.retry.server.support;
/**
* @author: www.byteblogs.com
* @date : 2023-07-21 13:02
* @since 2.1.0
*/
public interface Schedule {
void execute();
}

View File

@ -6,6 +6,9 @@ import com.aizuda.easy.retry.server.support.Lifecycle;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Set;
@ -17,6 +20,8 @@ import java.util.WeakHashMap;
* @since 2.1.0
*/
@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class CacheLockRecord implements Lifecycle {
private static Cache<String, String> CACHE;
@ -32,6 +37,10 @@ public class CacheLockRecord implements Lifecycle {
return CACHE.size();
}
public static void remove(String lockName) {
CACHE.invalidate(lockName);
}
public static void clear() {
CACHE.invalidateAll();
}

View File

@ -1,6 +1,5 @@
package com.aizuda.easy.retry.server.support.handler;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.DistributedLockMapper;
import com.aizuda.easy.retry.server.persistence.support.LockAccess;
import com.aizuda.easy.retry.server.support.cache.CacheLockRecord;
import org.springframework.beans.factory.annotation.Autowired;
@ -16,19 +15,5 @@ public class DistributedLockHandler {
@Autowired
private LockAccess lockAccess;
public boolean lock(String lockName) {
// 先本地缓存锁定信息
if (!CacheLockRecord.lockRecordRecentlyCreated(lockName)) {
if (lockAccess.insertRecord()) {
CacheLockRecord.addLockRecord(lockName);
return true;
}
// we were not to create the record, it already exists, let's put it to the cache so we do not try again
CacheLockRecord.addLockRecord(lockName);
}
return lockAccess.updateRecord();
}
}

View File

@ -0,0 +1,62 @@
package com.aizuda.easy.retry.server.support.schedule;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.dto.LockConfig;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.support.LockAccess;
import com.aizuda.easy.retry.server.support.Schedule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.TaskScheduler;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* @author: www.byteblogs.com
* @date : 2023-07-21 13:04
* @since 2.1.0
*/
@Slf4j
public abstract class AbstractSchedule implements Schedule {
@Autowired
@Qualifier("scheduledExecutorService")
protected TaskScheduler taskScheduler;
@Autowired
private LockAccess lockAccess;
@Override
public void execute() {
String lockName = lockName();
String lockAtMost = lockAtMost();
String lockAtLeast = lockAtLeast();
Assert.notBlank(lockAtMost, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), Duration.parse(lockAtLeast));
try {
if (lockAccess.lock(lockConfig)) {
doExecute();
}
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
} finally {
lockAccess.unlock(lockConfig);
}
}
protected abstract void doExecute();
abstract String lockName();
abstract String lockAtMost();
abstract String lockAtLeast();
}

View File

@ -31,6 +31,7 @@ import java.util.List;
*/
@Component
@Slf4j
@Deprecated
public class AlarmNotifyThreadSchedule {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@ -60,8 +61,8 @@ public class AlarmNotifyThreadSchedule {
/**
* 监控重试表中数据总量是否到达阈值
*/
@Scheduled(cron = "0 0/10 * * * ?")
@SchedulerLock(name = "retryTaskMoreThreshold", lockAtMostFor = "PT10M", lockAtLeastFor = "PT10M")
// @Scheduled(cron = "0 0/10 * * * ?")
// @SchedulerLock(name = "retryTaskMoreThreshold", lockAtMostFor = "PT10M", lockAtLeastFor = "PT10M")
public void retryTaskMoreThreshold() {
LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
@ -94,8 +95,8 @@ public class AlarmNotifyThreadSchedule {
/**
* 监控重试失败数据总量是否到达阈值
*/
@Scheduled(cron = "0 0/11 * * * ?")
@SchedulerLock(name = "retryErrorMoreThreshold", lockAtMostFor = "PT11M", lockAtLeastFor = "PT11M")
// @Scheduled(cron = "0 0/11 * * * ?")
// @SchedulerLock(name = "retryErrorMoreThreshold", lockAtMostFor = "PT11M", lockAtLeastFor = "PT11M")
public void retryErrorMoreThreshold() {
LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());

View File

@ -0,0 +1,73 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.config.SystemProperties;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLogMessage;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
/**
* 清理日志 一小时运行一次
*
* @author: www.byteblogs.com
* @date : 2023-07-21 13:32
* @since 2.1.0
*/
@Component
@Slf4j
public class ClearLogSchedule extends AbstractSchedule implements Lifecycle {
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private SystemProperties systemProperties;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public String lockName() {
return "clearLog";
}
@Override
public String lockAtMost() {
return "PT1H";
}
@Override
public String lockAtLeast() {
return "PT1M";
}
@Override
protected void doExecute() {
try {
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());
retryTaskLogMapper.delete(new LambdaUpdateWrapper<RetryTaskLog>().le(RetryTaskLog::getCreateDt, endTime));
retryTaskLogMessageMapper.delete(new LambdaUpdateWrapper<RetryTaskLogMessage>().le(RetryTaskLogMessage::getCreateDt, endTime));
} catch (Exception e) {
LogUtils.error(log, "clear log error", e);
}
}
@Override
public void start() {
taskScheduler.scheduleAtFixedRate(this::execute, Duration.parse("PT1H"));
}
@Override
public void close() {
}
}

View File

@ -33,6 +33,7 @@ import java.util.stream.Collectors;
*/
@Component
@Slf4j
@Deprecated
public class ClearThreadSchedule {
@Autowired
@ -55,8 +56,8 @@ public class ClearThreadSchedule {
/**
* 删除过期下线机器
*/
@Scheduled(fixedRate = 5000)
@SchedulerLock(name = "clearOfflineNode", lockAtMostFor = "PT10s", lockAtLeastFor = "PT5s")
// @Scheduled(fixedRate = 5000)
// @SchedulerLock(name = "clearOfflineNode", lockAtMostFor = "PT10s", lockAtLeastFor = "PT5s")
public void clearOfflineNode() {
try {
@ -86,8 +87,8 @@ public class ClearThreadSchedule {
/**
* 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表
*/
@Scheduled(cron = "0 0 0/1 * * ?")
@SchedulerLock(name = "clearFinishAndMoveDeadLetterRetryTask", lockAtMostFor = "PT60s", lockAtLeastFor = "PT60s")
// @Scheduled(cron = "0 0 0/1 * * ?")
// @SchedulerLock(name = "clearFinishAndMoveDeadLetterRetryTask", lockAtMostFor = "PT60s", lockAtLeastFor = "PT60s")
public void clearFinishAndMoveDeadLetterRetryTask() {
try {
@ -106,8 +107,8 @@ public class ClearThreadSchedule {
/**
* 清理日志 一小时运行一次
*/
@Scheduled(cron = "0 0 0/1 * * ? ")
@SchedulerLock(name = "clearLog", lockAtMostFor = "PT1H", lockAtLeastFor = "PT1H")
// @Scheduled(cron = "0 0 0/1 * * ? ")
// @SchedulerLock(name = "clearLog", lockAtMostFor = "PT1H", lockAtLeastFor = "PT1H")
public void clearLog() {
try {
LocalDateTime endTime = LocalDateTime.now().minusDays(systemProperties.getLogStorage());

View File

@ -0,0 +1,85 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.ServerNodeMapper;
import com.aizuda.easy.retry.server.support.Lifecycle;
import com.aizuda.easy.retry.server.support.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.support.register.ServerRegister;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 删除过期下线机器
*
* @author: www.byteblogs.com
* @date : 2023-07-21 14:59
* @since 2.1.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class OfflineNodeSchedule extends AbstractSchedule implements Lifecycle {
private final ServerNodeMapper serverNodeMapper;
@Override
protected void doExecute() {
try {
// 删除内存缓存的待下线的机器
LocalDateTime endTime = LocalDateTime.now().minusSeconds(
ServerRegister.DELAY_TIME + (ServerRegister.DELAY_TIME / 3));
// 先删除DB中需要下线的机器
serverNodeMapper.deleteByExpireAt(endTime);
Set<RegisterNodeInfo> allPods = CacheRegisterTable.getAllPods();
Set<RegisterNodeInfo> waitOffline = allPods.stream().filter(registerNodeInfo -> registerNodeInfo.getExpireAt().isBefore(endTime)).collect(
Collectors.toSet());
Set<String> podIds = waitOffline.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());
if (CollectionUtils.isEmpty(podIds)) {
return;
}
for (final RegisterNodeInfo registerNodeInfo : waitOffline) {
CacheRegisterTable.remove(registerNodeInfo.getGroupName(), registerNodeInfo.getHostId());
}
} catch (Exception e) {
LogUtils.error(log, "clearOfflineNode 失败", e);
}
}
@Override
String lockName() {
return "clearOfflineNode";
}
@Override
String lockAtMost() {
return "PT10S";
}
@Override
String lockAtLeast() {
return "PT5S";
}
@Override
public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT5S"));
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,111 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.DATE_FORMAT;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryDeadLetterMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.support.Lifecycle;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
/**
* 监控重试失败数据总量是否到达阈值
*
* @author: www.byteblogs.com
* @date : 2023-07-21 17:25
* @since 2.1.0
*/
@Component
@Slf4j
public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle {
private static String retryErrorMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试失败数据监控</font> \n" +
"> 名称:{} \n" +
"> 时间窗口:{} ~ {} \n" +
"> **共计:{}** \n";
@Autowired
private RetryDeadLetterMapper retryDeadLetterMapper;
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Override
public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M"));
}
@Override
public void close() {
}
@Override
protected void doExecute() {
LogUtils.info(log, "retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) {
List<NotifyConfig> notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene());
if (CollectionUtils.isEmpty(notifyConfigs)) {
continue;
}
// x分钟内进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
int count = retryDeadLetterMapper.countRetryDeadLetterByCreateAt(now.minusMinutes(30), now, groupConfig.getGroupPartition());
for (NotifyConfig notifyConfig : notifyConfigs) {
if (count > notifyConfig.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
groupConfig.getGroupName(),
now.minusMinutes(30).format(DATE_FORMAT.YYYYMMDDHHMMSS),
now.format(DATE_FORMAT.YYYYMMDDHHMMSS),
count)
.title("组:[{}] 环境重试失败数据监控", groupConfig.getGroupName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
}
@Override
String lockName() {
return "retryErrorMoreThreshold";
}
@Override
String lockAtMost() {
return "PT10M";
}
@Override
String lockAtLeast() {
return "PT1M";
}
}

View File

@ -0,0 +1,107 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.constant.SystemConstants.DATE_FORMAT;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.GroupConfig;
import com.aizuda.easy.retry.server.persistence.mybatis.po.NotifyConfig;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.support.Lifecycle;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
/**
* 监控重试表中数据总量是否到达阈值
*
* @author: www.byteblogs.com
* @date : 2023-07-21 17:25
* @since 2.1.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle {
private static String retryTaskMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试数据监控</font> \n" +
"> 名称:{} \n" +
"> 时间:{} \n" +
"> **共计:{}** \n";
private final RetryTaskMapper retryTaskMapper;
private final EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Override
public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M"));
}
@Override
public void close() {
}
@Override
protected void doExecute() {
LogUtils.info(log, "retryTaskMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (GroupConfig groupConfig : configAccess.getAllConfigGroupList()) {
List<NotifyConfig> notifyConfigs = configAccess.getNotifyConfigByGroupName(groupConfig.getGroupName(), NotifySceneEnum.MAX_RETRY.getNotifyScene());
if (CollectionUtils.isEmpty(notifyConfigs)) {
continue;
}
int count = retryTaskMapper.countAllRetryTaskByRetryStatus(groupConfig.getGroupPartition(), RetryStatusEnum.RUNNING.getStatus());
for (NotifyConfig notifyConfig : notifyConfigs) {
if (count > notifyConfig.getNotifyThreshold()) {
// 预警
AlarmContext context = AlarmContext.build()
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
groupConfig.getGroupName(),
LocalDateTime.now().format(DATE_FORMAT.YYYYMMDDHHMMSS),
count)
.title("组:[{}])重试数据过多", groupConfig.getGroupName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
}
@Override
String lockName() {
return "retryTaskMoreThreshold";
}
@Override
String lockAtMost() {
return "PT10M";
}
@Override
String lockAtLeast() {
return "PT1M";
}
}

View File

@ -0,0 +1,70 @@
package com.aizuda.easy.retry.server.support.schedule;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.persistence.support.ConfigAccess;
import com.aizuda.easy.retry.server.service.RetryService;
import com.aizuda.easy.retry.server.support.Lifecycle;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
/**
* 删除重试完成的和重试到达最大重试次数的数据迁移到死信队列表
*
* @author: www.byteblogs.com
* @date : 2023-07-21 17:19
* @since 2.1.0
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class RetryTaskSchedule extends AbstractSchedule implements Lifecycle {
private final RetryService retryService;
@Autowired
@Qualifier("configAccessProcessor")
private ConfigAccess configAccess;
@Override
public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT1H"));
}
@Override
public void close() {
}
@Override
protected void doExecute() {
try {
Set<String> groupNameList = configAccess.getGroupNameList();
for (String groupName : groupNameList) {
retryService.moveDeadLetterAndDelFinish(groupName);
}
} catch (Exception e) {
LogUtils.error(log, "clearFinishAndMoveDeadLetterRetryTask 失败", e);
}
}
@Override
String lockName() {
return "clearFinishAndMoveDeadLetterRetryTask";
}
@Override
String lockAtMost() {
return "PT60s";
}
@Override
String lockAtLeast() {
return "PT60s";
}
}