refactor:2.5.0

1. 优化告警通知模块
This commit is contained in:
byteblogs168 2023-12-03 11:54:21 +08:00
parent 1da6a3ffc2
commit 789ebb2a87
16 changed files with 739 additions and 422 deletions

View File

@ -375,3 +375,66 @@ CREATE TABLE `job_task_batch`
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';
CREATE TABLE `job_notify_config`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_id` bigint(20) NOT NULL COMMENT '任务id',
`notify_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知状态 0、未启用 1、启用',
`notify_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
`notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
`notify_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '通知阈值',
`notify_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知场景',
`rate_limiter_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '限流状态 0、未启用 1、启用',
`rate_limiter_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '每秒限流阈值',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_namespace_id_group_name` (`namespace_id`,`group_name`)
) ENGINE=InnoDB
AUTO_INCREMENT=4
DEFAULT CHARSET=utf8mb4 COMMENT='job通知配置';
CREATE TABLE `job_summary`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` VARCHAR(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '组名称',
`job_id` bigint NOT NULL COMMENT '任务信息id',
`trigger_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计时间',
`success_num` int NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
`fail_num` int NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
`fail_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '失败原因',
`stop_num` int NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
`stop_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '失败原因',
`cancel_num` int NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
`cancel_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '失败原因',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_job_id_trigger_at` (`job_id`, `trigger_at`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='DashBoard_Job';
CREATE TABLE `retry_summary`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`namespace_id` VARCHAR(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '组名称',
`scene_name` VARCHAR(50) NOT NULL DEFAULT '' COMMENT '场景名称',
`trigger_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计时间',
`running_num` int NOT NULL DEFAULT '0' COMMENT '重试中-日志数量',
`finish_num` int NOT NULL DEFAULT '0' COMMENT '重试完成-日志数量',
`max_count_num` int NOT NULL DEFAULT '0' COMMENT '重试到达最大次数-日志数量',
`suspend_num` int NOT NULL DEFAULT '0' COMMENT '暂停重试-日志数量',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_scene_name_trigger_at` (`scene_name`, `trigger_at`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='DashBoard_Retry';

View File

@ -497,3 +497,37 @@ COMMENT ON COLUMN "job_task_batch"."create_dt" IS '创建时间';
COMMENT ON COLUMN "job_task_batch"."update_dt" IS '创建时间';
COMMENT ON COLUMN "job_task_batch"."ext_attrs" IS '扩展字段';
COMMENT ON TABLE "job_task" IS '任务批次';
CREATE TABLE job_notify_config
(
id BIGSERIAL PRIMARY KEY,
group_name VARCHAR(64) NOT NULL,
job_id BIGINT NOT NULL,
notify_status SMALLINT NOT NULL DEFAULT 0,
notify_type SMALLINT NOT NULL DEFAULT 0,
notify_attribute VARCHAR(512) NOT NULL,
notify_threshold INT NOT NULL DEFAULT 0,
notify_scene SMALLINT NOT NULL DEFAULT 0,
rate_limiter_status SMALLINT NOT NULL DEFAULT 0,
rate_limiter_threshold INT NOT NULL DEFAULT 0,
description VARCHAR(256) NOT NULL DEFAULT '',
create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_group_name ON job_notify_config (group_name);
COMMENT ON COLUMN "job_notify_config"."id" IS '主键';
COMMENT ON COLUMN "job_notify_config"."group_name" IS '组名称';
COMMENT ON COLUMN "job_notify_config"."job_id" IS '任务信息id';
COMMENT ON COLUMN "job_notify_config"."notify_status" IS '通知状态 0、未启用 1、启用';
COMMENT ON COLUMN "job_notify_config"."notify_type" IS '通知类型 1、钉钉 2、邮件 3、企业微信';
COMMENT ON COLUMN "job_notify_config"."notify_attribute" IS '配置属性';
COMMENT ON COLUMN "job_notify_config"."notify_threshold" IS '通知阈值';
COMMENT ON COLUMN "job_notify_config"."notify_scene" IS '通知场景';
COMMENT ON COLUMN "job_notify_config"."rate_limiter_status" IS '限流状态 0、未启用 1、启用';
COMMENT ON COLUMN "job_notify_config"."rate_limiter_threshold" IS '每秒限流阈值';
COMMENT ON COLUMN "job_notify_config"."description" IS '描述';
COMMENT ON COLUMN "job_notify_config"."create_dt" IS '创建时间';
COMMENT ON COLUMN "job_notify_config"."update_dt" IS '修改时间';
COMMENT ON TABLE "job_notify_config" IS '通知配置';

View File

@ -25,7 +25,7 @@ public interface JobTaskBatchMapper extends BaseMapper<JobTaskBatch> {
List<JobBatchResponseDO> selectJobBatchPageList(IPage<JobTaskBatch> iPage, @Param("queryDO") JobBatchQueryDO queryDO);
List<JobBatchResponseDO> selectJobBatchListByIds( @Param("ids") List<Long> ids);
List<JobBatchResponseDO> selectJobBatchListByIds(@Param("ids") List<Long> ids);
List<JobBatchSummaryResponseDO> summaryJobBatchList(@Param("from") LocalDateTime todayFrom, @Param("to") LocalDateTime to);
}

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.server.common;
import com.aizuda.easy.retry.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.easy.retry.server.common.dto.JobAlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobNotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
* @author xiaowoniu
* @date 2023-12-03 10:58:53
* @since 2.5.0
*/
@Mapper
public interface AlarmInfoConverter {
AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class);
List<RetryAlarmInfo> retryTaskToAlarmInfo(List<RetryTask> retryTasks);
List<RetryAlarmInfo> deadLetterToAlarmInfo(List<RetryDeadLetter> retryDeadLetters);
List<NotifyConfigInfo> retryToNotifyConfigInfos(List<NotifyConfig> notifyConfigs);
List<NotifyConfigInfo> jobToNotifyConfigInfos(List<JobNotifyConfig> notifyConfigs);
List<JobAlarmInfo> toJobAlarmInfos(List<JobBatchResponseDO> jobBatchResponse);
}

View File

@ -0,0 +1,132 @@
package com.aizuda.easy.retry.server.common.alarm;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author xiaowoniu
* @date 2023-12-03 09:47:11
* @since 2.5.0
*/
@Slf4j
public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmInfo, T> extends AbstractFlowControl<E> implements ApplicationListener<E>, Runnable, Lifecycle {
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
protected AccessTemplate accessTemplate;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 从队列获取数据
List<A> alarmInfos = poll();
if (CollectionUtils.isEmpty(alarmInfos)) {
continue;
}
// 获取所有的命名空间
Set<String> namespaceIds = new HashSet<>();
// 获取所有的组名称
Set<String> groupNames = new HashSet<>();
// 获取所有的场景名称
Set<T> sceneNames = new HashSet<>();
// 转换AlarmDTO 为了下面循环发送使用
Map<Triple<String, String, T>, List<A>> waitSendAlarmInfos = convertAlarmDTO(
alarmInfos, namespaceIds, groupNames, sceneNames);
// 批量获取通知配置
Map<Triple<String, String, T>, List<NotifyConfigInfo>> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames);
// 循环发送消息
waitSendAlarmInfos.forEach((key, list) -> {
List<NotifyConfigInfo> notifyConfigsList = notifyConfig.get(key);
for (A alarmDTO : list) {
sendAlarm(notifyConfigsList, alarmDTO);
}
});
} catch (InterruptedException e) {
LogUtils.info(log, "retry task fail dead letter alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
}
}
}
protected abstract Map<Triple<String, String, T>, List<A>> convertAlarmDTO(List<A> alarmData, Set<String> namespaceIds, Set<String> groupNames, Set<T> sceneNames);
protected abstract Map<Triple<String/*命名空间*/, String/*组名称*/, T/*场景名称ORJobId*/>,
List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds,
Set<String> groupNames,
Set<T> sceneNames);
protected abstract List<A> poll() throws InterruptedException;
protected abstract AlarmContext buildAlarmContext(A alarmDTO, NotifyConfigInfo notifyConfig);
private Thread thread;
@Override
public void start() {
thread = new Thread(this);
thread.start();
startLog();
}
protected abstract void startLog();
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
}
}
protected void sendAlarm(List<NotifyConfigInfo> notifyConfigsList, A alarmDTO) {
for (final NotifyConfigInfo notifyConfig : notifyConfigsList) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// 限流
RateLimiter rateLimiter = getRateLimiter(rateLimiterKey(notifyConfig),
notifyConfig.getRateLimiterThreshold());
// 每秒发送rateLimiterThreshold个告警
if (Objects.nonNull(rateLimiter) && !RateLimiter.create(notifyConfig.getRateLimiterThreshold())
.tryAcquire(1, TimeUnit.SECONDS)) {
continue;
}
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(
notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
protected abstract String rateLimiterKey(NotifyConfigInfo notifyConfig);
protected abstract int getNotifyScene();
}

View File

@ -0,0 +1,55 @@
package com.aizuda.easy.retry.server.common.alarm;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.cache.CacheNotifyRateLimiter;
import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author: zuoJunLin
* @date : 2023-11-21 13:04
* @since 2.5.0
*/
@Slf4j
public abstract class AbstractFlowControl<E extends ApplicationEvent> implements ApplicationListener<E> {
protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold));
}
return rateLimiter;
}
}

View File

@ -0,0 +1,82 @@
package com.aizuda.easy.retry.server.common.alarm;
import com.aizuda.easy.retry.common.core.enums.JobNotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.AlarmInfoConverter;
import com.aizuda.easy.retry.server.common.dto.JobAlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobNotifyConfigMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobNotifyConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.CollectionUtils;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
* @date 2023-12-03 10:19:19
* @since 2.5.0
*/
public abstract class AbstractJobAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, JobAlarmInfo, Long> {
@Autowired
private JobNotifyConfigMapper jobNotifyConfigMapper;
@Override
protected Map<Triple<String, String, Long>, List<JobAlarmInfo>> convertAlarmDTO(List<JobAlarmInfo> alarmInfos, Set<String> namespaceIds, Set<String> groupNames, Set<Long> jobIds) {
return alarmInfos.stream().collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
Long jobId = i.getJobId();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
jobIds.add(jobId);
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
}
@Override
protected Map<Triple<String, String, Long>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, Set<String> groupNames, Set<Long> jobIds) {
// 批量获取所需的通知配置
List<JobNotifyConfig> jobNotifyConfigs = jobNotifyConfigMapper.selectList(
new LambdaQueryWrapper<JobNotifyConfig>()
.eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(JobNotifyConfig::getNotifyScene, getNotifyScene())
.in(JobNotifyConfig::getNamespaceId, namespaceIds)
.in(JobNotifyConfig::getGroupName, groupNames)
.in(JobNotifyConfig::getJobId, jobIds)
);
if (CollectionUtils.isEmpty(jobNotifyConfigs)) {
return null;
}
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.jobToNotifyConfigInfos(jobNotifyConfigs);
return notifyConfigInfos.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
Long jobId = i.getJobId();
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
}
@Override
protected String rateLimiterKey(NotifyConfigInfo notifyConfig) {
return MessageFormat.format("{}_{}", SystemModeEnum.JOB.name(), notifyConfig.getId());
}
}

View File

@ -0,0 +1,86 @@
package com.aizuda.easy.retry.server.common.alarm;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.AlarmInfoConverter;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Maps;
import org.springframework.context.ApplicationEvent;
import org.springframework.util.CollectionUtils;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
* @date 2023-12-03 10:19:19
* @since 2.5.0
*/
public abstract class AbstractRetryAlarm<E extends ApplicationEvent> extends AbstractAlarm<E, RetryAlarmInfo, String> {
@Override
protected Map<Triple<String, String, String>, List<RetryAlarmInfo>> convertAlarmDTO(
List<RetryAlarmInfo> alarmDataList,
Set<String> namespaceIds,
Set<String> groupNames,
Set<String> sceneNames) {
return alarmDataList.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
String sceneName = i.getSceneName();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
sceneNames.add(sceneName);
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
@Override
protected Map<Triple<String, String, String>, List<NotifyConfigInfo>> obtainNotifyConfig(Set<String> namespaceIds, Set<String> groupNames, Set<String> sceneNames) {
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(NotifyConfig::getNotifyScene, getNotifyScene())
.in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames)
.in(NotifyConfig::getSceneName, sceneNames)
);
if (CollectionUtils.isEmpty(notifyConfigs)) {
return Maps.newHashMap();
}
List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs);
return notifyConfigInfos.stream()
.collect(Collectors.groupingBy(config -> {
String namespaceId = config.getNamespaceId();
String groupName = config.getGroupName();
String sceneName = config.getSceneName();
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
@Override
protected String rateLimiterKey(NotifyConfigInfo notifyConfig) {
return MessageFormat.format("{}_{}", SystemModeEnum.RETRY.name(), notifyConfig.getId());
}
}

View File

@ -0,0 +1,13 @@
package com.aizuda.easy.retry.server.common.dto;
/**
* @author xiaowoniu
* @date 2023-12-03 10:15:37
* @since 2.5.0
*/
public class AlarmInfo {
private String namespaceId;
private String groupName;
}

View File

@ -0,0 +1,44 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author xiaowoniu
* @date 2023-12-03 11:05:19
* @since 2.5.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class JobAlarmInfo extends AlarmInfo {
private Long id;
/**
* 命名空间
*/
private String namespaceId;
/**
* 组名称
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 任务信息id
*/
private Long jobId;
/**
* 执行器名称
*/
private String executorInfo;
private String argsStr;
}

View File

@ -0,0 +1,39 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
/**
* @author xiaowoniu
* @date 2023-12-03 10:02:43
* @since 2.5.0
*/
@Data
public class NotifyConfigInfo {
private Long id;
private String namespaceId;
private String groupName;
// job告警时使用
private Long jobId;
// retry告警时使用
private String sceneName;
private Integer notifyStatus;
private Integer notifyType;
private String notifyAttribute;
private Integer notifyThreshold;
private Integer notifyScene;
private Integer rateLimiterStatus;
private Integer rateLimiterThreshold;
}

View File

@ -0,0 +1,36 @@
package com.aizuda.easy.retry.server.common.dto;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.time.LocalDateTime;
/**
* @author xiaowoniu
* @date 2023-12-03 10:15:37
* @since 2.5.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RetryAlarmInfo extends AlarmInfo {
private String namespaceId;
private String uniqueId;
private String groupName;
private String sceneName;
private String idempotentId;
private String bizNo;
private String executorName;
private String argsStr;
private Integer retryCount;
private LocalDateTime createDt;
}

View File

@ -1,147 +0,0 @@
package com.aizuda.easy.retry.server.common.flow.control;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.cache.CacheNotifyRateLimiter;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author: zuoJunLin
* @date : 2023-11-21 13:04
* @since 2.5.0
*/
@Slf4j
public abstract class AbstractFlowControl<E extends ApplicationEvent> implements ApplicationListener<E> {
@Autowired
private EasyRetryAlarmFactory easyRetryAlarmFactory;
@Autowired
protected AccessTemplate accessTemplate;
protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold));
}
return rateLimiter;
}
protected Map<Triple<String, String, String>, List<NotifyConfig>> getNotifyConfigMap(final Set<String> namespaceIds,
final Set<String> groupNames, final Set<String> sceneNames) {
// 批量获取所需的通知配置
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
new LambdaQueryWrapper<NotifyConfig>()
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(NotifyConfig::getNotifyScene, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene())
.in(NotifyConfig::getNamespaceId, namespaceIds)
.in(NotifyConfig::getGroupName, groupNames)
.in(NotifyConfig::getSceneName, sceneNames)
);
if (CollectionUtils.isEmpty(notifyConfigs)) {
return null;
}
return notifyConfigs.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
String sceneName = i.getSceneName();
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
}
protected void doSendAlarm(Triple<String, String, String> key,
List<NotifyConfig> notifyConfigsList,
AlarmDTO alarmDTO
) {
for (final NotifyConfig notifyConfig : notifyConfigsList) {
if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
// 限流
RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()),
notifyConfig.getRateLimiterThreshold());
// 每秒发送rateLimiterThreshold个告警
if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
LogUtils.warn(log,
"namespaceId:[{}] groupName:[{}] senceName:[{}] idempotentId:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行",
key.getLeft(), key.getMiddle(), key.getRight(),
alarmDTO.getIdempotentId());
continue;
}
}
AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(
notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
protected abstract AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, NotifyConfig notifyConfig);
@Data
public static class AlarmDTO {
private String uniqueId;
private String groupName;
private String sceneName;
private String idempotentId;
private String bizNo;
private String executorName;
private String argsStr;
private Integer retryCount;
private LocalDateTime createDt;
}
@Mapper
public interface AlarmDTOConverter {
AlarmDTOConverter INSTANCE = Mappers.getMapper(AlarmDTOConverter.class);
AlarmDTO toAlarmDTO(RetryDeadLetter retryDeadLetter);
AlarmDTO toAlarmDTO(RetryTask retryTask);
}
}

View File

@ -8,7 +8,11 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.AlarmInfoConverter;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.alarm.AbstractJobAlarm;
import com.aizuda.easy.retry.server.common.dto.JobAlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.server.common.util.DateUtils;
@ -40,11 +44,7 @@ import java.util.stream.Collectors;
*/
@Component
@Slf4j
public class JobTaskFailAlarmListener implements ApplicationListener<JobTaskFailAlarmEvent>, Runnable, Lifecycle {
@Autowired
private JobNotifyConfigMapper jobNotifyConfigMapper;
public class JobTaskFailAlarmListener extends AbstractJobAlarm<JobTaskFailAlarmEvent> {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@ -65,107 +65,45 @@ public class JobTaskFailAlarmListener implements ApplicationListener<JobTaskFail
"> 方法参数:{} \n" +
"> 时间:{} \n";
private Thread thread;
@Override
public void start() {
thread = new Thread(this);
thread.start();
protected List<JobAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
Long jobTaskBatchId = queue.take();
// 拉取200条
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId);
queue.drainTo(jobTaskBatchIds, 200);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(jobTaskBatchIds);
return AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
}
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
}
protected AlarmContext buildAlarmContext(JobAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
// 预警
AlarmContext context = AlarmContext.build()
.text(jobTaskFailTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getGroupName(),
alarmDTO.getJobName(),
alarmDTO.getExecutorInfo(),
alarmDTO.getArgsStr(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile())
.notifyAttribute(notifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
return context;
}
@Override
public void run() {
LogUtils.info(log, "JobTaskFailAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
while (!Thread.currentThread().isInterrupted()) {
try {
// 无数据时阻塞线程
Long jobTaskBatchId = queue.take();
// 拉取200条
List<Long> jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId);
queue.drainTo(jobTaskBatchIds, 200);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(jobTaskBatchIds);
Set<String> namespaceIds = new HashSet<>();
Set<String> groupNames = new HashSet<>();
Set<Long> jobIds = new HashSet<>();
Map<Triple<String, String, Long>, List<JobBatchResponseDO>> jobTaskBatchMap = getJobTaskBatchMap(
jobTaskBatchList, namespaceIds, groupNames, jobIds);
Map<Triple<String, String, Long>, List<JobNotifyConfig>> jobNotifyConfigMap = getJobNotifyConfigMap(
namespaceIds, groupNames, jobIds);
if (jobNotifyConfigMap == null) {
continue;
}
// 循环发送消息
jobTaskBatchMap.forEach((key, list) -> {
List<JobNotifyConfig> jobNotifyConfigsList = jobNotifyConfigMap.get(key);
for (JobBatchResponseDO JobBatch : list) {
for (final JobNotifyConfig jobNotifyConfig : jobNotifyConfigsList) {
// 预警
AlarmContext context = AlarmContext.build()
.text(jobTaskFailTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
JobBatch.getGroupName(),
JobBatch.getJobName(),
JobBatch.getExecutorInfo(),
JobBatch.getArgsStr(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
.title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile())
.notifyAttribute(jobNotifyConfig.getNotifyAttribute());
Alarm<AlarmContext> alarmType = easyRetryAlarmFactory.getAlarmType(jobNotifyConfig.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
});
} catch (InterruptedException e) {
LogUtils.info(log, "job task fail more alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "JobTaskFailAlarmListener queue take Exception", e);
}
}
protected void startLog() {
LogUtils.info(log, "JobTaskFailAlarmListener started");
}
private Map<Triple<String, String, Long>, List<JobNotifyConfig>> getJobNotifyConfigMap(Set<String> namespaceIds, Set<String> groupNames, Set<Long> jobIds) {
// 批量获取所需的通知配置
List<JobNotifyConfig> jobNotifyConfigs = jobNotifyConfigMapper.selectList(
new LambdaQueryWrapper<JobNotifyConfig>()
.eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES)
.eq(JobNotifyConfig::getNotifyScene, JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene())
.in(JobNotifyConfig::getNamespaceId, namespaceIds)
.in(JobNotifyConfig::getGroupName, groupNames)
.in(JobNotifyConfig::getJobId, jobIds)
);
if (CollectionUtils.isEmpty(jobNotifyConfigs)) {
return null;
}
return jobNotifyConfigs.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
Long jobId = i.getJobId();
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
}
private Map<Triple<String, String, Long>, List<JobBatchResponseDO>> getJobTaskBatchMap(List<JobBatchResponseDO> jobTaskBatchList, Set<String> namespaceIds, Set<String> groupNames, Set<Long> jobIds) {
return jobTaskBatchList.stream().collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
Long jobId = i.getJobId();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
jobIds.add(jobId);
return ImmutableTriple.of(namespaceId, groupName, jobId);
}));
@Override
protected int getNotifyScene() {
return JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene();
}

View File

@ -1,11 +1,18 @@
package com.aizuda.easy.retry.server.retry.task.support.listener;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.AlarmInfoConverter;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl;
import com.aizuda.easy.retry.server.common.alarm.AbstractAlarm;
import com.aizuda.easy.retry.server.common.alarm.AbstractFlowControl;
import com.aizuda.easy.retry.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
@ -13,6 +20,7 @@ import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLe
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -38,7 +46,7 @@ import java.util.stream.Collectors;
*/
@Component
@Slf4j
public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl<RetryTaskFailDeadLetterAlarmEvent> implements Runnable, Lifecycle {
public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm<RetryTaskFailDeadLetterAlarmEvent> implements Runnable, Lifecycle {
/**
* 死信告警数据
@ -46,87 +54,22 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl<Re
private LinkedBlockingQueue<List<RetryDeadLetter>> queue = new LinkedBlockingQueue<>(1000);
private static String retryTaskDeadTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Autowired
protected AccessTemplate accessTemplate;
private Thread thread;
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Override
public void start() {
thread = new Thread(this);
thread.start();
}
protected List<RetryAlarmInfo> poll() throws InterruptedException {
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS);
if (CollectionUtils.isEmpty(allRetryDeadLetterList)) {
return Lists.newArrayList();
}
}
@Override
public void run() {
LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(),
HostUtils.getIp());
while (!Thread.currentThread().isInterrupted()) {
try {
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS);
if (CollectionUtils.isEmpty(allRetryDeadLetterList)) {
continue;
}
Set<String> namespaceIds = new HashSet<>();
Set<String> groupNames = new HashSet<>();
Set<String> sceneNames = new HashSet<>();
Map<Triple<String, String, String>, List<RetryDeadLetter>> retryDeadLetterMap = getRetryDeadLetterMap(
allRetryDeadLetterList, namespaceIds, groupNames, sceneNames);
Map<Triple<String, String, String>, List<NotifyConfig>> notifyConfigMap = getNotifyConfigMap(
namespaceIds, groupNames, sceneNames);
if (notifyConfigMap == null) {
continue;
}
// 循环发送消息
retryDeadLetterMap.forEach((key, list) -> {
List<NotifyConfig> notifyConfigsList = notifyConfigMap.get(key);
for (RetryDeadLetter retryDeadLetter : list) {
doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryDeadLetter));
}
});
} catch (InterruptedException e) {
LogUtils.info(log, "retry task fail dead letter alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
}
}
}
@NotNull
private static Map<Triple<String, String, String>, List<RetryDeadLetter>> getRetryDeadLetterMap(
final List<RetryDeadLetter> allRetryDeadLetterList, final Set<String> namespaceIds,
final Set<String> groupNames, final Set<String> sceneNames) {
return allRetryDeadLetterList.stream()
.collect(Collectors.groupingBy(i -> {
String namespaceId = i.getNamespaceId();
String groupName = i.getGroupName();
String sceneName = i.getSceneName();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
sceneNames.add(sceneName);
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
return AlarmInfoConverter.INSTANCE.deadLetterToAlarmInfo(allRetryDeadLetterList);
}
@Override
@ -137,18 +80,28 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl<Re
}
@Override
protected AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, final NotifyConfig notifyConfig) {
protected AlarmContext buildAlarmContext(final RetryAlarmInfo retryAlarmInfo, final NotifyConfigInfo notifyConfig) {
// 预警
return AlarmContext.build().text(retryTaskDeadTextMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getGroupName(),
alarmDTO.getExecutorName(),
alarmDTO.getSceneName(),
alarmDTO.getArgsStr(),
DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列",
alarmDTO.getGroupName(), alarmDTO.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
EnvironmentUtils.getActiveProfile(),
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getExecutorName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getArgsStr(),
DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败进入死信队列",
retryAlarmInfo.getGroupName(), retryAlarmInfo.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
}
@Override
protected void startLog() {
LogUtils.info(log, "RetryTaskFailDeadLetterAlarmListener started");
}
@Override
protected int getNotifyScene() {
return NotifySceneEnum.MAX_RETRY.getNotifyScene();
}
}

View File

@ -1,11 +1,17 @@
package com.aizuda.easy.retry.server.retry.task.support.listener;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.server.common.AlarmInfoConverter;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl;
import com.aizuda.easy.retry.server.common.alarm.AbstractFlowControl;
import com.aizuda.easy.retry.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
import com.aizuda.easy.retry.server.common.triple.Triple;
@ -36,75 +42,31 @@ import java.util.stream.Collectors;
@Component
@Slf4j
public class RetryTaskFailMoreThresholdAlarmListener extends
AbstractFlowControl<RetryTaskFailMoreThresholdAlarmEvent> implements Runnable, Lifecycle {
AbstractRetryAlarm<RetryTaskFailMoreThresholdAlarmEvent> implements Runnable, Lifecycle {
/**
* 重试任务失败数量超过阈值告警数据
*/
private LinkedBlockingQueue<RetryTask> queue = new LinkedBlockingQueue<>(1000);
private Thread thread;
private static String retryTaskFailMoreThresholdMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败数量超过阈值</font> \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 重试次数:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败数量超过阈值</font> \n" +
"> 组名称:{} \n" +
"> 执行器名称:{} \n" +
"> 场景名称:{} \n" +
"> 重试次数:{} \n" +
"> 业务数据:{} \n" +
"> 时间:{} \n";
@Override
public void start() {
thread = new Thread(this);
thread.start();
}
protected List<RetryAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
RetryTask retryTask = queue.take();
@Override
public void close() {
if (Objects.nonNull(thread)) {
thread.interrupt();
}
}
// 拉取100条
List<RetryTask> lists = Lists.newArrayList(retryTask);
queue.drainTo(lists, 200);
@Override
public void run() {
LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(),
HostUtils.getIp());
while (!Thread.currentThread().isInterrupted()) {
try {
// 无数据时阻塞线程
RetryTask retryTask = queue.take();
// 拉取100条
List<RetryTask> lists = Lists.newArrayList(retryTask);
queue.drainTo(lists, 200);
Set<String> namespaceIds = new HashSet<>();
Set<String> groupNames = new HashSet<>();
Set<String> sceneNames = new HashSet<>();
Map<Triple<String, String, String>, List<RetryTask>> retryTaskMap = getRetryTaskMap(
lists, namespaceIds, groupNames, sceneNames);
Map<Triple<String, String, String>, List<NotifyConfig>> notifyConfigMap = getNotifyConfigMap(
namespaceIds, groupNames, sceneNames);
if (notifyConfigMap == null) {
continue;
}
// 循环发送消息
retryTaskMap.forEach((key, list) -> {
List<NotifyConfig> notifyConfigsList = notifyConfigMap.get(key);
for (RetryTask retryTask1 : list) {
doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryTask1));
}
});
} catch (InterruptedException e) {
LogUtils.info(log, "retry task fail more alarm stop");
Thread.currentThread().interrupt();
} catch (Exception e) {
LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue poll Exception", e);
}
}
return AlarmInfoConverter.INSTANCE.retryTaskToAlarmInfo(lists);
}
@Override
@ -115,39 +77,29 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
}
@Override
protected AlarmContext buildAlarmContext(AlarmDTO alarmDTO, NotifyConfig notifyConfig) {
protected AlarmContext buildAlarmContext(RetryAlarmInfo retryAlarmInfo, NotifyConfigInfo notifyConfig) {
// 预警
return AlarmContext.build().text(retryTaskFailMoreThresholdMessagesFormatter,
EnvironmentUtils.getActiveProfile(),
alarmDTO.getGroupName(),
alarmDTO.getExecutorName(),
alarmDTO.getSceneName(),
alarmDTO.getRetryCount(),
alarmDTO.getArgsStr(),
DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", alarmDTO.getGroupName(), alarmDTO.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
EnvironmentUtils.getActiveProfile(),
retryAlarmInfo.getGroupName(),
retryAlarmInfo.getExecutorName(),
retryAlarmInfo.getSceneName(),
retryAlarmInfo.getRetryCount(),
retryAlarmInfo.getArgsStr(),
DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN))
.title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", retryAlarmInfo.getGroupName(),
retryAlarmInfo.getSceneName())
.notifyAttribute(notifyConfig.getNotifyAttribute());
}
@NotNull
private static Map<Triple<String, String, String>, List<RetryTask>> getRetryTaskMap(
final List<RetryTask> list, final Set<String> namespaceIds,
final Set<String> groupNames, final Set<String> sceneNames) {
return list.stream()
.collect(Collectors.groupingBy(retryTask -> {
String namespaceId = retryTask.getNamespaceId();
String groupName = retryTask.getGroupName();
String sceneName = retryTask.getSceneName();
namespaceIds.add(namespaceId);
groupNames.add(groupName);
sceneNames.add(sceneName);
return ImmutableTriple.of(namespaceId, groupName, sceneName);
}));
@Override
protected void startLog() {
LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener started");
}
@Override
protected int getNotifyScene() {
return NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene();
}
}