feat:2.5.0
https://gitee.com/aizuda/easy-retry/issues/I8GRJU 服务端通知新增重试任务进入死信队列告警 服务端通知新增重试任务失败数量超过阈值告警
This commit is contained in:
parent
9099dc6cfb
commit
b3c7dd9860
@ -2,6 +2,8 @@ package com.aizuda.easy.retry.common.core.alarm;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author: www.byteblogs.com
|
||||
* @date : 2022-05-04 16:13
|
||||
@ -11,4 +13,7 @@ public class DingDingAttribute {
|
||||
|
||||
private String dingDingUrl;
|
||||
|
||||
private List<String> ats;
|
||||
|
||||
private boolean isAtAll;
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ public class DingdingAlarm extends AbstractAlarm<AlarmContext> {
|
||||
|
||||
DingDingAttribute dingDingAttribute = JsonUtil.parseObject(context.getNotifyAttribute(), DingDingAttribute.class);
|
||||
threadPoolExecutor.execute(() ->
|
||||
DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText()), dingDingAttribute.getDingDingUrl()));
|
||||
DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText(),dingDingAttribute.getAts(),dingDingAttribute.isAtAll()), dingDingAttribute.getDingDingUrl()));
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -34,7 +34,7 @@ public class DingdingAlarm extends AbstractAlarm<AlarmContext> {
|
||||
@Override
|
||||
public boolean syncSendMessage(AlarmContext context) {
|
||||
DingDingAttribute dingDingAttribute = JsonUtil.parseObject(context.getNotifyAttribute(), DingDingAttribute.class);
|
||||
return DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText()), dingDingAttribute.getDingDingUrl());
|
||||
return DingDingUtils.sendMessage(DingDingUtils.buildSendRequest(context.getTitle(), context.getText(),dingDingAttribute.getAts(),dingDingAttribute.isAtAll()), dingDingAttribute.getDingDingUrl());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -11,13 +11,18 @@ import lombok.Getter;
|
||||
@Getter
|
||||
public enum NotifySceneEnum {
|
||||
|
||||
MAX_RETRY(1, "重试数量超过阈值", NodeTypeEnum.SERVER),
|
||||
MAX_RETRY(1, "场景重试数量超过阈值", NodeTypeEnum.SERVER),
|
||||
|
||||
MAX_RETRY_ERROR(2, "重试失败数量超过阈值", NodeTypeEnum.SERVER),
|
||||
MAX_RETRY_ERROR(2, "场景重试失败数量超过阈值", NodeTypeEnum.SERVER),
|
||||
|
||||
CLIENT_REPORT_ERROR(3,"客户端上报失败", NodeTypeEnum.CLIENT),
|
||||
|
||||
CLIENT_COMPONENT_ERROR(4,"客户端组件异常", NodeTypeEnum.CLIENT);
|
||||
CLIENT_COMPONENT_ERROR(4,"客户端组件异常", NodeTypeEnum.CLIENT),
|
||||
|
||||
RETRY_TASK_REACH_THRESHOLD(5, "任务重试失败数量超过阈值", NodeTypeEnum.SERVER),
|
||||
|
||||
RETRY_TASK_ENTER_DEAD_LETTER(6, "任务重试失败进入死信队列", NodeTypeEnum.SERVER);
|
||||
|
||||
|
||||
/**
|
||||
* 通知场景
|
||||
|
@ -5,8 +5,12 @@ import com.dingtalk.api.DefaultDingTalkClient;
|
||||
import com.dingtalk.api.DingTalkClient;
|
||||
import com.dingtalk.api.request.OapiRobotSendRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author: www.byteblogs.com
|
||||
* @date : 2021-11-25 09:54
|
||||
@ -36,18 +40,31 @@ public class DingDingUtils {
|
||||
* @param text
|
||||
* @return
|
||||
*/
|
||||
public static OapiRobotSendRequest buildSendRequest(String title, String text) {
|
||||
|
||||
public static OapiRobotSendRequest buildSendRequest(String title, String text, List<String> ats,boolean isAtAll) {
|
||||
OapiRobotSendRequest request = new OapiRobotSendRequest();
|
||||
request.setMsgtype("markdown");
|
||||
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
|
||||
markdown.setTitle(title);
|
||||
markdown.setText(subTextLength(text));
|
||||
markdown.setText(subTextLength(getAtText(ats,text)));
|
||||
request.setMarkdown(markdown);
|
||||
|
||||
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
|
||||
at.setAtMobiles(ats);
|
||||
at.setIsAtAll(isAtAll);
|
||||
request.setAt(at);
|
||||
return request;
|
||||
}
|
||||
|
||||
private static String getAtText(List<String> ats, String text) {
|
||||
if(CollectionUtils.isEmpty(ats)){
|
||||
return text;
|
||||
}
|
||||
for(String at: ats){
|
||||
text = "@" + at;
|
||||
}
|
||||
return text;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param request
|
||||
|
@ -28,7 +28,7 @@ public class NotifyConfig implements Serializable {
|
||||
|
||||
private Integer rateLimiterStatus;
|
||||
|
||||
private Integer rateLimiterThreshold;
|
||||
private int rateLimiterThreshold;
|
||||
|
||||
private String description;
|
||||
|
||||
|
@ -0,0 +1,16 @@
|
||||
package com.aizuda.easy.retry.server.common;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
|
||||
/**
|
||||
* 流量控制
|
||||
* @author: zuoJunLin
|
||||
* @date : 2023-11-21 13:04
|
||||
* @since 2.5.0
|
||||
*/
|
||||
public interface FlowControl {
|
||||
|
||||
RateLimiter getRateLimiter(Cache<String, RateLimiter> rateLimiterCache, String key, double rateLimiterThreshold);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.aizuda.easy.retry.server.common.flow.control;
|
||||
import com.aizuda.easy.retry.server.common.FlowControl;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
/**
|
||||
* @author: zuoJunLin
|
||||
* @date : 2023-11-21 13:04
|
||||
* @since 2.5.0
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractFlowControl implements FlowControl {
|
||||
|
||||
public RateLimiter getRateLimiter(Cache<String, RateLimiter> rateLimiterCache, String key, double rateLimiterThreshold) {
|
||||
RateLimiter rateLimiter = rateLimiterCache.getIfPresent(key);
|
||||
if (Objects.isNull(rateLimiter)||rateLimiter.getRate()!=rateLimiterThreshold) {
|
||||
rateLimiterCache.put(key, RateLimiter.create(rateLimiterThreshold));
|
||||
}
|
||||
return rateLimiterCache.getIfPresent(key);
|
||||
}
|
||||
}
|
@ -1,39 +1,27 @@
|
||||
package com.aizuda.easy.retry.server.retry.task.service.impl;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||
import com.aizuda.easy.retry.common.core.util.JsonUtil;
|
||||
import com.aizuda.easy.retry.server.common.enums.DelayLevelEnum;
|
||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.common.generator.id.IdGenerator;
|
||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
|
||||
import com.aizuda.easy.retry.server.retry.task.service.RetryDeadLetterConverter;
|
||||
import com.aizuda.easy.retry.server.retry.task.service.RetryService;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskLogConverter;
|
||||
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
|
||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.easy.retry.template.datasource.access.ConfigAccess;
|
||||
import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.*;
|
||||
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -49,6 +37,9 @@ public class RetryServiceImpl implements RetryService {
|
||||
@Autowired
|
||||
private AccessTemplate accessTemplate;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public Boolean moveDeadLetterAndDelFinish(String groupName) {
|
||||
@ -123,7 +114,6 @@ public class RetryServiceImpl implements RetryService {
|
||||
.delete(groupName, new LambdaQueryWrapper<RetryTask>()
|
||||
.eq(RetryTask::getGroupName, groupName).in(RetryTask::getId, waitDelRetryFinishSet)),
|
||||
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
|
||||
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
@ -152,6 +142,8 @@ public class RetryServiceImpl implements RetryService {
|
||||
Assert.isTrue(retryTasks.size() == retryTaskAccess.delete(groupName, new LambdaQueryWrapper<RetryTask>()
|
||||
.eq(RetryTask::getGroupName, groupName).in(RetryTask::getId, ids)),
|
||||
() -> new EasyRetryServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retryTasks)));
|
||||
|
||||
context.publishEvent(new RetryTaskFailDeadLetterAlarmEvent(retryDeadLetters));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
package com.aizuda.easy.retry.server.retry.task.support.cache;
|
||||
|
||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 缓存通知限流组件
|
||||
*
|
||||
* @author zuoJunLin
|
||||
* @date 2023-11-20
|
||||
* @since 2.5.0
|
||||
*/
|
||||
@Component
|
||||
@Data
|
||||
@Slf4j
|
||||
public class CacheNotifyRateLimiter implements Lifecycle {
|
||||
|
||||
private static Cache<String, RateLimiter> CACHE;
|
||||
|
||||
/**
|
||||
* 获取所有缓存
|
||||
*
|
||||
* @return 缓存对象
|
||||
*/
|
||||
public static Cache<String, RateLimiter> getAll() {
|
||||
return CACHE;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有缓存
|
||||
*
|
||||
* @return 缓存对象
|
||||
*/
|
||||
public static RateLimiter getRateLimiterByKey(String key) {
|
||||
return CACHE.getIfPresent(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
LogUtils.info(log, "CacheNotifyRateLimiter start");
|
||||
CACHE = CacheBuilder.newBuilder()
|
||||
// 设置并发级别为cpu核心数
|
||||
.concurrencyLevel(Runtime.getRuntime().availableProcessors())
|
||||
.expireAfterWrite(30, TimeUnit.MINUTES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
LogUtils.info(log, "CacheNotifyRateLimiter stop");
|
||||
}
|
||||
}
|
@ -11,6 +11,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
|
||||
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.handler.CallbackRetryTaskHandler;
|
||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||
@ -19,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.TransactionStatus;
|
||||
@ -38,7 +40,8 @@ import java.time.LocalDateTime;
|
||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
@Slf4j
|
||||
public class FailureActor extends AbstractActor {
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
@Autowired
|
||||
private AccessTemplate accessTemplate;
|
||||
@Autowired
|
||||
@ -84,6 +87,8 @@ public class FailureActor extends AbstractActor {
|
||||
.updateById(retryTask.getGroupName(), retryTask),
|
||||
() -> new EasyRetryServerException("更新重试任务失败. groupName:[{}] uniqueId:[{}]",
|
||||
retryTask.getGroupName(), retryTask.getUniqueId()));
|
||||
|
||||
context.publishEvent(new RetryTaskFailMoreThresholdAlarmEvent(retryTask));
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.aizuda.easy.retry.server.retry.task.support.event;
|
||||
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 重试任务失败进入死信队列事件
|
||||
* @author: zuoJunLin
|
||||
* @date : 2023-11-20 21:40
|
||||
* @since 2.5.0
|
||||
*/
|
||||
public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent {
|
||||
private List<RetryDeadLetter> retryDeadLetters;
|
||||
|
||||
public RetryTaskFailDeadLetterAlarmEvent(List<RetryDeadLetter> retryDeadLetters) {
|
||||
super(retryDeadLetters);
|
||||
this.retryDeadLetters=retryDeadLetters;
|
||||
}
|
||||
|
||||
public List<RetryDeadLetter> getRetryDeadLetters() {
|
||||
return retryDeadLetters;
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package com.aizuda.easy.retry.server.retry.task.support.event;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
/**
|
||||
* 重试任务失败数量超过阈值事件
|
||||
* @author: zuoJunLin
|
||||
* @date : 2023-11-20 21:40
|
||||
* @since 2.5.0
|
||||
*/
|
||||
public class RetryTaskFailMoreThresholdAlarmEvent extends ApplicationEvent {
|
||||
private RetryTask retryTask;
|
||||
|
||||
public RetryTaskFailMoreThresholdAlarmEvent(RetryTask retryTask) {
|
||||
super(retryTask);
|
||||
this.retryTask=retryTask;
|
||||
}
|
||||
|
||||
public RetryTask getRetryTask() {
|
||||
return retryTask;
|
||||
}
|
||||
}
|
@ -0,0 +1,134 @@
|
||||
package com.aizuda.easy.retry.server.retry.task.support.listener;
|
||||
import com.aizuda.easy.retry.common.core.alarm.Alarm;
|
||||
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
|
||||
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
|
||||
import com.aizuda.easy.retry.common.core.util.HostUtils;
|
||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||
import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl;
|
||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
|
||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 重试任务失败进入死信队列监听器
|
||||
* @author: zuoJunLin
|
||||
* @date : 2023-11-20 21:40
|
||||
* @since 2.5.0
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener<RetryTaskFailDeadLetterAlarmEvent> {
|
||||
|
||||
/**
|
||||
* 死信告警数据
|
||||
*/
|
||||
private LinkedBlockingQueue<List<RetryDeadLetter>> queue = new LinkedBlockingQueue<>(1000);
|
||||
|
||||
private static String retryTaskDeadTextMessagesFormatter =
|
||||
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
|
||||
"> 组名称:{} \n" +
|
||||
"> 执行器名称:{} \n" +
|
||||
"> 场景名称:{} \n" +
|
||||
"> 业务数据:{} \n" +
|
||||
"> 时间:{} \n";
|
||||
|
||||
|
||||
@Autowired
|
||||
private EasyRetryAlarmFactory easyRetryAlarmFactory;
|
||||
|
||||
@Autowired
|
||||
protected AccessTemplate accessTemplate;
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
new Thread(this).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
|
||||
for (; ; ) {
|
||||
try {
|
||||
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS);
|
||||
if (CollectionUtils.isEmpty(allRetryDeadLetterList)) {
|
||||
continue;
|
||||
}
|
||||
//组分组
|
||||
Map<String, List<RetryDeadLetter>> groupNameMap = allRetryDeadLetterList.stream()
|
||||
.collect(Collectors.groupingBy(RetryDeadLetter::getGroupName));
|
||||
groupNameMap.forEach(((groupName, groupRetryDeadLetterList) -> {
|
||||
//场景分组
|
||||
Map<String, List<RetryDeadLetter>> sceneNameMap = groupRetryDeadLetterList.stream()
|
||||
.collect(Collectors.groupingBy(RetryDeadLetter::getSceneName));
|
||||
sceneNameMap.forEach(((sceneName, sceneRetryDeadLetterList) -> {
|
||||
//获取通知配置
|
||||
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(groupName, sceneName, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene());
|
||||
for (RetryDeadLetter retryDeadLetter : sceneRetryDeadLetterList) {
|
||||
for (NotifyConfig notifyConfig : notifyConfigs) {
|
||||
if (Objects.equals(notifyConfig.getRateLimiterStatus(),StatusEnum.YES.getStatus())) {
|
||||
//限流
|
||||
RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold());
|
||||
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
|
||||
LogUtils.warn(log, "组:[{}] 场景:[{}] 幂等id:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行", groupName, sceneName, retryDeadLetter.getIdempotentId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// 预警
|
||||
AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
retryDeadLetter.getGroupName(),
|
||||
retryDeadLetter.getExecutorName(),
|
||||
retryDeadLetter.getSceneName(),
|
||||
retryDeadLetter.getArgsStr(),
|
||||
DateUtils.format(retryDeadLetter.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
|
||||
.title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列", retryDeadLetter.getGroupName(), retryDeadLetter.getSceneName())
|
||||
.notifyAttribute(notifyConfig.getNotifyAttribute());
|
||||
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
|
||||
alarmType.asyncSendMessage(context);
|
||||
}
|
||||
}
|
||||
}));
|
||||
}));
|
||||
} catch (Exception e) {
|
||||
LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(RetryTaskFailDeadLetterAlarmEvent event) {
|
||||
try {
|
||||
queue.put(event.getRetryDeadLetters());
|
||||
} catch (InterruptedException e) {
|
||||
LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue push Exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,121 @@
|
||||
package com.aizuda.easy.retry.server.retry.task.support.listener;
|
||||
import com.aizuda.easy.retry.common.core.alarm.Alarm;
|
||||
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
|
||||
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
|
||||
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.log.LogUtils;
|
||||
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
|
||||
import com.aizuda.easy.retry.common.core.util.HostUtils;
|
||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||
import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl;
|
||||
import com.aizuda.easy.retry.server.common.util.DateUtils;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.cache.CacheNotifyRateLimiter;
|
||||
import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailMoreThresholdAlarmEvent;
|
||||
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 重试任务失败数量超过阈值监听器
|
||||
* @author: zuoJunLin
|
||||
* @date : 2023-11-20 21:40
|
||||
* @since 2.5.0
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RetryTaskFailMoreThresholdAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle, ApplicationListener<RetryTaskFailMoreThresholdAlarmEvent> {
|
||||
|
||||
/**
|
||||
* 重试任务失败数量超过阈值告警数据
|
||||
*/
|
||||
private LinkedBlockingQueue<RetryTask> queue = new LinkedBlockingQueue<>(1000);
|
||||
|
||||
private static String retryTaskDeadTextMessagesFormatter =
|
||||
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败数量超过阈值</font> \n" +
|
||||
"> 组名称:{} \n" +
|
||||
"> 执行器名称:{} \n" +
|
||||
"> 场景名称:{} \n" +
|
||||
"> 重试次数:{} \n" +
|
||||
"> 业务数据:{} \n" +
|
||||
"> 时间:{} \n";
|
||||
|
||||
|
||||
@Autowired
|
||||
private EasyRetryAlarmFactory easyRetryAlarmFactory;
|
||||
|
||||
@Autowired
|
||||
protected AccessTemplate accessTemplate;
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
new Thread(this).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
|
||||
for (; ; ) {
|
||||
try {
|
||||
RetryTask retryTask = queue.poll(5, TimeUnit.SECONDS);
|
||||
if (Objects.isNull(retryTask)) {
|
||||
continue;
|
||||
}
|
||||
//获取通知配置
|
||||
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().getNotifyConfigByGroupNameAndSceneName(retryTask.getGroupName(), retryTask.getSceneName(), NotifySceneEnum.RETRY_TASK_REACH_THRESHOLD.getNotifyScene());
|
||||
for (NotifyConfig notifyConfig : notifyConfigs) {
|
||||
//阈值判断
|
||||
if (retryTask.getRetryCount() >= notifyConfig.getNotifyThreshold()) {
|
||||
//限流判断
|
||||
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
|
||||
RateLimiter rateLimiter = getRateLimiter(CacheNotifyRateLimiter.getAll(), String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold());
|
||||
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
|
||||
LogUtils.warn(log, "组:[{}] 场景:[{}] 通知阈值:[{}] 幂等id:[{}] 重试任务失败数量超过阈值已到达最大限流阈值,本次通知不执行", notifyConfig.getGroupName(), notifyConfig.getSceneName(), notifyConfig.getNotifyThreshold(), retryTask.getIdempotentId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// 预警
|
||||
AlarmContext context = AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
|
||||
EnvironmentUtils.getActiveProfile(),
|
||||
retryTask.getGroupName(),
|
||||
retryTask.getExecutorName(),
|
||||
retryTask.getSceneName(),
|
||||
retryTask.getRetryCount(),
|
||||
retryTask.getArgsStr(),
|
||||
DateUtils.format(retryTask.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
|
||||
.title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", retryTask.getGroupName(), retryTask.getSceneName())
|
||||
.notifyAttribute(notifyConfig.getNotifyAttribute());
|
||||
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
|
||||
alarmType.asyncSendMessage(context);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue poll Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(RetryTaskFailMoreThresholdAlarmEvent event) {
|
||||
try {
|
||||
queue.put(event.getRetryTask());
|
||||
} catch (InterruptedException e) {
|
||||
LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue push Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user