feat: 2.5.0

1. 优化客户端注册
2. 优化告警通知
3. 客户端添加命名空间的配置
This commit is contained in:
byteblogs168 2023-11-24 14:21:48 +08:00
parent 2ff534a094
commit 3427401cc9
19 changed files with 492 additions and 173 deletions

View File

@ -23,6 +23,11 @@ import java.util.Objects;
@Setter
public class EasyRetryProperties {
/**
* 服务端对应的group
*/
private String namespace;
/**
* 服务端对应的group
*/

View File

@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.common.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.common.config.EasyRetryProperties;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.HeadersEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
@ -103,6 +104,8 @@ public class NettyChannel {
.set(HeadersEnum.HOST_PORT.getKey(), port)
.set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion())
.set(HeadersEnum.HOST.getKey(), serverConfig.getHost())
.set(HeadersEnum.NAMESPACE.getKey(), Optional.ofNullable(easyRetryProperties.getNamespace()).orElse(
SystemConstants.DEFAULT_NAMESPACE))
;
//发送数据

View File

@ -1,7 +1,5 @@
package com.aizuda.easy.retry.common.core.constant;
import java.time.format.DateTimeFormatter;
/**
* 系统通用常量
*
@ -88,4 +86,9 @@ public interface SystemConstants {
* 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance
*/
Long SCHEDULE_INITIAL_DELAY = 30L;
/**
* 默认名称空间
*/
String DEFAULT_NAMESPACE = "764d604ec6fc45f68cd92514c40e9e1a";
}

View File

@ -17,11 +17,6 @@ public class RetryTaskDTO implements Serializable {
*/
private String groupName;
/**
* namespaceId
*/
private String namespaceId;
/**
* sceneName
*/

View File

@ -62,6 +62,7 @@ public interface ConfigAccess<T> extends Access<T> {
* @param notifyScene {@link NotifySceneEnum} 场景类型
* @return {@link NotifyConfig} 场景配置
*/
@Deprecated
List<NotifyConfig> getNotifyConfigByGroupNameAndSceneName(String groupName, String sceneName, Integer notifyScene);
/**

View File

@ -7,12 +7,15 @@ import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
@Mapper
public interface ServerNodeMapper extends BaseMapper<ServerNode> {
int insertOrUpdate(ServerNode record);
int insertOrUpdate(List<ServerNode> records);
int deleteByExpireAt(@Param("endTime") LocalDateTime endTime);
}

View File

@ -20,10 +20,13 @@
<insert id="insertOrUpdate" parameterType="com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode" useGeneratedKeys="true" keyProperty="id">
insert into server_node (id, group_name, host_id, host_ip, host_port,
expire_at, node_type, ext_attrs, context_path, create_dt)
values (#{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR},
values
<foreach collection="records" item="item" index="index" separator="," open="(" close=")">
#{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR},
#{hostPort,jdbcType=INTEGER},
#{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{extAttrs,jdbcType=VARCHAR}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP}
) ON DUPLICATE KEY UPDATE
</foreach>
ON DUPLICATE KEY UPDATE
expire_at = values(`expire_at`)
</insert>
<delete id="deleteByExpireAt">

View File

@ -20,10 +20,13 @@
<insert id="insertOrUpdate" parameterType="com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode" useGeneratedKeys="true" keyProperty="id">
insert into server_node (id, group_name, host_id, host_ip, host_port,
expire_at, node_type, ext_attrs, context_path, create_dt)
values (#{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR},
values
<foreach collection="records" item="item" index="index" separator="," open="(" close=")">
#{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR},
#{hostPort,jdbcType=INTEGER},
#{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{extAttrs,jdbcType=VARCHAR}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP}
) ON DUPLICATE KEY UPDATE
</foreach>
ON DUPLICATE KEY UPDATE
expire_at = values(`expire_at`)
</insert>
<delete id="deleteByExpireAt">

View File

@ -1,16 +0,0 @@
package com.aizuda.easy.retry.server.common;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
/**
* 流量控制
* @author: zuoJunLin
* @date : 2023-11-21 13:04
* @since 2.5.0
*/
public interface FlowControl {
RateLimiter getRateLimiter(Cache<String, RateLimiter> rateLimiterCache, String key, double rateLimiterThreshold);
}

View File

@ -1,4 +1,4 @@
package com.aizuda.easy.retry.server.retry.task.support.cache;
package com.aizuda.easy.retry.server.common.cache;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
@ -43,6 +43,15 @@ public class CacheNotifyRateLimiter implements Lifecycle {
return CACHE.getIfPresent(key);
}
/**
* 获取所有缓存
*
* @return 缓存对象
*/
public static void put(String key, RateLimiter rateLimiter) {
CACHE.put(key, rateLimiter);
}
@Override
public void start() {
LogUtils.info(log, "CacheNotifyRateLimiter start");

View File

@ -1,8 +1,36 @@
package com.aizuda.easy.retry.server.common.flow.control;
import com.aizuda.easy.retry.server.common.FlowControl;
import com.google.common.cache.Cache;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.cache.CacheNotifyRateLimiter;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -10,13 +38,110 @@ import java.util.Objects;
* @date : 2023-11-21 13:04
* @since 2.5.0
*/
public abstract class AbstractFlowControl implements FlowControl {
@Slf4j
public abstract class AbstractFlowControl<E extends ApplicationEvent> implements ApplicationListener<E> {
public RateLimiter getRateLimiter(Cache<String, RateLimiter> rateLimiterCache, String key, double rateLimiterThreshold) {
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(key);
if (Objects.isNull(rateLimiter)||rateLimiter.getRate()!=rateLimiterThreshold) {
rateLimiterCache.put(key, RateLimiter.create(rateLimiterThreshold));
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
protected AccessTemplate accessTemplate;
protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold));
}
return rateLimiterCache.getIfPresent(key);
return rateLimiter;
}
protected Map<Triple<String, String, String>, List<NotifyConfig>> getNotifyConfigMap(final Set<String> namespaceIds,
final Set<String> groupNames, final Set<String> sceneNames) {
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(NotifyConfig::getNotifyScene, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene())
.in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames)
.in(NotifyConfig::getSceneName, sceneNames)
);
if (CollectionUtils.isEmpty(notifyConfigs)) {
return null;
}
return notifyConfigs.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
String sceneName = i.getSceneName();
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
protected void doSendAlarm(Triple<String, String, String> key,
List<NotifyConfig> notifyConfigsList,
AlarmDTO alarmDTO
) {
for (final NotifyConfig notifyConfig : notifyConfigsList) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// 限流
RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()),
notifyConfig.getRateLimiterThreshold());
// 每秒发送rateLimiterThreshold个告警
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
LogUtils.warn(log,
"namespaceId:[{}] groupName:[{}] senceName:[{}] idempotentId:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行",
key.getLeft(), key.getMiddle(), key.getRight(),
alarmDTO.getIdempotentId());
continue;
}
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(
notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
protected abstract AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, NotifyConfig notifyConfig);
@Data
public static class AlarmDTO {
private String uniqueId;
private String groupName;
private String sceneName;
private String idempotentId;
private String bizNo;
private String executorName;
private String argsStr;
private Integer retryCount;
private LocalDateTime createDt;
}
@Mapper
public interface AlarmDTOConverter {
AlarmDTOConverter INSTANCE = Mappers.getMapper(AlarmDTOConverter.class);
AlarmDTO toAlarmDTO(RetryDeadLetter retryDeadLetter);
AlarmDTO toAlarmDTO(RetryTask retryTask);
}
}

View File

@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author www.byteblogs.com
@ -32,16 +33,23 @@ public abstract class AbstractRegister implements Register, Lifecycle {
return doRegister(context, serverNode);
}
protected void refreshExpireAt(ServerNode serverNode) {
protected void refreshExpireAt(List<ServerNode> serverNodes) {
try {
for (final ServerNode serverNode : serverNodes) {
serverNode.setExpireAt(getExpireAt());
serverNodeMapper.insertOrUpdate(serverNode);
}
serverNodeMapper.insertOrUpdate(serverNodes);
for (final ServerNode serverNode : serverNodes) {
// 刷新本地缓存过期时间
CacheRegisterTable.refreshExpireAt(serverNode);
}
}catch (Exception e) {
LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]",
serverNode.getGroupName(), serverNode.getHostIp(), e);
LogUtils.error(log,"注册节点失败", e);
}
}

View File

@ -5,13 +5,17 @@ import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -19,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 客户端注册
@ -35,7 +40,7 @@ public class ClientRegister extends AbstractRegister implements Runnable {
public static final int DELAY_TIME = 30;
private Thread THREAD = null;
protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(256);
protected static final LinkedBlockingDeque<ServerNode> QUEUE = new LinkedBlockingDeque<>(1000);
@Override
public boolean supports(int type) {
@ -84,7 +89,16 @@ public class ClientRegister extends AbstractRegister implements Runnable {
try {
ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS);
if (Objects.nonNull(serverNode)) {
refreshExpireAt(serverNode);
List<ServerNode> lists = Lists.newArrayList(serverNode);
QUEUE.drainTo(lists, 100);
// 去重
lists = new ArrayList<>(lists.stream()
.collect(
Collectors.toMap(ServerNode::getHostId, node -> node, (existing, replacement) -> existing))
.values());
refreshExpireAt(lists);
}
// 同步当前POD消费的组的节点信息
@ -102,6 +116,8 @@ public class ClientRegister extends AbstractRegister implements Runnable {
}
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "client refresh expireAt error.");
} finally {
@ -109,6 +125,7 @@ public class ClientRegister extends AbstractRegister implements Runnable {
try {
TimeUnit.MILLISECONDS.sleep(5000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.ServerNodeExtAttrs;
import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance;
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.web.ServerProperties;
@ -76,7 +77,7 @@ public class ServerRegister extends AbstractRegister {
@Override
protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
refreshExpireAt(serverNode);
refreshExpireAt(Lists.newArrayList(serverNode));
return Boolean.TRUE;
}

View File

@ -0,0 +1,41 @@
package com.aizuda.easy.retry.server.common.triple;
/**
* @author: xiaowoniu
* @date : 2023-11-24 08:56
* @since : 2.5.0
*/
public final class ImmutableTriple<L, M, R> extends Triple<L, M, R> {
private static final long serialVersionUID = 1L;
public final L left;
public final M middle;
public final R right;
public static <L, M, R> ImmutableTriple<L, M, R> of(final L left, final M middle, final R right) {
return new ImmutableTriple<>(left, middle, right);
}
public ImmutableTriple(final L left, final M middle, final R right) {
super();
this.left = left;
this.middle = middle;
this.right = right;
}
@Override
public L getLeft() {
return left;
}
@Override
public M getMiddle() {
return middle;
}
@Override
public R getRight() {
return right;
}
}

View File

@ -0,0 +1,66 @@
package com.aizuda.easy.retry.server.common.triple;
import cn.hutool.core.builder.CompareToBuilder;
import java.io.Serializable;
import java.util.Objects;
/**
* @author: xiaowoniu
* @date : 2023-11-24 08:54
* @since : 2.5.0
*/
public abstract class Triple<L, M, R> implements Comparable<Triple<L, M, R>>, Serializable {
private static final long serialVersionUID = 1L;
public static <L, M, R> Triple<L, M, R> of(final L left, final M middle, final R right) {
return new ImmutableTriple<>(left, middle, right);
}
public abstract L getLeft();
public abstract M getMiddle();
public abstract R getRight();
@Override
public int compareTo(final Triple<L, M, R> other) {
return new CompareToBuilder().append(getLeft(), other.getLeft())
.append(getMiddle(), other.getMiddle())
.append(getRight(), other.getRight()).toComparison();
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Triple<?, ?, ?>) {
final Triple<?, ?, ?> other = (Triple<?, ?, ?>) obj;
return Objects.equals(getLeft(), other.getLeft())
&& Objects.equals(getMiddle(), other.getMiddle())
&& Objects.equals(getRight(), other.getRight());
}
return false;
}
@Override
public int hashCode() {
return (getLeft() == null ? 0 : getLeft().hashCode()) ^
(getMiddle() == null ? 0 : getMiddle().hashCode()) ^
(getRight() == null ? 0 : getRight().hashCode());
}
@Override
public String toString() {
return "(" + getLeft() + "," + getMiddle() + "," + getRight() + ")";
}
public String toString(final String format) {
return String.format(format, getLeft(), getMiddle(), getRight());
}
}

View File

@ -34,7 +34,7 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable {
private static final LinkedBlockingQueue<ConfigSyncTask> QUEUE = new LinkedBlockingQueue<>(256);
public static final String URL = "http://{0}:{1}/{2}/retry/sync/version/v1";
public static Thread THREAD = null;
public Thread THREAD = null;
@Autowired
private RestTemplate restTemplate;
@Autowired

View File

@ -1,44 +1,44 @@
package com.aizuda.easy.retry.server.retry.task.support.listener;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 重试任务失败进入死信队列监听器
*
* @author: zuoJunLin
* @date : 2023-11-20 21:40
* @since 2.5.0
*/
@Component
@Slf4j
public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener<RetryTaskFailDeadLetterAlarmEvent> {
public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl<RetryTaskFailDeadLetterAlarmEvent> implements Runnable, Lifecycle {
/**
* 死信告警数据
@ -53,82 +53,102 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl im
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
protected AccessTemplate accessTemplate;
private Thread thread;
@Override
public void start() {
new Thread(this).start();
thread = new Thread(this);
thread.start();
}
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
}
}
@Override
public void run() {
LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (; ; ) {
LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(),
HostUtils.getIp());
while (!Thread.currentThread().isInterrupted()) {
try {
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS);
if (CollectionUtils.isEmpty(allRetryDeadLetterList)) {
continue;
}
//组分组
Map<String, List<RetryDeadLetter>> groupNameMap = allRetryDeadLetterList.stream()
.collect(Collectors.groupingBy(RetryDeadLetter::getGroupName));
groupNameMap.forEach(((groupName, groupRetryDeadLetterList) -> {
//场景分组
Map<String, List<RetryDeadLetter>> sceneNameMap = groupRetryDeadLetterList.stream()
.collect(Collectors.groupingBy(RetryDeadLetter::getSceneName));
sceneNameMap.forEach(((sceneName, sceneRetryDeadLetterList) -> {
//获取通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(groupName, sceneName, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene());
for (RetryDeadLetter retryDeadLetter : sceneRetryDeadLetterList) {
for (NotifyConfig notifyConfig : notifyConfigs) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(),StatusEnum.YES.getStatus())) {
//限流
RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold());
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
LogUtils.warn(log, "组:[{}] 场景:[{}] 幂等id:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行", groupName, sceneName, retryDeadLetter.getIdempotentId());
Set<String> namespaceIds = new HashSet<>();
Set<String> groupNames = new HashSet<>();
Set<String> sceneNames = new HashSet<>();
Map<Triple<String, String, String>, List<RetryDeadLetter>> retryDeadLetterMap = getRetryDeadLetterMap(
allRetryDeadLetterList, namespaceIds, groupNames, sceneNames);
Map<Triple<String, String, String>, List<NotifyConfig>> notifyConfigMap = getNotifyConfigMap(
namespaceIds, groupNames, sceneNames);
if (notifyConfigMap == null) {
continue;
}
// 循环发送消息
retryDeadLetterMap.forEach((key, list) -> {
List<NotifyConfig> notifyConfigsList = notifyConfigMap.get(key);
for (RetryDeadLetter retryDeadLetter : list) {
doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryDeadLetter));
}
// 预警
AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
retryDeadLetter.getGroupName(),
retryDeadLetter.getExecutorName(),
retryDeadLetter.getSceneName(),
retryDeadLetter.getArgsStr(),
DateUtils.format(retryDeadLetter.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 重试任务失败进入死信队列", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}));
}));
});
} catch (InterruptedException e) {
LogUtils.info(log, "retry task fail dead letter alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
}
}
}
@NotNull
private static Map<Triple<String, String, String>, List<RetryDeadLetter>> getRetryDeadLetterMap(
final List<RetryDeadLetter> allRetryDeadLetterList, final Set<String> namespaceIds,
final Set<String> groupNames, final Set<String> sceneNames) {
return allRetryDeadLetterList.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
String sceneName = i.getSceneName();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
sceneNames.add(sceneName);
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
@Override
public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) {
try {
queue.put(event.getRetryDeadLetters());
} catch (InterruptedException e) {
LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue push Exception", e);
if (queue.offer(event.getRetryDeadLetters())) {
LogUtils.warn(log, "任务重试失败进入死信队列告警队列已满");
}
}
@Override
protected AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, final NotifyConfig notifyConfig) {
// 预警
return AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getGroupName(),
alarmDTO.getExecutorName(),
alarmDTO.getSceneName(),
alarmDTO.getArgsStr(),
DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列",
alarmDTO.getGroupName(), alarmDTO.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
}
}

View File

@ -1,47 +1,49 @@
package com.aizuda.easy.retry.server.retry.task.support.listener;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 重试任务失败数量超过阈值监听器
*
* @author: zuoJunLin
* @date : 2023-11-20 21:40
* @since 2.5.0
*/
@Component
@Slf4j
public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener<RetryTaskFailMoreThresholdAlarmEvent> {
public class RetryTaskFailMoreThresholdAlarmListener extends
AbstractFlowControl<RetryTaskFailMoreThresholdAlarmEvent> implements Runnable, Lifecycle {
/**
* 重试任务失败数量超过阈值告警数据
*/
private LinkedBlockingQueue<RetryTask> queue = new LinkedBlockingQueue<>(1000);
private static String retryTaskDeadTextMessagesFormatter =
private Thread thread;
private static String retryTaskFailMoreThresholdMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败数量超过阈值</font> \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
@ -50,60 +52,55 @@ public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
protected AccessTemplate accessTemplate;
@Override
public void start() {
new Thread(this).start();
thread = new Thread(this);
thread.start();
}
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
}
}
@Override
public void run() {
LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
for (; ; ) {
LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(),
HostUtils.getIp());
while (!Thread.currentThread().isInterrupted()) {
try {
RetryTask retryTask = queue.poll(5, TimeUnit.SECONDS);
if (Objects.isNull(retryTask)) {
// 无数据时阻塞线程
RetryTask retryTask = queue.take();
// 拉取100条
List<RetryTask> lists = Lists.newArrayList(retryTask);
queue.drainTo(lists, 200);
Set<String> namespaceIds = new HashSet<>();
Set<String> groupNames = new HashSet<>();
Set<String> sceneNames = new HashSet<>();
Map<Triple<String, String, String>, List<RetryTask>> retryTaskMap = getRetryTaskMap(
lists, namespaceIds, groupNames, sceneNames);
Map<Triple<String, String, String>, List<NotifyConfig>> notifyConfigMap = getNotifyConfigMap(
namespaceIds, groupNames, sceneNames);
if (notifyConfigMap == null) {
continue;
}
//获取通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), NotifySceneEnum.RETRY_TASK_REACH_THRESHOLD.getNotifyScene());
for (NotifyConfig notifyConfig : notifyConfigs) {
//阈值判断
if (retryTask.getRetryCount() >= notifyConfig.getNotifyThreshold()) {
//限流判断
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold());
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
LogUtils.warn(log, "组:[{}] 场景:[{}] 通知阈值:[{}] 幂等id:[{}] 重试任务失败数量超过阈值已到达最大限流阈值,本次通知不执行", notifyConfig.getGroupName(), notifyConfig.getSceneName(), notifyConfig.getNotifyThreshold(), retryTask.getIdempotentId());
continue;
}
}
// 预警
AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
retryTask.getGroupName(),
retryTask.getExecutorName(),
retryTask.getSceneName(),
retryTask.getRetryCount(),
retryTask.getArgsStr(),
DateUtils.format(retryTask.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 环境重试任务失败数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
// 循环发送消息
retryTaskMap.forEach((key, list) -> {
List<NotifyConfig> notifyConfigsList = notifyConfigMap.get(key);
for (RetryTask retryTask1 : list) {
doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryTask1));
}
});
} catch (InterruptedException e) {
LogUtils.info(log, "retry task fail more alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue poll Exception", e);
}
@ -112,10 +109,45 @@ public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl
@Override
public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) {
try {
queue.put(event.getRetryTask());
} catch (InterruptedException e) {
LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue push Exception", e);
if (queue.offer(event.getRetryTask())) {
LogUtils.warn(log, "任务失败数量超过阈值告警队列已满");
}
}
@Override
protected AlarmContext buildAlarmContext(AlarmDTO alarmDTO, NotifyConfig notifyConfig) {
// 预警
return AlarmContext.build().text(retryTaskFailMoreThresholdMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getGroupName(),
alarmDTO.getExecutorName(),
alarmDTO.getSceneName(),
alarmDTO.getRetryCount(),
alarmDTO.getArgsStr(),
DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", alarmDTO.getGroupName(), alarmDTO.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
}
@NotNull
private static Map<Triple<String, String, String>, List<RetryTask>> getRetryTaskMap(
final List<RetryTask> list, final Set<String> namespaceIds,
final Set<String> groupNames, final Set<String> sceneNames) {
return list.stream()
.collect(Collectors.groupingBy(retryTask -> {
String namespaceId = retryTask.getNamespaceId();
String groupName = retryTask.getGroupName();
String sceneName = retryTask.getSceneName();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
sceneNames.add(sceneName);
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
}