diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/DingDingAttribute.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/DingDingAttribute.java index 892f2121..dcdc4ffc 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/DingDingAttribute.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/DingDingAttribute.java @@ -2,6 +2,8 @@ package com.aizuda.easy.retry.common.core.alarm; import lombok.Data; +import java.util.List; + /** * @author: www.byteblogs.com * @date : 2022-05-04 16:13 @@ -11,4 +13,7 @@ public class DingDingAttribute { private String dingDingUrl; + private List ats; + + private boolean isAtAll; } diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/strategy/DingdingAlarm.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/strategy/DingdingAlarm.java index ee40463c..62caae59 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/strategy/DingdingAlarm.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/alarm/strategy/DingdingAlarm.java @@ -26,7 +26,7 @@ public class DingdingAlarm extends AbstractAlarm { DingDingAttribute dingDingAttribute = JsonUtil.parseObject(context.getNotifyAttribute(), DingDingAttribute.class); threadPoolExecutor.execute(() -> - DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText()), dingDingAttribute.getDingDingUrl())); + DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText(),dingDingAttribute.getAts(),dingDingAttribute.isAtAll()), dingDingAttribute.getDingDingUrl())); return true; } @@ -34,7 +34,7 @@ public class DingdingAlarm extends AbstractAlarm { @Override public boolean syncSendMessage(AlarmContext context) { DingDingAttribute dingDingAttribute = JsonUtil.parseObject(context.getNotifyAttribute(), DingDingAttribute.class); - return DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText()), dingDingAttribute.getDingDingUrl()); + return DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText(),dingDingAttribute.getAts(),dingDingAttribute.isAtAll()), dingDingAttribute.getDingDingUrl()); } @Override diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/NotifySceneEnum.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/NotifySceneEnum.java index 1a5b6767..4b992f39 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/NotifySceneEnum.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/enums/NotifySceneEnum.java @@ -11,13 +11,18 @@ import lombok.Getter; @Getter public enum NotifySceneEnum { - MAX_RETRY(1, "重试数量超过阈值", NodeTypeEnum.SERVER), + MAX_RETRY(1, "场景重试数量超过阈值", NodeTypeEnum.SERVER), - MAX_RETRY_ERROR(2, "重试失败数量超过阈值", NodeTypeEnum.SERVER), + MAX_RETRY_ERROR(2, "场景重试失败数量超过阈值", NodeTypeEnum.SERVER), CLIENT_REPORT_ERROR(3,"客户端上报失败", NodeTypeEnum.CLIENT), - CLIENT_COMPONENT_ERROR(4,"客户端组件异常", NodeTypeEnum.CLIENT); + CLIENT_COMPONENT_ERROR(4,"客户端组件异常", NodeTypeEnum.CLIENT), + + RETRY_TASK_REACH_THRESHOLD(5, "任务重试失败数量超过阈值", NodeTypeEnum.SERVER), + + RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER); + /** * 通知场景 diff --git a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/util/DingDingUtils.java b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/util/DingDingUtils.java index 2d4b45d2..b1aba572 100644 --- a/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/util/DingDingUtils.java +++ b/easy-retry-common/easy-retry-common-core/src/main/java/com/aizuda/easy/retry/common/core/util/DingDingUtils.java @@ -5,8 +5,12 @@ import com.dingtalk.api.DefaultDingTalkClient; import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.request.OapiRobotSendRequest; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import java.util.Arrays; +import java.util.List; + /** * @author: www.byteblogs.com * @date : 2021-11-25 09:54 @@ -36,18 +40,31 @@ public class DingDingUtils { * @param text * @return */ - public static OapiRobotSendRequest buildSendRequest(String title, String text) { - + public static OapiRobotSendRequest buildSendRequest(String title, String text, List ats,boolean isAtAll) { OapiRobotSendRequest request = new OapiRobotSendRequest(); request.setMsgtype("markdown"); OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown(); markdown.setTitle(title); - markdown.setText(subTextLength(text)); + markdown.setText(subTextLength(getAtText(ats,text))); request.setMarkdown(markdown); + OapiRobotSendRequest.At at = new OapiRobotSendRequest.At(); + at.setAtMobiles(ats); + at.setIsAtAll(isAtAll); + request.setAt(at); return request; } + private static String getAtText(List ats, String text) { + if(CollectionUtils.isEmpty(ats)){ + return text; + } + for(String at: ats){ + text = "@" + at; + } + return text; + } + /** * * @param request diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/NotifyConfig.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/NotifyConfig.java index f15765f9..6424555b 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/NotifyConfig.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/NotifyConfig.java @@ -28,7 +28,7 @@ public class NotifyConfig implements Serializable { private Integer rateLimiterStatus; - private Integer rateLimiterThreshold; + private int rateLimiterThreshold; private String description; diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java new file mode 100644 index 00000000..adf5453a --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/FlowControl.java @@ -0,0 +1,16 @@ +package com.aizuda.easy.retry.server.common; +import com.google.common.cache.Cache; +import com.google.common.util.concurrent.RateLimiter; + +/** + * 流量控制 + * @author: zuoJunLin + * @date : 2023-11-21 13:04 + * @since 2.5.0 + */ +public interface FlowControl { + + RateLimiter getRateLimiter(Cache rateLimiterCache, String key, double rateLimiterThreshold); +} + + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java new file mode 100644 index 00000000..0c71c212 --- /dev/null +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.common.flow.control; +import com.aizuda.easy.retry.server.common.FlowControl; +import com.google.common.cache.Cache; +import com.google.common.util.concurrent.RateLimiter; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; + + +/** + * @author: zuoJunLin + * @date : 2023-11-21 13:04 + * @since 2.5.0 + */ +@Slf4j +public abstract class AbstractFlowControl implements FlowControl { + + public RateLimiter getRateLimiter(Cache rateLimiterCache, String key, double rateLimiterThreshold) { + RateLimiter rateLimiter = rateLimiterCache.getIfPresent(key); + if (Objects.isNull(rateLimiter)||rateLimiter.getRate()!=rateLimiterThreshold) { + rateLimiterCache.put(key, RateLimiter.create(rateLimiterThreshold)); + } + return rateLimiterCache.getIfPresent(key); + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java index c3ddfd94..5282e608 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/service/impl/RetryServiceImpl.java @@ -1,39 +1,27 @@ package com.aizuda.easy.retry.server.retry.task.service.impl; - import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; -import com.aizuda.easy.retry.common.core.enums.StatusEnum; -import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.JsonUtil; -import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; -import com.aizuda.easy.retry.server.common.generator.id.IdGenerator; -import com.aizuda.easy.retry.server.common.util.DateUtils; -import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter; import com.aizuda.easy.retry.server.retry.task.service.RetryService; -import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; -import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter; -import com.aizuda.easy.retry.server.common.strategy.WaitStrategies; +import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; -import com.aizuda.easy.retry.template.datasource.access.ConfigAccess; import com.aizuda.easy.retry.template.datasource.access.TaskAccess; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.*; import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; - import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -49,6 +37,9 @@ public class RetryServiceImpl implements RetryService { @Autowired private AccessTemplate accessTemplate; + @Autowired + private ApplicationContext context; + @Transactional @Override public Boolean moveDeadLetterAndDelFinish(String groupName) { @@ -123,7 +114,6 @@ public class RetryServiceImpl implements RetryService { .delete(groupName, new LambdaQueryWrapper() .eq(RetryTask::getGroupName, groupName).in(RetryTask::getId, waitDelRetryFinishSet)), () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks))); - return Boolean.TRUE; } @@ -152,6 +142,8 @@ public class RetryServiceImpl implements RetryService { Assert.isTrue(retryTasks.size() == retryTaskAccess.delete(groupName, new LambdaQueryWrapper() .eq(RetryTask::getGroupName, groupName).in(RetryTask::getId, ids)), () -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks))); + + context.publishEvent(new RetryTaskFailDeadLetterAlarmEvent(retryDeadLetters)); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheNotifyRateLimiter.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheNotifyRateLimiter.java new file mode 100644 index 00000000..1ae7e333 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/cache/CacheNotifyRateLimiter.java @@ -0,0 +1,60 @@ +package com.aizuda.easy.retry.server.retry.task.support.cache; + +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.RateLimiter; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 缓存通知限流组件 + * + * @author zuoJunLin + * @date 2023-11-20 + * @since 2.5.0 + */ +@Component +@Data +@Slf4j +public class CacheNotifyRateLimiter implements Lifecycle { + + private static Cache CACHE; + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static Cache getAll() { + return CACHE; + } + + /** + * 获取所有缓存 + * + * @return 缓存对象 + */ + public static RateLimiter getRateLimiterByKey(String key) { + return CACHE.getIfPresent(key); + } + + @Override + public void start() { + LogUtils.info(log, "CacheNotifyRateLimiter start"); + CACHE = CacheBuilder.newBuilder() + // 设置并发级别为cpu核心数 + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterWrite(30, TimeUnit.MINUTES) + .build(); + } + + @Override + public void close() { + LogUtils.info(log, "CacheNotifyRateLimiter stop"); + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java index 3c26ddbd..1ff687b5 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/result/FailureActor.java @@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO; +import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent; import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; @@ -19,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; @@ -38,7 +40,8 @@ import java.time.LocalDateTime; @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j public class FailureActor extends AbstractActor { - + @Autowired + private ApplicationContext context; @Autowired private AccessTemplate accessTemplate; @Autowired @@ -84,6 +87,8 @@ public class FailureActor extends AbstractActor { .updateById(retryTask.getGroupName(), retryTask), () -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]", retryTask.getGroupName(), retryTask.getUniqueId())); + + context.publishEvent(new RetryTaskFailMoreThresholdAlarmEvent(retryTask)); } }); } catch (Exception e) { diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java new file mode 100644 index 00000000..2b352cd8 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java @@ -0,0 +1,25 @@ +package com.aizuda.easy.retry.server.retry.task.support.event; + +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; +import org.springframework.context.ApplicationEvent; + +import java.util.List; + +/** + * 重试任务失败进入死信队列事件 + * @author: zuoJunLin + * @date : 2023-11-20 21:40 + * @since 2.5.0 + */ +public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent { + private List retryDeadLetters; + + public RetryTaskFailDeadLetterAlarmEvent(List retryDeadLetters) { + super(retryDeadLetters); + this.retryDeadLetters=retryDeadLetters; + } + + public List getRetryDeadLetters() { + return retryDeadLetters; + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/event/RetryTaskFailMoreThresholdAlarmEvent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/event/RetryTaskFailMoreThresholdAlarmEvent.java new file mode 100644 index 00000000..a5a5d6a7 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/event/RetryTaskFailMoreThresholdAlarmEvent.java @@ -0,0 +1,22 @@ +package com.aizuda.easy.retry.server.retry.task.support.event; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import org.springframework.context.ApplicationEvent; + +/** + * 重试任务失败数量超过阈值事件 + * @author: zuoJunLin + * @date : 2023-11-20 21:40 + * @since 2.5.0 + */ +public class RetryTaskFailMoreThresholdAlarmEvent extends ApplicationEvent { + private RetryTask retryTask; + + public RetryTaskFailMoreThresholdAlarmEvent(RetryTask retryTask) { + super(retryTask); + this.retryTask=retryTask; + } + + public RetryTask getRetryTask() { + return retryTask; + } +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java new file mode 100644 index 00000000..1d1ab367 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java @@ -0,0 +1,134 @@ +package com.aizuda.easy.retry.server.retry.task.support.listener; +import com.aizuda.easy.retry.common.core.alarm.Alarm; +import com.aizuda.easy.retry.common.core.alarm.AlarmContext; +import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; +import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; +import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter; +import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; +import com.google.common.cache.Cache; +import com.google.common.util.concurrent.RateLimiter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * 重试任务失败进入死信队列监听器 + * @author: zuoJunLin + * @date : 2023-11-20 21:40 + * @since 2.5.0 + */ +@Component +@Slf4j +public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener { + + /** + * 死信告警数据 + */ + private LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(1000); + + private static String retryTaskDeadTextMessagesFormatter = + "{}环境 重试任务失败进入死信队列 \n" + + "> 组名称:{} \n" + + "> 执行器名称:{} \n" + + "> 场景名称:{} \n" + + "> 业务数据:{} \n" + + "> 时间:{} \n"; + + + @Autowired + private EasyRetryAlarmFactory easyRetryAlarmFactory; + + @Autowired + protected AccessTemplate accessTemplate; + + @Override + public void start() { + new Thread(this).start(); + } + + @Override + public void close() { + + } + + @Override + public void run() { + LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); + for (; ; ) { + try { + List allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS); + if (CollectionUtils.isEmpty(allRetryDeadLetterList)) { + continue; + } + //组分组 + Map> groupNameMap = allRetryDeadLetterList.stream() + .collect(Collectors.groupingBy(RetryDeadLetter::getGroupName)); + groupNameMap.forEach(((groupName, groupRetryDeadLetterList) -> { + //场景分组 + Map> sceneNameMap = groupRetryDeadLetterList.stream() + .collect(Collectors.groupingBy(RetryDeadLetter::getSceneName)); + sceneNameMap.forEach(((sceneName, sceneRetryDeadLetterList) -> { + //获取通知配置 + List notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(groupName, sceneName, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene()); + for (RetryDeadLetter retryDeadLetter : sceneRetryDeadLetterList) { + for (NotifyConfig notifyConfig : notifyConfigs) { + if (Objects.equals(notifyConfig.getRateLimiterStatus(),StatusEnum.YES.getStatus())) { + //限流 + RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold()); + if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { + LogUtils.warn(log, "组:[{}] 场景:[{}] 幂等id:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行", groupName, sceneName, retryDeadLetter.getIdempotentId()); + continue; + } + } + // 预警 + AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter, + EnvironmentUtils.getActiveProfile(), + retryDeadLetter.getGroupName(), + retryDeadLetter.getExecutorName(), + retryDeadLetter.getSceneName(), + retryDeadLetter.getArgsStr(), + DateUtils.format(retryDeadLetter.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) + .title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列", retryDeadLetter.getGroupName(), retryDeadLetter.getSceneName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); + Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); + alarmType.asyncSendMessage(context); + } + } + })); + })); + } catch (Exception e) { + LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e); + } + } + } + + @Override + public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) { + try { + queue.put(event.getRetryDeadLetters()); + } catch (InterruptedException e) { + LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue push Exception", e); + } + } + + +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java new file mode 100644 index 00000000..9611580f --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailMoreThresholdAlarmListener.java @@ -0,0 +1,121 @@ +package com.aizuda.easy.retry.server.retry.task.support.listener; +import com.aizuda.easy.retry.common.core.alarm.Alarm; +import com.aizuda.easy.retry.common.core.alarm.AlarmContext; +import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory; +import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; +import com.aizuda.easy.retry.common.core.log.LogUtils; +import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; +import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl; +import com.aizuda.easy.retry.server.common.util.DateUtils; +import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter; +import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; +import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig; +import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; +import com.google.common.util.concurrent.RateLimiter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 重试任务失败数量超过阈值监听器 + * @author: zuoJunLin + * @date : 2023-11-20 21:40 + * @since 2.5.0 + */ +@Component +@Slf4j +public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener { + + /** + * 重试任务失败数量超过阈值告警数据 + */ + private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); + + private static String retryTaskDeadTextMessagesFormatter = + "{}环境 重试任务失败数量超过阈值 \n" + + "> 组名称:{} \n" + + "> 执行器名称:{} \n" + + "> 场景名称:{} \n" + + "> 重试次数:{} \n" + + "> 业务数据:{} \n" + + "> 时间:{} \n"; + + + @Autowired + private EasyRetryAlarmFactory easyRetryAlarmFactory; + + @Autowired + protected AccessTemplate accessTemplate; + + @Override + public void start() { + new Thread(this).start(); + } + + @Override + public void close() { + + } + + @Override + public void run() { + LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp()); + for (; ; ) { + try { + RetryTask retryTask = queue.poll(5, TimeUnit.SECONDS); + if (Objects.isNull(retryTask)) { + continue; + } + //获取通知配置 + List notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), NotifySceneEnum.RETRY_TASK_REACH_THRESHOLD.getNotifyScene()); + for (NotifyConfig notifyConfig : notifyConfigs) { + //阈值判断 + if (retryTask.getRetryCount() >= notifyConfig.getNotifyThreshold()) { + //限流判断 + if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) { + RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold()); + if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { + LogUtils.warn(log, "组:[{}] 场景:[{}] 通知阈值:[{}] 幂等id:[{}] 重试任务失败数量超过阈值已到达最大限流阈值,本次通知不执行", notifyConfig.getGroupName(), notifyConfig.getSceneName(), notifyConfig.getNotifyThreshold(), retryTask.getIdempotentId()); + continue; + } + } + // 预警 + AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter, + EnvironmentUtils.getActiveProfile(), + retryTask.getGroupName(), + retryTask.getExecutorName(), + retryTask.getSceneName(), + retryTask.getRetryCount(), + retryTask.getArgsStr(), + DateUtils.format(retryTask.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) + .title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", retryTask.getGroupName(), retryTask.getSceneName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); + Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType()); + alarmType.asyncSendMessage(context); + } + } + } catch (Exception e) { + LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue poll Exception", e); + } + } + } + + @Override + public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) { + try { + queue.put(event.getRetryTask()); + } catch (InterruptedException e) { + LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue push Exception", e); + } + } +}