From 3427401cc94a99d31ed1b22d0adf54287fbf00bc Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Fri, 24 Nov 2023 14:21:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=202.5.0=201.=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=B3=A8=E5=86=8C=202.=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E5=91=8A=E8=AD=A6=E9=80=9A=E7=9F=A5=203.=20=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E6=B7=BB=E5=8A=A0=E5=91=BD=E5=90=8D=E7=A9=BA?= =?UTF-8?q?=E9=97=B4=E7=9A=84=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/config/EasyRetryProperties.java | 5 + .../client/common/netty/NettyChannel.java | 3 + .../common/core/constant/SystemConstants.java | 7 +- .../retry/server/model/dto/RetryTaskDTO.java | 5 - .../datasource/access/ConfigAccess.java | 1 + .../persistence/mapper/ServerNodeMapper.java | 3 + .../mariadb/mapper/ServerNodeMapper.xml | 11 +- .../mysql/mapper/ServerNodeMapper.xml | 15 +- .../easy/retry/server/common/FlowControl.java | 16 -- .../common}/cache/CacheNotifyRateLimiter.java | 11 +- .../flow/control/AbstractFlowControl.java | 141 ++++++++++++++++- .../common/register/AbstractRegister.java | 22 ++- .../common/register/ClientRegister.java | 29 +++- .../common/register/ServerRegister.java | 3 +- .../server/common/triple/ImmutableTriple.java | 41 +++++ .../retry/server/common/triple/Triple.java | 66 ++++++++ .../handler/ConfigVersionSyncHandler.java | 2 +- .../RetryTaskFailDeadLetterAlarmListener.java | 136 +++++++++------- ...tryTaskFailMoreThresholdAlarmListener.java | 148 +++++++++++------- 19 files changed, 492 insertions(+), 173 deletions(-) delete mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java rename easy-retry-server/{easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support => easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common}/cache/CacheNotifyRateLimiter.java (85%) create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java create mode 100644 easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Triple.java diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java index e7bc80c29..383a139a5 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/config/EasyRetryProperties.java @@ -23,6 +23,11 @@ import java.util.Objects; @Setter public class EasyRetryProperties { + /** + * 服务端对应的group + */ + private String namespace; + /** * 服务端对应的group */ diff --git a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java index 7e7db02ca..9d1db2518 100644 --- a/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java +++ b/easy-retry-client/easy-retry-client-common/src/main/java/com/aizuda/easy/retry/client/common/netty/NettyChannel.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.client.common.cache.GroupVersionCache; import com.aizuda.easy.retry.client.common.config.EasyRetryProperties; +import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.enums.HeadersEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; @@ -103,6 +104,8 @@ public class NettyChannel { .set(HeadersEnum.HOST_PORT.getKey(), port) .set(HeadersEnum.VERSION.getKey(), GroupVersionCache.getVersion()) .set(HeadersEnum.HOST.getKey(), serverConfig.getHost()) + .set(HeadersEnum.NAMESPACE.getKey(), Optional.ofNullable(easyRetryProperties.getNamespace()).orElse( + SystemConstants.DEFAULT_NAMESPACE)) ; //发送数据 diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java index 0b2d6c761..dffeee192 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/constant/SystemConstants.java @@ -1,7 +1,5 @@ package com.aizuda.easy.retry.common.core.constant; -import java.time.format.DateTimeFormatter; - /** * 系统通用常量 * @@ -88,4 +86,9 @@ public interface SystemConstants { * 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance */ Long SCHEDULE_INITIAL_DELAY = 30L; + + /** + * 默认名称空间 + */ + String DEFAULT_NAMESPACE = "764d604ec6fc45f68cd92514c40e9e1a"; } diff --git a/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/RetryTaskDTO.java b/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/RetryTaskDTO.java index bff9576d7..9e3979ab6 100644 --- a/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/RetryTaskDTO.java +++ b/easy-retry-common/easy-retry-common-server-api/src/main/java/com/aizuda/easy/retry/server/model/dto/RetryTaskDTO.java @@ -17,11 +17,6 @@ public class RetryTaskDTO implements Serializable { */ private String groupName; - /** - * namespaceId - */ - private String namespaceId; - /** * sceneName */ diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/access/ConfigAccess.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/access/ConfigAccess.java index 42ad993f8..f4275c582 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/access/ConfigAccess.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/access/ConfigAccess.java @@ -62,6 +62,7 @@ public interface ConfigAccess extends Access { * @param notifyScene {@link NotifySceneEnum} 场景类型 * @return {@link NotifyConfig} 场景配置 */ + @Deprecated List getNotifyConfigByGroupNameAndSceneName(String groupName, String sceneName, Integer notifyScene); /** diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java index 4bbe62a3c..f8599cc62 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java @@ -7,12 +7,15 @@ import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.time.LocalDateTime; +import java.util.List; @Mapper public interface ServerNodeMapper extends BaseMapper { int insertOrUpdate(ServerNode record); + int insertOrUpdate(List records); + int deleteByExpireAt(@Param("endTime") LocalDateTime endTime); } diff --git a/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/ServerNodeMapper.xml b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/ServerNodeMapper.xml index eb7df8dd2..bb113548d 100644 --- a/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/ServerNodeMapper.xml +++ b/easy-retry-datasource/easy-retry-mariadb-datasource/src/main/resources/mariadb/mapper/ServerNodeMapper.xml @@ -20,10 +20,13 @@ insert into server_node (id, group_name, host_id, host_ip, host_port, expire_at, node_type, ext_attrs, context_path, create_dt) - values (#{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR}, - #{hostPort,jdbcType=INTEGER}, - #{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{extAttrs,jdbcType=VARCHAR}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP} - ) ON DUPLICATE KEY UPDATE + values + + #{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR}, + #{hostPort,jdbcType=INTEGER}, + #{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{extAttrs,jdbcType=VARCHAR}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP} + + ON DUPLICATE KEY UPDATE expire_at = values(`expire_at`) diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml index 7f34ed8cd..465502109 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml @@ -19,12 +19,15 @@ insert into server_node (id, group_name, host_id, host_ip, host_port, - expire_at, node_type, ext_attrs, context_path, create_dt) - values (#{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR}, - #{hostPort,jdbcType=INTEGER}, - #{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{extAttrs,jdbcType=VARCHAR}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP} - ) ON DUPLICATE KEY UPDATE - expire_at = values(`expire_at`) + expire_at, node_type, ext_attrs, context_path, create_dt) + values + + #{id,jdbcType=BIGINT}, #{groupName,jdbcType=VARCHAR}, #{hostId,jdbcType=VARCHAR}, #{hostIp,jdbcType=VARCHAR}, + #{hostPort,jdbcType=INTEGER}, + #{expireAt,jdbcType=TIMESTAMP}, #{nodeType,jdbcType=TINYINT}, #{extAttrs,jdbcType=VARCHAR}, #{contextPath,jdbcType=VARCHAR}, #{createDt,jdbcType=TIMESTAMP} + + ON DUPLICATE KEY UPDATE + expire_at = values(`expire_at`) delete from server_node diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java deleted file mode 100644 index adf5453af..000000000 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.aizuda.easy.retry.server.common; -import com.google.common.cache.Cache; -import com.google.common.util.concurrent.RateLimiter; - -/** - * 流量控制 - * @author: zuoJunLin - * @date : 2023-11-21 13:04 - * @since 2.5.0 - */ -public interface FlowControl { - - RateLimiter getRateLimiter(Cache rateLimiterCache, String key, double rateLimiterThreshold); -} - - diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheNotifyRateLimiter.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheNotifyRateLimiter.java similarity index 85% rename from easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheNotifyRateLimiter.java rename to easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheNotifyRateLimiter.java index 1ae7e3334..636e0c6d2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheNotifyRateLimiter.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/cache/CacheNotifyRateLimiter.java @@ -1,4 +1,4 @@ -package com.aizuda.easy.retry.server.retry.task.support.cache; +package com.aizuda.easy.retry.server.common.cache; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.Lifecycle; @@ -43,6 +43,15 @@ public class CacheNotifyRateLimiter implements Lifecycle { return CACHE.getIfPresent(key); } + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static void put(String key, RateLimiter rateLimiter) { + CACHE.put(key, rateLimiter); + } + @Override public void start() { LogUtils.info(log, "CacheNotifyRateLimiter start"); diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java index 48ee6e6f7..c7db82b69 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java @@ -1,8 +1,36 @@ package com.aizuda.easy.retry.server.common.flow.control; -import com.aizuda.easy.retry.server.common.FlowControl; -import com.google.common.cache.Cache; + +import com.aizuda.easy.retry.common.core.alarm.Alarm; +import com.aizuda.easy.retry.common.core.alarm.AlarmContext; +import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; +import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.cache.CacheNotifyRateLimiter; +import com.aizuda.easy.retry.server.common.triple.ImmutableTriple; +import com.aizuda.easy.retry.server.common.triple.Triple; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.util.concurrent.RateLimiter; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** @@ -10,13 +38,110 @@ import java.util.Objects; * @date : 2023-11-21 13:04 * @since 2.5.0 */ -public abstract class AbstractFlowControl implements FlowControl { +@Slf4j +public abstract class AbstractFlowControl implements ApplicationListener { - public RateLimiter getRateLimiter(Cache rateLimiterCache, String key, double rateLimiterThreshold) { - RateLimiter rateLimiter = rateLimiterCache.getIfPresent(key); - if (Objects.isNull(rateLimiter)||rateLimiter.getRate()!=rateLimiterThreshold) { - rateLimiterCache.put(key, RateLimiter.create(rateLimiterThreshold)); + + @Autowired + private EasyRetryAlarmFactory easyRetryAlarmFactory; + + @Autowired + protected AccessTemplate accessTemplate; + + protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) { + RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key); + if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) { + CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold)); } - return rateLimiterCache.getIfPresent(key); + + return rateLimiter; + } + + protected Map, List> getNotifyConfigMap(final Set namespaceIds, + final Set groupNames, final Set sceneNames) { + + // 批量获取所需的通知配置 + List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( + new LambdaQueryWrapper() + .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES) + .eq(NotifyConfig::getNotifyScene, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene()) + .in(NotifyConfig::getNamespaceId, namespaceIds) + .in(NotifyConfig::getGroupName, groupNames) + .in(NotifyConfig::getSceneName, sceneNames) + ); + + if (CollectionUtils.isEmpty(notifyConfigs)) { + return null; + } + + return notifyConfigs.stream() + .collect(Collectors.groupingBy(i -> { + + String namespaceId = i.getNamespaceId(); + String groupName = i.getGroupName(); + String sceneName = i.getSceneName(); + + return ImmutableTriple.of(namespaceId, groupName, sceneName); + })); + } + + protected void doSendAlarm(Triple key, + List notifyConfigsList, + AlarmDTO alarmDTO + ) { + for (final NotifyConfig notifyConfig : notifyConfigsList) { + if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) { + // 限流 + RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()), + notifyConfig.getRateLimiterThreshold()); + // 每秒发送rateLimiterThreshold个告警 + if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { + LogUtils.warn(log, + "namespaceId:[{}] groupName:[{}] senceName:[{}] idempotentId:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行", + key.getLeft(), key.getMiddle(), key.getRight(), + alarmDTO.getIdempotentId()); + continue; + } + } + + AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig); + Alarm alarmType = easyRetryAlarmFactory.getAlarmType( + notifyConfig.getNotifyType()); + alarmType.asyncSendMessage(context); + } + } + + protected abstract AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, NotifyConfig notifyConfig); + + @Data + public static class AlarmDTO { + + private String uniqueId; + + private String groupName; + + private String sceneName; + + private String idempotentId; + + private String bizNo; + + private String executorName; + + private String argsStr; + + private Integer retryCount; + + private LocalDateTime createDt; + } + + @Mapper + public interface AlarmDTOConverter { + + AlarmDTOConverter INSTANCE = Mappers.getMapper(AlarmDTOConverter.class); + + AlarmDTO toAlarmDTO(RetryDeadLetter retryDeadLetter); + + AlarmDTO toAlarmDTO(RetryTask retryTask); } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java index 792f488cd..8381d5011 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import java.time.LocalDateTime; +import java.util.List; /** * @author www.byteblogs.com @@ -32,16 +33,23 @@ public abstract class AbstractRegister implements Register, Lifecycle { return doRegister(context, serverNode); } - protected void refreshExpireAt(ServerNode serverNode) { + protected void refreshExpireAt(List serverNodes) { try { - serverNode.setExpireAt(getExpireAt()); - serverNodeMapper.insertOrUpdate(serverNode); - // 刷新本地缓存过期时间 - CacheRegisterTable.refreshExpireAt(serverNode); + + for (final ServerNode serverNode : serverNodes) { + serverNode.setExpireAt(getExpireAt()); + } + + serverNodeMapper.insertOrUpdate(serverNodes); + + for (final ServerNode serverNode : serverNodes) { + // 刷新本地缓存过期时间 + CacheRegisterTable.refreshExpireAt(serverNode); + } + }catch (Exception e) { - LogUtils.error(log,"注册节点失败 groupName:[{}] hostIp:[{}]", - serverNode.getGroupName(), serverNode.getHostIp(), e); + LogUtils.error(log,"注册节点失败", e); } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java index f3f8173f2..bbc2b9ec9 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ClientRegister.java @@ -5,13 +5,17 @@ import com.aizuda.easy.retry.common.core.enums.NodeTypeEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -19,6 +23,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * 客户端注册 @@ -35,7 +40,7 @@ public class ClientRegister extends AbstractRegister implements Runnable { public static final int DELAY_TIME = 30; private Thread THREAD = null; - protected static final LinkedBlockingDeque QUEUE = new LinkedBlockingDeque<>(256); + protected static final LinkedBlockingDeque QUEUE = new LinkedBlockingDeque<>(1000); @Override public boolean supports(int type) { @@ -84,7 +89,16 @@ public class ClientRegister extends AbstractRegister implements Runnable { try { ServerNode serverNode = QUEUE.poll(5L, TimeUnit.SECONDS); if (Objects.nonNull(serverNode)) { - refreshExpireAt(serverNode); + List lists = Lists.newArrayList(serverNode); + QUEUE.drainTo(lists, 100); + + // 去重 + lists = new ArrayList<>(lists.stream() + .collect( + Collectors.toMap(ServerNode::getHostId, node -> node, (existing, replacement) -> existing)) + .values()); + + refreshExpireAt(lists); } // 同步当前POD消费的组的节点信息 @@ -92,16 +106,18 @@ public class ClientRegister extends AbstractRegister implements Runnable { ConcurrentMap allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName(); if (!CollectionUtils.isEmpty(allConsumerGroupName)) { List serverNodes = serverNodeMapper.selectList( - new LambdaQueryWrapper() - .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) - .in(ServerNode::getNamespaceId, new HashSet<>(allConsumerGroupName.values())) - .in(ServerNode::getGroupName, allConsumerGroupName.keySet())); + new LambdaQueryWrapper() + .eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType()) + .in(ServerNode::getNamespaceId, new HashSet<>(allConsumerGroupName.values())) + .in(ServerNode::getGroupName, allConsumerGroupName.keySet())); for (final ServerNode node : serverNodes) { // 刷新全量本地缓存 CacheRegisterTable.addOrUpdate(node); } } + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); } catch (Exception e) { LogUtils.error(log, "client refresh expireAt error."); } finally { @@ -109,6 +125,7 @@ public class ClientRegister extends AbstractRegister implements Runnable { try { TimeUnit.MILLISECONDS.sleep(5000); } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); } } } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java index 38507d033..75381c4ac 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/ServerRegister.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.ServerNodeExtAttrs; import com.aizuda.easy.retry.server.common.handler.ServerNodeBalance; import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.web.ServerProperties; @@ -76,7 +77,7 @@ public class ServerRegister extends AbstractRegister { @Override protected boolean doRegister(RegisterContext context, ServerNode serverNode) { - refreshExpireAt(serverNode); + refreshExpireAt(Lists.newArrayList(serverNode)); return Boolean.TRUE; } diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java new file mode 100644 index 000000000..8e82b4cd5 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/ImmutableTriple.java @@ -0,0 +1,41 @@ +package com.aizuda.easy.retry.server.common.triple; + +/** + * @author: xiaowoniu + * @date : 2023-11-24 08:56 + * @since : 2.5.0 + */ +public final class ImmutableTriple extends Triple { + + private static final long serialVersionUID = 1L; + + public final L left; + public final M middle; + public final R right; + + public static ImmutableTriple of(final L left, final M middle, final R right) { + return new ImmutableTriple<>(left, middle, right); + } + + public ImmutableTriple(final L left, final M middle, final R right) { + super(); + this.left = left; + this.middle = middle; + this.right = right; + } + + @Override + public L getLeft() { + return left; + } + + @Override + public M getMiddle() { + return middle; + } + + @Override + public R getRight() { + return right; + } +} diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Triple.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Triple.java new file mode 100644 index 000000000..1e54f16a6 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/triple/Triple.java @@ -0,0 +1,66 @@ +package com.aizuda.easy.retry.server.common.triple; + +import cn.hutool.core.builder.CompareToBuilder; + +import java.io.Serializable; +import java.util.Objects; + +/** + * @author: xiaowoniu + * @date : 2023-11-24 08:54 + * @since : 2.5.0 + */ +public abstract class Triple implements Comparable>, Serializable { + + private static final long serialVersionUID = 1L; + + public static Triple of(final L left, final M middle, final R right) { + return new ImmutableTriple<>(left, middle, right); + } + + public abstract L getLeft(); + + public abstract M getMiddle(); + + public abstract R getRight(); + + @Override + public int compareTo(final Triple other) { + return new CompareToBuilder().append(getLeft(), other.getLeft()) + .append(getMiddle(), other.getMiddle()) + .append(getRight(), other.getRight()).toComparison(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof Triple) { + final Triple other = (Triple) obj; + return Objects.equals(getLeft(), other.getLeft()) + && Objects.equals(getMiddle(), other.getMiddle()) + && Objects.equals(getRight(), other.getRight()); + } + return false; + } + + @Override + public int hashCode() { + return (getLeft() == null ? 0 : getLeft().hashCode()) ^ + (getMiddle() == null ? 0 : getMiddle().hashCode()) ^ + (getRight() == null ? 0 : getRight().hashCode()); + } + + @Override + public String toString() { + return "(" + getLeft() + "," + getMiddle() + "," + getRight() + ")"; + } + + + public String toString(final String format) { + return String.format(format, getLeft(), getMiddle(), getRight()); + } + + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java index 20bccf12c..338b827c7 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/handler/ConfigVersionSyncHandler.java @@ -34,7 +34,7 @@ public class ConfigVersionSyncHandler implements Lifecycle, Runnable { private static final LinkedBlockingQueue QUEUE = new LinkedBlockingQueue<>(256); public static final String URL = "http://{0}:{1}/{2}/retry/sync/version/v1"; - public static Thread THREAD = null; + public Thread THREAD = null; @Autowired private RestTemplate restTemplate; @Autowired diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java index cc18ca046..2d64c2c58 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java @@ -1,44 +1,44 @@ package com.aizuda.easy.retry.server.retry.task.support.listener; -import com.aizuda.easy.retry.common.core.alarm.Alarm; + import com.aizuda.easy.retry.common.core.alarm.AlarmContext; -import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; -import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; -import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; import com.aizuda.easy.retry.common.core.util.HostUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl; import com.aizuda.easy.retry.server.common.util.DateUtils; -import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter; +import com.aizuda.easy.retry.server.common.triple.ImmutableTriple; +import com.aizuda.easy.retry.server.common.triple.Triple; import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; -import com.google.common.cache.Cache; -import com.google.common.util.concurrent.RateLimiter; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; + import java.time.LocalDateTime; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * 重试任务失败进入死信队列监听器 + * * @author: zuoJunLin * @date : 2023-11-20 21:40 * @since 2.5.0 */ @Component @Slf4j -public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener { +public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle { /** * 死信告警数据 @@ -46,89 +46,109 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl im private LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(1000); private static String retryTaskDeadTextMessagesFormatter = - "{}环境 重试任务失败进入死信队列 \n" + + "{}环境 重试任务失败进入死信队列 \n" + "> 组名称:{} \n" + "> 执行器名称:{} \n" + "> 场景名称:{} \n" + "> 业务数据:{} \n" + "> 时间:{} \n"; - - @Autowired - private EasyRetryAlarmFactory easyRetryAlarmFactory; - @Autowired protected AccessTemplate accessTemplate; + private Thread thread; + @Override public void start() { - new Thread(this).start(); + thread = new Thread(this); + thread.start(); } @Override public void close() { - + if (Objects.nonNull(thread)) { + thread.interrupt(); + } } @Override public void run() { - LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); - for (; ; ) { + LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), + HostUtils.getIp()); + while (!Thread.currentThread().isInterrupted()) { try { List allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS); if (CollectionUtils.isEmpty(allRetryDeadLetterList)) { continue; } - //组分组 - Map> groupNameMap = allRetryDeadLetterList.stream() - .collect(Collectors.groupingBy(RetryDeadLetter::getGroupName)); - groupNameMap.forEach(((groupName, groupRetryDeadLetterList) -> { - //场景分组 - Map> sceneNameMap = groupRetryDeadLetterList.stream() - .collect(Collectors.groupingBy(RetryDeadLetter::getSceneName)); - sceneNameMap.forEach(((sceneName, sceneRetryDeadLetterList) -> { - //获取通知配置 - List notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(groupName, sceneName, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene()); - for (RetryDeadLetter retryDeadLetter : sceneRetryDeadLetterList) { - for (NotifyConfig notifyConfig : notifyConfigs) { - if (Objects.equals(notifyConfig.getRateLimiterStatus(),StatusEnum.YES.getStatus())) { - //限流 - RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold()); - if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { - LogUtils.warn(log, "组:[{}] 场景:[{}] 幂等id:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行", groupName, sceneName, retryDeadLetter.getIdempotentId()); - continue; - } - } - // 预警 - AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter, - EnvironmentUtils.getActiveProfile(), - retryDeadLetter.getGroupName(), - retryDeadLetter.getExecutorName(), - retryDeadLetter.getSceneName(), - retryDeadLetter.getArgsStr(), - DateUtils.format(retryDeadLetter.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) - .title("{}环境 重试任务失败进入死信队列", EnvironmentUtils.getActiveProfile()) - .notifyAttribute(notifyConfig.getNotifyAttribute()); - Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); - alarmType.asyncSendMessage(context); - } - } - })); - })); + + Set namespaceIds = new HashSet<>(); + Set groupNames = new HashSet<>(); + Set sceneNames = new HashSet<>(); + Map, List> retryDeadLetterMap = getRetryDeadLetterMap( + allRetryDeadLetterList, namespaceIds, groupNames, sceneNames); + + Map, List> notifyConfigMap = getNotifyConfigMap( + namespaceIds, groupNames, sceneNames); + if (notifyConfigMap == null) { + continue; + } + + // 循环发送消息 + retryDeadLetterMap.forEach((key, list) -> { + List notifyConfigsList = notifyConfigMap.get(key); + for (RetryDeadLetter retryDeadLetter : list) { + doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryDeadLetter)); + } + }); + } catch (InterruptedException e) { + LogUtils.info(log, "retry task fail dead letter alarm stop"); + Thread.currentThread().interrupt(); } catch (Exception e) { LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e); } } } + @NotNull + private static Map, List> getRetryDeadLetterMap( + final List allRetryDeadLetterList, final Set namespaceIds, + final Set groupNames, final Set sceneNames) { + return allRetryDeadLetterList.stream() + .collect(Collectors.groupingBy(i -> { + + String namespaceId = i.getNamespaceId(); + String groupName = i.getGroupName(); + String sceneName = i.getSceneName(); + + namespaceIds.add(namespaceId); + groupNames.add(groupName); + sceneNames.add(sceneName); + + return ImmutableTriple.of(namespaceId, groupName, sceneName); + })); + } + @Override public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) { - try { - queue.put(event.getRetryDeadLetters()); - } catch (InterruptedException e) { - LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue push Exception", e); + if (queue.offer(event.getRetryDeadLetters())) { + LogUtils.warn(log, "任务重试失败进入死信队列告警队列已满"); } } + @Override + protected AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, final NotifyConfig notifyConfig) { + // 预警 + return AlarmContext.build().text(retryTaskDeadTextMessagesFormatter, + EnvironmentUtils.getActiveProfile(), + alarmDTO.getGroupName(), + alarmDTO.getExecutorName(), + alarmDTO.getSceneName(), + alarmDTO.getArgsStr(), + DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) + .title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列", + alarmDTO.getGroupName(), alarmDTO.getSceneName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); + } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java index 7c1a3a17e..1a4fedfd5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java @@ -1,48 +1,50 @@ package com.aizuda.easy.retry.server.retry.task.support.listener; -import com.aizuda.easy.retry.common.core.alarm.Alarm; + import com.aizuda.easy.retry.common.core.alarm.AlarmContext; -import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; -import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; -import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; import com.aizuda.easy.retry.common.core.util.HostUtils; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl; import com.aizuda.easy.retry.server.common.util.DateUtils; -import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter; +import com.aizuda.easy.retry.server.common.triple.ImmutableTriple; +import com.aizuda.easy.retry.server.common.triple.Triple; import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent; -import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; -import com.google.common.util.concurrent.RateLimiter; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; + import java.time.LocalDateTime; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * 重试任务失败数量超过阈值监听器 + * * @author: zuoJunLin * @date : 2023-11-20 21:40 * @since 2.5.0 */ @Component @Slf4j -public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener { +public class RetryTaskFailMoreThresholdAlarmListener extends + AbstractFlowControl implements Runnable, Lifecycle { /** * 重试任务失败数量超过阈值告警数据 */ private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); - - private static String retryTaskDeadTextMessagesFormatter = - "{}环境 重试任务失败数量超过阈值 \n" + + private Thread thread; + private static String retryTaskFailMoreThresholdMessagesFormatter = + "{}环境 重试任务失败数量超过阈值 \n" + "> 组名称:{} \n" + "> 执行器名称:{} \n" + "> 场景名称:{} \n" + @@ -50,60 +52,55 @@ public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl "> 业务数据:{} \n" + "> 时间:{} \n"; - - @Autowired - private EasyRetryAlarmFactory easyRetryAlarmFactory; - - @Autowired - protected AccessTemplate accessTemplate; - @Override public void start() { - new Thread(this).start(); + thread = new Thread(this); + thread.start(); } @Override public void close() { - + if (Objects.nonNull(thread)) { + thread.interrupt(); + } } @Override public void run() { - LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); - for (; ; ) { + LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), + HostUtils.getIp()); + while (!Thread.currentThread().isInterrupted()) { try { - RetryTask retryTask = queue.poll(5, TimeUnit.SECONDS); - if (Objects.isNull(retryTask)) { + + // 无数据时阻塞线程 + RetryTask retryTask = queue.take(); + + // 拉取100条 + List lists = Lists.newArrayList(retryTask); + queue.drainTo(lists, 200); + + Set namespaceIds = new HashSet<>(); + Set groupNames = new HashSet<>(); + Set sceneNames = new HashSet<>(); + Map, List> retryTaskMap = getRetryTaskMap( + lists, namespaceIds, groupNames, sceneNames); + + Map, List> notifyConfigMap = getNotifyConfigMap( + namespaceIds, groupNames, sceneNames); + if (notifyConfigMap == null) { continue; } - //获取通知配置 - List notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), NotifySceneEnum.RETRY_TASK_REACH_THRESHOLD.getNotifyScene()); - for (NotifyConfig notifyConfig : notifyConfigs) { - //阈值判断 - if (retryTask.getRetryCount() >= notifyConfig.getNotifyThreshold()) { - //限流判断 - if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) { - RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold()); - if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { - LogUtils.warn(log, "组:[{}] 场景:[{}] 通知阈值:[{}] 幂等id:[{}] 重试任务失败数量超过阈值已到达最大限流阈值,本次通知不执行", notifyConfig.getGroupName(), notifyConfig.getSceneName(), notifyConfig.getNotifyThreshold(), retryTask.getIdempotentId()); - continue; - } - } - // 预警 - AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter, - EnvironmentUtils.getActiveProfile(), - retryTask.getGroupName(), - retryTask.getExecutorName(), - retryTask.getSceneName(), - retryTask.getRetryCount(), - retryTask.getArgsStr(), - DateUtils.format(retryTask.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) - .title("{}环境 环境重试任务失败数量超过阈值", EnvironmentUtils.getActiveProfile()) - .notifyAttribute(notifyConfig.getNotifyAttribute()); - Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); - alarmType.asyncSendMessage(context); + + // 循环发送消息 + retryTaskMap.forEach((key, list) -> { + List notifyConfigsList = notifyConfigMap.get(key); + for (RetryTask retryTask1 : list) { + doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryTask1)); } - } + }); + } catch (InterruptedException e) { + LogUtils.info(log, "retry task fail more alarm stop"); + Thread.currentThread().interrupt(); } catch (Exception e) { LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue poll Exception", e); } @@ -112,10 +109,45 @@ public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl @Override public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) { - try { - queue.put(event.getRetryTask()); - } catch (InterruptedException e) { - LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue push Exception", e); + if (queue.offer(event.getRetryTask())) { + LogUtils.warn(log, "任务失败数量超过阈值告警队列已满"); } } + + @Override + protected AlarmContext buildAlarmContext(AlarmDTO alarmDTO, NotifyConfig notifyConfig) { + // 预警 + return AlarmContext.build().text(retryTaskFailMoreThresholdMessagesFormatter, + EnvironmentUtils.getActiveProfile(), + alarmDTO.getGroupName(), + alarmDTO.getExecutorName(), + alarmDTO.getSceneName(), + alarmDTO.getRetryCount(), + alarmDTO.getArgsStr(), + DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) + .title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", alarmDTO.getGroupName(), alarmDTO.getSceneName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); + } + + @NotNull + private static Map, List> getRetryTaskMap( + final List list, final Set namespaceIds, + final Set groupNames, final Set sceneNames) { + + return list.stream() + .collect(Collectors.groupingBy(retryTask -> { + String namespaceId = retryTask.getNamespaceId(); + String groupName = retryTask.getGroupName(); + String sceneName = retryTask.getSceneName(); + + namespaceIds.add(namespaceId); + groupNames.add(groupName); + sceneNames.add(sceneName); + + return ImmutableTriple.of(namespaceId, groupName, sceneName); + })); + + } + + }