diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql
index a2f2e588b..6e0e09da5 100644
--- a/doc/sql/easy_retry_mysql.sql
+++ b/doc/sql/easy_retry_mysql.sql
@@ -375,3 +375,70 @@ CREATE TABLE `job_task_batch`
 ) ENGINE = InnoDB
   AUTO_INCREMENT = 0
   DEFAULT CHARSET = utf8mb4 COMMENT ='任务批次';
+
+-- Alarm settings per job: channel type, channel attributes, threshold and optional rate limiting.
+CREATE TABLE `job_notify_config`
+(
+    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
+    `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+    `group_name` varchar(64) NOT NULL COMMENT '组名称',
+    `job_id` bigint(20) NOT NULL COMMENT '任务id',
+    `notify_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知状态 0、未启用 1、启用',
+    `notify_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知类型 1、钉钉 2、邮件 3、企业微信',
+    `notify_attribute` varchar(512) NOT NULL COMMENT '配置属性',
+    `notify_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '通知阈值',
+    `notify_scene` tinyint(4) NOT NULL DEFAULT '0' COMMENT '通知场景',
+    `rate_limiter_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '限流状态 0、未启用 1、启用',
+    `rate_limiter_threshold` int(11) NOT NULL DEFAULT '0' COMMENT '每秒限流阈值',
+    `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
+    `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+    `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+    PRIMARY KEY (`id`),
+    KEY `idx_namespace_id_group_name` (`namespace_id`,`group_name`)
+) ENGINE=InnoDB
+  AUTO_INCREMENT=1
+  DEFAULT CHARSET=utf8mb4 COMMENT='job通知配置';
+
+-- Per-job execution counters (success / fail / stop / cancel), one row per job and trigger_at.
+CREATE TABLE `job_summary`
+(
+    `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
+    `namespace_id` VARCHAR(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+    `group_name` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '组名称',
+    `job_id` bigint NOT NULL COMMENT '任务信息id',
+    `trigger_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计时间',
+    `success_num` int NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
+    `fail_num` int NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
+    `fail_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '失败原因',
+    `stop_num` int NOT NULL DEFAULT '0' COMMENT '执行停止-日志数量',
+    `stop_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '停止原因',
+    `cancel_num` int NOT NULL DEFAULT '0' COMMENT '执行取消-日志数量',
+    `cancel_reason` varchar(512) NOT NULL DEFAULT '' COMMENT '取消原因',
+    `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+    `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_job_id_trigger_at` (`job_id`, `trigger_at`) USING BTREE
+) ENGINE = InnoDB
+  AUTO_INCREMENT = 1
+  DEFAULT CHARSET = utf8mb4 COMMENT ='DashBoard_Job';
+
+-- Per-scene retry counters, one row per namespace, group, scene and trigger_at.
+-- The unique key carries namespace_id and group_name so equal scene names in different groups do not collide.
+CREATE TABLE `retry_summary`
+(
+    `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
+    `namespace_id` VARCHAR(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
+    `group_name` VARCHAR(64) NOT NULL DEFAULT '' COMMENT '组名称',
+    `scene_name` VARCHAR(50) NOT NULL DEFAULT '' COMMENT '场景名称',
+    `trigger_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '统计时间',
+    `running_num` int NOT NULL DEFAULT '0' COMMENT '重试中-日志数量',
+    `finish_num` int NOT NULL DEFAULT '0' COMMENT '重试完成-日志数量',
+    `max_count_num` int NOT NULL DEFAULT '0' COMMENT '重试到达最大次数-日志数量',
+    `suspend_num` int NOT NULL DEFAULT '0' COMMENT '暂停重试-日志数量',
+    `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+    `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_scene_name_trigger_at` (`namespace_id`, `group_name`, `scene_name`, `trigger_at`) USING BTREE
+) ENGINE = InnoDB
+  AUTO_INCREMENT = 1
+  DEFAULT CHARSET = utf8mb4 COMMENT ='DashBoard_Retry';
\ No newline at end of file
diff --git a/doc/sql/easy_retry_postgre.sql b/doc/sql/easy_retry_postgre.sql
index 43d3475b0..6b81c6034 100644
--- a/doc/sql/easy_retry_postgre.sql
+++ b/doc/sql/easy_retry_postgre.sql
@@ -497,3 +497,40 @@ COMMENT ON COLUMN "job_task_batch"."create_dt" IS '创建时间';
 COMMENT ON COLUMN "job_task_batch"."update_dt" IS '创建时间';
 COMMENT ON COLUMN "job_task_batch"."ext_attrs" IS '扩展字段';
 COMMENT ON TABLE "job_task" IS '任务批次';
+
+-- Alarm settings per job; same columns as the MySQL job_notify_config table, including namespace_id.
+CREATE TABLE job_notify_config
+(
+    id BIGSERIAL PRIMARY KEY,
+    namespace_id VARCHAR(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a',
+    group_name VARCHAR(64) NOT NULL,
+    job_id BIGINT NOT NULL,
+    notify_status SMALLINT NOT NULL DEFAULT 0,
+    notify_type SMALLINT NOT NULL DEFAULT 0,
+    notify_attribute VARCHAR(512) NOT NULL,
+    notify_threshold INT NOT NULL DEFAULT 0,
+    notify_scene SMALLINT NOT NULL DEFAULT 0,
+    rate_limiter_status SMALLINT NOT NULL DEFAULT 0,
+    rate_limiter_threshold INT NOT NULL DEFAULT 0,
+    description VARCHAR(256) NOT NULL DEFAULT '',
+    create_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    update_dt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE INDEX idx_job_notify_config_namespace_id_group_name ON job_notify_config (namespace_id, group_name);
+
+COMMENT ON COLUMN "job_notify_config"."id" IS '主键';
+COMMENT ON COLUMN "job_notify_config"."namespace_id" IS '命名空间id';
+COMMENT ON COLUMN "job_notify_config"."group_name" IS '组名称';
+COMMENT ON COLUMN "job_notify_config"."job_id" IS '任务信息id';
+COMMENT ON COLUMN "job_notify_config"."notify_status" IS '通知状态 0、未启用 1、启用';
+COMMENT ON COLUMN "job_notify_config"."notify_type" IS '通知类型 1、钉钉 2、邮件 3、企业微信';
+COMMENT ON COLUMN "job_notify_config"."notify_attribute" IS '配置属性';
+COMMENT ON COLUMN "job_notify_config"."notify_threshold" IS '通知阈值';
+COMMENT ON COLUMN "job_notify_config"."notify_scene" IS '通知场景';
+COMMENT ON COLUMN "job_notify_config"."rate_limiter_status" IS '限流状态 0、未启用 1、启用';
+COMMENT ON COLUMN "job_notify_config"."rate_limiter_threshold" IS '每秒限流阈值';
+COMMENT ON COLUMN "job_notify_config"."description" IS '描述';
+COMMENT ON COLUMN "job_notify_config"."create_dt" IS '创建时间';
+COMMENT ON COLUMN "job_notify_config"."update_dt" IS '修改时间';
+COMMENT ON TABLE "job_notify_config" IS '通知配置';
\ No newline at end of file
diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobTaskBatchMapper.java
b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobTaskBatchMapper.java
index bc7326dee..72bbbc33d 100644
--- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobTaskBatchMapper.java
+++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobTaskBatchMapper.java
@@ -25,7 +25,7 @@ public interface JobTaskBatchMapper extends BaseMapper {
 
     List selectJobBatchPageList(IPage iPage, @Param("queryDO") JobBatchQueryDO queryDO);
 
-    List selectJobBatchListByIds( @Param("ids") List ids);
+    List selectJobBatchListByIds(@Param("ids") List ids);
 
     List summaryJobBatchList(@Param("from") LocalDateTime todayFrom, @Param("to") LocalDateTime to);
 }
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/AlarmInfoConverter.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/AlarmInfoConverter.java
new file mode 100644
index 000000000..f228d8bfc
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/AlarmInfoConverter.java
@@ -0,0 +1,37 @@
+package com.aizuda.easy.retry.server.common;
+
+import com.aizuda.easy.retry.server.common.alarm.AbstractRetryAlarm;
+import com.aizuda.easy.retry.server.common.dto.JobAlarmInfo;
+import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
+import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
+import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobNotifyConfig;
+import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
+import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
+import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+
+import java.util.List;
+
+/** MapStruct converter used by the alarm classes to turn persistence objects into alarm DTOs.
+ * @author xiaowoniu
+ * @date 2023-12-03 10:58:53
+ * @since 2.5.0
+ */
+@Mapper
+public interface AlarmInfoConverter {
+
+    AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class); // shared instance obtained from MapStruct
+
+    List retryTaskToAlarmInfo(List retryTasks); // element types are not visible here - presumably RetryTask -> RetryAlarmInfo; confirm
+
+    List deadLetterToAlarmInfo(List retryDeadLetters); // presumably RetryDeadLetter -> RetryAlarmInfo; confirm
+
+    List retryToNotifyConfigInfos(List notifyConfigs); // called by AbstractRetryAlarm with the NotifyConfig rows it loaded
+
+    List jobToNotifyConfigInfos(List notifyConfigs); // called by AbstractJobAlarm with the JobNotifyConfig rows it loaded
+
+    List toJobAlarmInfos(List jobBatchResponse); // called by JobTaskFailAlarmListener#poll with the selected job batches
+
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractAlarm.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractAlarm.java
new file mode 100644
index 000000000..988515442
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractAlarm.java
@@ -0,0 +1,132 @@
+package com.aizuda.easy.retry.server.common.alarm;
+
+import com.aizuda.easy.retry.common.core.alarm.Alarm;
+import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
+import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.common.core.log.LogUtils;
+import com.aizuda.easy.retry.server.common.Lifecycle;
+import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
+import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
+import com.aizuda.easy.retry.server.common.triple.Triple;
+import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEvent;
+import
org.springframework.context.ApplicationListener;
+import org.springframework.util.CollectionUtils;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/** Background worker: polls alarm payloads, loads the matching notify configs and dispatches the alarms.
+ * @author xiaowoniu
+ * @date 2023-12-03 09:47:11
+ * @since 2.5.0
+ */
+@Slf4j
+public abstract class AbstractAlarm extends AbstractFlowControl implements ApplicationListener, Runnable, Lifecycle {
+
+    @Autowired
+    private EasyRetryAlarmFactory easyRetryAlarmFactory;
+
+    @Autowired
+    protected AccessTemplate accessTemplate;
+
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+
+            try {
+                // Take the next batch of alarm payloads from the queue
+                List alarmInfos = poll();
+                if (CollectionUtils.isEmpty(alarmInfos)) {
+                    continue;
+                }
+
+                // All namespace ids
+                Set namespaceIds = new HashSet<>();
+                // All group names
+                Set groupNames = new HashSet<>();
+                // All scene names
+                Set sceneNames = new HashSet<>();
+
+                // Convert to grouped alarm DTOs for the send loop below
+                Map, List> waitSendAlarmInfos = convertAlarmDTO(
+                    alarmInfos, namespaceIds, groupNames, sceneNames);
+
+                // Load the notify configs for the whole batch at once
+                Map, List> notifyConfig = obtainNotifyConfig(namespaceIds, groupNames, sceneNames);
+
+                // Send each payload; a missing map or key yields null, which sendAlarm treats as "nothing configured"
+                waitSendAlarmInfos.forEach((key, list) -> {
+                    List notifyConfigsList = Objects.isNull(notifyConfig) ? null : notifyConfig.get(key);
+                    for (A alarmDTO : list) {
+                        sendAlarm(notifyConfigsList, alarmDTO);
+                    }
+                });
+            } catch (InterruptedException e) {
+                LogUtils.info(log, "retry task fail dead letter alarm stop");
+                Thread.currentThread().interrupt(); // restore the flag so the while condition ends the loop
+            } catch (Exception e) {
+                LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e);
+            }
+        }
+    }
+
+    protected abstract Map, List> convertAlarmDTO(List alarmData, Set namespaceIds, Set groupNames, Set sceneNames);
+
+    protected abstract Map,
+        List> obtainNotifyConfig(Set namespaceIds,
+        Set groupNames,
+        Set sceneNames);
+
+    protected abstract List poll() throws InterruptedException;
+
+    protected abstract AlarmContext buildAlarmContext(A alarmDTO, NotifyConfigInfo notifyConfig);
+
+    private Thread thread;
+
+    @Override
+    public void start() {
+        thread = new Thread(this);
+        thread.start();
+        startLog();
+    }
+
+    protected abstract void startLog();
+
+    @Override
+    public void close() {
+        if (Objects.nonNull(thread)) {
+            thread.interrupt();
+        }
+    }
+
+    protected void sendAlarm(List notifyConfigsList, A alarmDTO) { // sends alarmDTO to every config in the list
+        if (CollectionUtils.isEmpty(notifyConfigsList)) {
+            return; // nothing configured for this key
+        }
+        for (final NotifyConfigInfo notifyConfig : notifyConfigsList) {
+            if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) {
+                // Throttle with the limiter cached per config. A limiter created on the spot
+                // always grants its first permit, so it would never reject anything.
+                RateLimiter rateLimiter = getRateLimiter(rateLimiterKey(notifyConfig),
+                    notifyConfig.getRateLimiterThreshold());
+                if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
+                    continue;
+                }
+            }
+            AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig);
+            Alarm alarmType = easyRetryAlarmFactory.getAlarmType(notifyConfig.getNotifyType());
+            alarmType.asyncSendMessage(context);
+        }
+    }
+
+    protected abstract String rateLimiterKey(NotifyConfigInfo notifyConfig);
+
+    protected abstract int getNotifyScene();
+}
+
+
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractFlowControl.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractFlowControl.java
new file mode 100644
index 000000000..63732d6a2
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractFlowControl.java
@@ -0,0 +1,55 @@
+package com.aizuda.easy.retry.server.common.alarm;
+
+import com.aizuda.easy.retry.common.core.alarm.Alarm;
+import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
+import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
+import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.common.core.log.LogUtils;
+import
com.aizuda.easy.retry.server.common.cache.CacheNotifyRateLimiter;
+import com.aizuda.easy.retry.server.common.dto.AlarmInfo;
+import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
+import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
+import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
+import com.aizuda.easy.retry.server.common.triple.Triple;
+import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
+import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
+import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
+import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.util.concurrent.RateLimiter;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.util.CollectionUtils;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/**
+ * @author: zuoJunLin
+ * @date : 2023-11-21 13:04
+ * @since 2.5.0
+ */
+@Slf4j
+public abstract class AbstractFlowControl implements ApplicationListener {
+    /** Returns the rate limiter cached under {@code key}, creating or replacing it when absent or when the rate changed. */
+    protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
+        RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
+        if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
+            rateLimiter = RateLimiter.create(rateLimiterThreshold); // hand back the new limiter, not null or the stale one
+            CacheNotifyRateLimiter.put(key, rateLimiter);
+        }
+        return rateLimiter;
+    }
+}
diff --git
a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java
new file mode 100644
index 000000000..809c1f637
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractJobAlarm.java
@@ -0,0 +1,92 @@
+package com.aizuda.easy.retry.server.common.alarm;
+
+import com.aizuda.easy.retry.common.core.enums.JobNotifySceneEnum;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import com.aizuda.easy.retry.server.common.AlarmInfoConverter;
+import com.aizuda.easy.retry.server.common.dto.JobAlarmInfo;
+import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
+import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
+import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
+import com.aizuda.easy.retry.server.common.triple.Triple;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobNotifyConfigMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.JobNotifyConfig;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.util.CollectionUtils;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for job alarms: alarm payloads and notify configs are both keyed by (namespaceId, groupName, jobId).
+ *
+ * @author xiaowoniu
+ * @date 2023-12-03 10:19:19
+ * @since 2.5.0
+ */
+public abstract class AbstractJobAlarm extends AbstractAlarm {
+
+    @Autowired
+    private JobNotifyConfigMapper jobNotifyConfigMapper;
+
+    /** Groups the alarms by (namespaceId, groupName, jobId) and adds every key part to the given sets. */
+    @Override
+    protected Map, List> convertAlarmDTO(List alarmInfos, Set namespaceIds, Set groupNames, Set jobIds) {
+
+        return alarmInfos.stream().collect(Collectors.groupingBy(i -> {
+            String namespaceId = i.getNamespaceId();
+            String groupName = i.getGroupName();
+            Long jobId = i.getJobId();
+            namespaceIds.add(namespaceId);
+            groupNames.add(groupName);
+            jobIds.add(jobId);
+            return ImmutableTriple.of(namespaceId, groupName, jobId);
+        }));
+    }
+
+    /**
+     * Loads the job notify configs for the given keys and groups them by (namespaceId, groupName, jobId).
+     * Returns an empty map, never null, when no config matches.
+     */
+    @Override
+    protected Map, List> obtainNotifyConfig(Set namespaceIds, Set groupNames, Set jobIds) {
+
+        // Fetch all required notify configs in one query
+        List jobNotifyConfigs = jobNotifyConfigMapper.selectList(
+            new LambdaQueryWrapper()
+                .eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) // compare with the stored status value, not the enum constant
+                .eq(JobNotifyConfig::getNotifyScene, getNotifyScene())
+                .in(JobNotifyConfig::getNamespaceId, namespaceIds)
+                .in(JobNotifyConfig::getGroupName, groupNames)
+                .in(JobNotifyConfig::getJobId, jobIds)
+        );
+        if (CollectionUtils.isEmpty(jobNotifyConfigs)) {
+            return Collections.emptyMap();
+        }
+
+        List notifyConfigInfos = AlarmInfoConverter.INSTANCE.jobToNotifyConfigInfos(jobNotifyConfigs);
+
+        return notifyConfigInfos.stream()
+            .collect(Collectors.groupingBy(i -> {
+
+                String namespaceId = i.getNamespaceId();
+                String groupName = i.getGroupName();
+                Long jobId = i.getJobId();
+
+                return ImmutableTriple.of(namespaceId, groupName, jobId);
+            }));
+
+    }
+
+    /** Rate-limiter cache key for one job notify config: the JOB mode name, an underscore, then the config id. */
+    @Override
+    protected String rateLimiterKey(NotifyConfigInfo notifyConfig) {
+        // MessageFormat needs indexed placeholders ("{}" is rejected); String.valueOf avoids locale digit grouping of the id
+        return MessageFormat.format("{0}_{1}", SystemModeEnum.JOB.name(), String.valueOf(notifyConfig.getId()));
+    }
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java
new file mode 100644
index 000000000..116f968b5
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/alarm/AbstractRetryAlarm.java
@@ -0,0 +1,86 @@
+package com.aizuda.easy.retry.server.common.alarm;
+
+import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
+import com.aizuda.easy.retry.common.core.enums.StatusEnum;
+import
com.aizuda.easy.retry.server.common.AlarmInfoConverter;
+import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo;
+import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo;
+import com.aizuda.easy.retry.server.common.enums.SystemModeEnum;
+import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
+import com.aizuda.easy.retry.server.common.triple.Triple;
+import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.collect.Maps;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.util.CollectionUtils;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Base class for retry alarms: alarm payloads and notify configs are both keyed by (namespaceId, groupName, sceneName).
+ * @author xiaowoniu
+ * @date 2023-12-03 10:19:19
+ * @since 2.5.0
+ */
+public abstract class AbstractRetryAlarm extends AbstractAlarm {
+    @Override
+    protected Map, List> convertAlarmDTO(
+        List alarmDataList,
+        Set namespaceIds,
+        Set groupNames,
+        Set sceneNames) {
+
+        return alarmDataList.stream()
+            .collect(Collectors.groupingBy(i -> {
+
+                String namespaceId = i.getNamespaceId();
+                String groupName = i.getGroupName();
+                String sceneName = i.getSceneName();
+
+                namespaceIds.add(namespaceId);
+                groupNames.add(groupName);
+                sceneNames.add(sceneName);
+
+                return ImmutableTriple.of(namespaceId, groupName, sceneName);
+            }));
+    }
+
+    /** Loads the retry notify configs for the given keys, grouped by (namespaceId, groupName, sceneName); empty map when none match. */
+    @Override
+    protected Map, List> obtainNotifyConfig(Set namespaceIds, Set groupNames, Set sceneNames) {
+
+        // Fetch all required notify configs in one query
+        List notifyConfigs = accessTemplate.getNotifyConfigAccess().list(
+            new LambdaQueryWrapper()
+                .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) // compare with the stored status value, not the enum constant
+                .eq(NotifyConfig::getNotifyScene, getNotifyScene())
+                .in(NotifyConfig::getNamespaceId, namespaceIds)
+                .in(NotifyConfig::getGroupName, groupNames)
+                .in(NotifyConfig::getSceneName, sceneNames)
+        );
+
+        if (CollectionUtils.isEmpty(notifyConfigs)) {
+            return Maps.newHashMap();
+        }
+
+        List notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs);
+        return notifyConfigInfos.stream()
+            .collect(Collectors.groupingBy(config -> {
+
+                String namespaceId = config.getNamespaceId();
+                String groupName = config.getGroupName();
+                String sceneName = config.getSceneName();
+
+                return ImmutableTriple.of(namespaceId, groupName, sceneName);
+            }));
+    }
+
+    @Override
+    protected String rateLimiterKey(NotifyConfigInfo notifyConfig) { // MessageFormat needs indexed placeholders; "{}" is rejected
+        return MessageFormat.format("{0}_{1}", SystemModeEnum.RETRY.name(), String.valueOf(notifyConfig.getId()));
+    }
+
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/AlarmInfo.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/AlarmInfo.java
new file mode 100644
index 000000000..6bef2cc3f
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/AlarmInfo.java
@@ -0,0 +1,13 @@
+package com.aizuda.easy.retry.server.common.dto;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-03 10:15:37
+ * @since 2.5.0
+ */
+public class AlarmInfo {
+
+    private String namespaceId;
+
+    private String groupName;
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobAlarmInfo.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobAlarmInfo.java
new file mode 100644
index 000000000..aae87e1d9
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobAlarmInfo.java
@@ -0,0 +1,44 @@
+package com.aizuda.easy.retry.server.common.dto;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-03 11:05:19
+ * @since 2.5.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class JobAlarmInfo extends AlarmInfo {
+
+    private
Long id;
+
+    /**
+     * Namespace
+     */
+    private String namespaceId;
+
+    /**
+     * Group name
+     */
+    private String groupName;
+
+    /**
+     * Name
+     */
+    private String jobName;
+
+    /**
+     * Job info id
+     */
+    private Long jobId;
+    /**
+     * Executor name
+     */
+    private String executorInfo;
+
+
+    private String argsStr;
+
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/NotifyConfigInfo.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/NotifyConfigInfo.java
new file mode 100644
index 000000000..f745c123b
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/NotifyConfigInfo.java
@@ -0,0 +1,39 @@
+package com.aizuda.easy.retry.server.common.dto;
+
+import lombok.Data;
+
+/** Notify configuration DTO; carries both the job-only field (jobId) and the retry-only field (sceneName).
+ * @author xiaowoniu
+ * @date 2023-12-03 10:02:43
+ * @since 2.5.0
+ */
+@Data
+public class NotifyConfigInfo {
+
+    private Long id;
+
+    private String namespaceId;
+
+    private String groupName;
+
+    // Used for job alarms
+    private Long jobId;
+
+    // Used for retry alarms
+    private String sceneName;
+
+    private Integer notifyStatus;
+
+    private Integer notifyType;
+
+    private String notifyAttribute;
+
+    private Integer notifyThreshold;
+
+    private Integer notifyScene;
+
+    private Integer rateLimiterStatus;
+
+    private Integer rateLimiterThreshold;
+
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RetryAlarmInfo.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RetryAlarmInfo.java
new file mode 100644
index 000000000..2d129d3f1
--- /dev/null
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/RetryAlarmInfo.java
@@ -0,0 +1,36 @@
+package com.aizuda.easy.retry.server.common.dto;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author xiaowoniu
+ * @date 2023-12-03 10:15:37
+ * @since 2.5.0
+ */
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class RetryAlarmInfo extends AlarmInfo {
+
+    private String namespaceId; // read by AbstractRetryAlarm when it builds the grouping key
+
+    private String uniqueId;
+
+    private String groupName; // read by AbstractRetryAlarm when it builds the grouping key
+
+    private String sceneName; // read by AbstractRetryAlarm when it builds the grouping key
+
+    private String idempotentId;
+
+    private String bizNo;
+
+    private String executorName;
+
+    private String argsStr;
+
+    private Integer retryCount;
+
+    private LocalDateTime createDt;
+}
diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java
deleted file mode 100644
index c7db82b69..000000000
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/flow/control/AbstractFlowControl.java
+++ /dev/null
@@ -1,147 +0,0 @@
-package com.aizuda.easy.retry.server.common.flow.control;
-
-import com.aizuda.easy.retry.common.core.alarm.Alarm;
-import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
-import com.aizuda.easy.retry.common.core.alarm.EasyRetryAlarmFactory;
-import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
-import com.aizuda.easy.retry.common.core.enums.StatusEnum;
-import com.aizuda.easy.retry.common.core.log.LogUtils;
-import com.aizuda.easy.retry.server.common.cache.CacheNotifyRateLimiter;
-import com.aizuda.easy.retry.server.common.triple.ImmutableTriple;
-import com.aizuda.easy.retry.server.common.triple.Triple;
-import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
-import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig;
-import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
-import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import
com.google.common.util.concurrent.RateLimiter; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.mapstruct.Mapper; -import org.mapstruct.factory.Mappers; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.util.CollectionUtils; - -import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - - -/** - * @author: zuoJunLin - * @date : 2023-11-21 13:04 - * @since 2.5.0 - */ -@Slf4j -public abstract class AbstractFlowControl implements ApplicationListener { - - - @Autowired - private EasyRetryAlarmFactory easyRetryAlarmFactory; - - @Autowired - protected AccessTemplate accessTemplate; - - protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) { - RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key); - if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) { - CacheNotifyRateLimiter.put(key, RateLimiter.create(rateLimiterThreshold)); - } - - return rateLimiter; - } - - protected Map, List> getNotifyConfigMap(final Set namespaceIds, - final Set groupNames, final Set sceneNames) { - - // 批量获取所需的通知配置 - List notifyConfigs = accessTemplate.getNotifyConfigAccess().list( - new LambdaQueryWrapper() - .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES) - .eq(NotifyConfig::getNotifyScene, NotifySceneEnum.RETRY_TASK_ENTER_DEAD_LETTER.getNotifyScene()) - .in(NotifyConfig::getNamespaceId, namespaceIds) - .in(NotifyConfig::getGroupName, groupNames) - .in(NotifyConfig::getSceneName, sceneNames) - ); - - if (CollectionUtils.isEmpty(notifyConfigs)) { - return null; - } - - return notifyConfigs.stream() - .collect(Collectors.groupingBy(i -> { - - String namespaceId = i.getNamespaceId(); - String groupName = 
i.getGroupName(); - String sceneName = i.getSceneName(); - - return ImmutableTriple.of(namespaceId, groupName, sceneName); - })); - } - - protected void doSendAlarm(Triple key, - List notifyConfigsList, - AlarmDTO alarmDTO - ) { - for (final NotifyConfig notifyConfig : notifyConfigsList) { - if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus())) { - // 限流 - RateLimiter rateLimiter = getRateLimiter(String.valueOf(notifyConfig.getId()), - notifyConfig.getRateLimiterThreshold()); - // 每秒发送rateLimiterThreshold个告警 - if (Objects.nonNull(rateLimiter) && !rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) { - LogUtils.warn(log, - "namespaceId:[{}] groupName:[{}] senceName:[{}] idempotentId:[{}] 任务重试失败进入死信队列已到达最大限流阈值,本次通知不执行", - key.getLeft(), key.getMiddle(), key.getRight(), - alarmDTO.getIdempotentId()); - continue; - } - } - - AlarmContext context = buildAlarmContext(alarmDTO, notifyConfig); - Alarm alarmType = easyRetryAlarmFactory.getAlarmType( - notifyConfig.getNotifyType()); - alarmType.asyncSendMessage(context); - } - } - - protected abstract AlarmContext buildAlarmContext(final AlarmDTO alarmDTO, NotifyConfig notifyConfig); - - @Data - public static class AlarmDTO { - - private String uniqueId; - - private String groupName; - - private String sceneName; - - private String idempotentId; - - private String bizNo; - - private String executorName; - - private String argsStr; - - private Integer retryCount; - - private LocalDateTime createDt; - } - - @Mapper - public interface AlarmDTOConverter { - - AlarmDTOConverter INSTANCE = Mappers.getMapper(AlarmDTOConverter.class); - - AlarmDTO toAlarmDTO(RetryDeadLetter retryDeadLetter); - - AlarmDTO toAlarmDTO(RetryTask retryTask); - } -} diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/listener/JobTaskFailAlarmListener.java 
b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/listener/JobTaskFailAlarmListener.java index e33a27fff..384f390b4 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/listener/JobTaskFailAlarmListener.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/listener/JobTaskFailAlarmListener.java @@ -8,7 +8,11 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.common.AlarmInfoConverter; import com.aizuda.easy.retry.server.common.Lifecycle; +import com.aizuda.easy.retry.server.common.alarm.AbstractJobAlarm; +import com.aizuda.easy.retry.server.common.dto.JobAlarmInfo; +import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo; import com.aizuda.easy.retry.server.common.triple.ImmutableTriple; import com.aizuda.easy.retry.server.common.triple.Triple; import com.aizuda.easy.retry.server.common.util.DateUtils; @@ -40,11 +44,7 @@ import java.util.stream.Collectors; */ @Component @Slf4j -public class JobTaskFailAlarmListener implements ApplicationListener, Runnable, Lifecycle { - - - @Autowired - private JobNotifyConfigMapper jobNotifyConfigMapper; +public class JobTaskFailAlarmListener extends AbstractJobAlarm { @Autowired private JobTaskBatchMapper jobTaskBatchMapper; @@ -65,107 +65,45 @@ public class JobTaskFailAlarmListener implements ApplicationListener 方法参数:{} \n" + "> 时间:{} \n"; - private Thread thread; - @Override - public void start() { - thread = new Thread(this); - thread.start(); + protected List poll() throws InterruptedException { + // 无数据时阻塞线程 + Long jobTaskBatchId = queue.take(); + // 拉取200条 + List jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId); 
+        queue.drainTo(jobTaskBatchIds, 200);
+
+        List jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(jobTaskBatchIds);
+        return AlarmInfoConverter.INSTANCE.toJobAlarmInfos(jobTaskBatchList);
     }
 
     @Override
-    public void close() {
-        if (Objects.nonNull(thread)) {
-            thread.interrupt();
-        }
+    protected AlarmContext buildAlarmContext(JobAlarmInfo alarmDTO, NotifyConfigInfo notifyConfig) {
+        // Build the alarm payload (text, title, channel attributes) for one job alarm
+        AlarmContext context = AlarmContext.build()
+                .text(jobTaskFailTextMessagesFormatter,
+                        EnvironmentUtils.getActiveProfile(),
+                        alarmDTO.getGroupName(),
+                        alarmDTO.getJobName(),
+                        alarmDTO.getExecutorInfo(),
+                        alarmDTO.getArgsStr(),
+                        DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN))
+                .title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile())
+                .notifyAttribute(notifyConfig.getNotifyAttribute());
+        // Do not dispatch here: AbstractAlarm#sendAlarm sends the returned context itself,
+        // so sending from this method as well delivered every alarm twice.
+
+        return context;
     }
 
     @Override
-    public void run() {
-        LogUtils.info(log, "JobTaskFailAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), HostUtils.getIp());
-        while (!Thread.currentThread().isInterrupted()) {
-            try {
-                // 无数据时阻塞线程
-                Long jobTaskBatchId = queue.take();
-                // 拉取200条
-                List jobTaskBatchIds = Lists.newArrayList(jobTaskBatchId);
-                queue.drainTo(jobTaskBatchIds, 200);
-                List jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(jobTaskBatchIds);
-                Set namespaceIds = new HashSet<>();
-                Set groupNames = new HashSet<>();
-                Set jobIds = new HashSet<>();
-                Map, List> jobTaskBatchMap = getJobTaskBatchMap(
-                        jobTaskBatchList, namespaceIds, groupNames, jobIds);
-
-                Map, List> jobNotifyConfigMap = getJobNotifyConfigMap(
-                        namespaceIds, groupNames, jobIds);
-                if (jobNotifyConfigMap == null) {
-                    continue;
-                }
-                // 循环发送消息
-                jobTaskBatchMap.forEach((key, list) -> {
-                    List jobNotifyConfigsList = jobNotifyConfigMap.get(key);
-                    for (JobBatchResponseDO JobBatch : list) {
-                        for (final JobNotifyConfig jobNotifyConfig : jobNotifyConfigsList) {
-
// 预警 - AlarmContext context = AlarmContext.build() - .text(jobTaskFailTextMessagesFormatter, - EnvironmentUtils.getActiveProfile(), - JobBatch.getGroupName(), - JobBatch.getJobName(), - JobBatch.getExecutorInfo(), - JobBatch.getArgsStr(), - DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN)) - .title("{}环境 JOB任务失败", EnvironmentUtils.getActiveProfile()) - .notifyAttribute(jobNotifyConfig.getNotifyAttribute()); - Alarm alarmType = easyRetryAlarmFactory.getAlarmType(jobNotifyConfig.getNotifyType()); - alarmType.asyncSendMessage(context); - } - } - }); - } catch (InterruptedException e) { - LogUtils.info(log, "job task fail more alarm stop"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LogUtils.error(log, "JobTaskFailAlarmListener queue take Exception", e); - } - } + protected void startLog() { + LogUtils.info(log, "JobTaskFailAlarmListener started"); } - private Map, List> getJobNotifyConfigMap(Set namespaceIds, Set groupNames, Set jobIds) { - // 批量获取所需的通知配置 - List jobNotifyConfigs = jobNotifyConfigMapper.selectList( - new LambdaQueryWrapper() - .eq(JobNotifyConfig::getNotifyStatus, StatusEnum.YES) - .eq(JobNotifyConfig::getNotifyScene, JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene()) - .in(JobNotifyConfig::getNamespaceId, namespaceIds) - .in(JobNotifyConfig::getGroupName, groupNames) - .in(JobNotifyConfig::getJobId, jobIds) - ); - if (CollectionUtils.isEmpty(jobNotifyConfigs)) { - return null; - } - return jobNotifyConfigs.stream() - .collect(Collectors.groupingBy(i -> { - - String namespaceId = i.getNamespaceId(); - String groupName = i.getGroupName(); - Long jobId = i.getJobId(); - - return ImmutableTriple.of(namespaceId, groupName, jobId); - })); - } - - private Map, List> getJobTaskBatchMap(List jobTaskBatchList, Set namespaceIds, Set groupNames, Set jobIds) { - return jobTaskBatchList.stream().collect(Collectors.groupingBy(i -> { - String namespaceId = i.getNamespaceId(); - String groupName = i.getGroupName(); - Long jobId = 
i.getJobId(); - namespaceIds.add(namespaceId); - groupNames.add(groupName); - jobIds.add(jobId); - return ImmutableTriple.of(namespaceId, groupName, jobId); - })); + @Override + protected int getNotifyScene() { + return JobNotifySceneEnum.JOB_TASK_ERROR.getNotifyScene(); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java index 2d64c2c58..da4e40c8e 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java @@ -1,11 +1,18 @@ package com.aizuda.easy.retry.server.retry.task.support.listener; import com.aizuda.easy.retry.common.core.alarm.AlarmContext; +import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.util.EnvironmentUtils; import com.aizuda.easy.retry.common.core.util.HostUtils; +import com.aizuda.easy.retry.server.common.AlarmInfoConverter; import com.aizuda.easy.retry.server.common.Lifecycle; -import com.aizuda.easy.retry.server.common.flow.control.AbstractFlowControl; +import com.aizuda.easy.retry.server.common.alarm.AbstractAlarm; +import com.aizuda.easy.retry.server.common.alarm.AbstractFlowControl; +import com.aizuda.easy.retry.server.common.alarm.AbstractRetryAlarm; +import com.aizuda.easy.retry.server.common.dto.AlarmInfo; +import com.aizuda.easy.retry.server.common.dto.NotifyConfigInfo; +import com.aizuda.easy.retry.server.common.dto.RetryAlarmInfo; import 
com.aizuda.easy.retry.server.common.util.DateUtils; import com.aizuda.easy.retry.server.common.triple.ImmutableTriple; import com.aizuda.easy.retry.server.common.triple.Triple; @@ -13,6 +20,7 @@ import com.aizuda.easy.retry.server.retry.task.support.event.RetryTaskFailDeadLe import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.NotifyConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; @@ -38,7 +46,7 @@ import java.util.stream.Collectors; */ @Component @Slf4j -public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle { +public class RetryTaskFailDeadLetterAlarmListener extends AbstractRetryAlarm implements Runnable, Lifecycle { /** * 死信告警数据 @@ -46,87 +54,22 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl> queue = new LinkedBlockingQueue<>(1000); private static String retryTaskDeadTextMessagesFormatter = - "{}环境 重试任务失败进入死信队列 \n" + - "> 组名称:{} \n" + - "> 执行器名称:{} \n" + - "> 场景名称:{} \n" + - "> 业务数据:{} \n" + - "> 时间:{} \n"; - - @Autowired - protected AccessTemplate accessTemplate; - - private Thread thread; + "{}环境 重试任务失败进入死信队列 \n" + + "> 组名称:{} \n" + + "> 执行器名称:{} \n" + + "> 场景名称:{} \n" + + "> 业务数据:{} \n" + + "> 时间:{} \n"; @Override - public void start() { - thread = new Thread(this); - thread.start(); - } + protected List poll() throws InterruptedException { - @Override - public void close() { - if (Objects.nonNull(thread)) { - thread.interrupt(); + List allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS); + if (CollectionUtils.isEmpty(allRetryDeadLetterList)) { + return Lists.newArrayList(); } - } - @Override - public void run() { - LogUtils.info(log, 
"RetryTaskFailDeadLetterAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), - HostUtils.getIp()); - while (!Thread.currentThread().isInterrupted()) { - try { - List allRetryDeadLetterList = queue.poll(5, TimeUnit.SECONDS); - if (CollectionUtils.isEmpty(allRetryDeadLetterList)) { - continue; - } - - Set namespaceIds = new HashSet<>(); - Set groupNames = new HashSet<>(); - Set sceneNames = new HashSet<>(); - Map, List> retryDeadLetterMap = getRetryDeadLetterMap( - allRetryDeadLetterList, namespaceIds, groupNames, sceneNames); - - Map, List> notifyConfigMap = getNotifyConfigMap( - namespaceIds, groupNames, sceneNames); - if (notifyConfigMap == null) { - continue; - } - - // 循环发送消息 - retryDeadLetterMap.forEach((key, list) -> { - List notifyConfigsList = notifyConfigMap.get(key); - for (RetryDeadLetter retryDeadLetter : list) { - doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryDeadLetter)); - } - }); - } catch (InterruptedException e) { - LogUtils.info(log, "retry task fail dead letter alarm stop"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LogUtils.error(log, "RetryTaskFailDeadLetterAlarmListener queue poll Exception", e); - } - } - } - - @NotNull - private static Map, List> getRetryDeadLetterMap( - final List allRetryDeadLetterList, final Set namespaceIds, - final Set groupNames, final Set sceneNames) { - return allRetryDeadLetterList.stream() - .collect(Collectors.groupingBy(i -> { - - String namespaceId = i.getNamespaceId(); - String groupName = i.getGroupName(); - String sceneName = i.getSceneName(); - - namespaceIds.add(namespaceId); - groupNames.add(groupName); - sceneNames.add(sceneName); - - return ImmutableTriple.of(namespaceId, groupName, sceneName); - })); + return AlarmInfoConverter.INSTANCE.deadLetterToAlarmInfo(allRetryDeadLetterList); } @Override @@ -137,18 +80,28 @@ public class RetryTaskFailDeadLetterAlarmListener extends AbstractFlowControl implements Runnable, Lifecycle { + AbstractRetryAlarm 
implements Runnable, Lifecycle { /** * 重试任务失败数量超过阈值告警数据 */ private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(1000); - private Thread thread; private static String retryTaskFailMoreThresholdMessagesFormatter = - "{}环境 重试任务失败数量超过阈值 \n" + - "> 组名称:{} \n" + - "> 执行器名称:{} \n" + - "> 场景名称:{} \n" + - "> 重试次数:{} \n" + - "> 业务数据:{} \n" + - "> 时间:{} \n"; + "{}环境 重试任务失败数量超过阈值 \n" + + "> 组名称:{} \n" + + "> 执行器名称:{} \n" + + "> 场景名称:{} \n" + + "> 重试次数:{} \n" + + "> 业务数据:{} \n" + + "> 时间:{} \n"; @Override - public void start() { - thread = new Thread(this); - thread.start(); - } + protected List poll() throws InterruptedException { + // 无数据时阻塞线程 + RetryTask retryTask = queue.take(); - @Override - public void close() { - if (Objects.nonNull(thread)) { - thread.interrupt(); - } - } + // 拉取100条 + List lists = Lists.newArrayList(retryTask); + queue.drainTo(lists, 200); - @Override - public void run() { - LogUtils.info(log, "RetryTaskFailMoreThresholdAlarmListener time[{}] ip:[{}]", LocalDateTime.now(), - HostUtils.getIp()); - while (!Thread.currentThread().isInterrupted()) { - try { - - // 无数据时阻塞线程 - RetryTask retryTask = queue.take(); - - // 拉取100条 - List lists = Lists.newArrayList(retryTask); - queue.drainTo(lists, 200); - - Set namespaceIds = new HashSet<>(); - Set groupNames = new HashSet<>(); - Set sceneNames = new HashSet<>(); - Map, List> retryTaskMap = getRetryTaskMap( - lists, namespaceIds, groupNames, sceneNames); - - Map, List> notifyConfigMap = getNotifyConfigMap( - namespaceIds, groupNames, sceneNames); - if (notifyConfigMap == null) { - continue; - } - - // 循环发送消息 - retryTaskMap.forEach((key, list) -> { - List notifyConfigsList = notifyConfigMap.get(key); - for (RetryTask retryTask1 : list) { - doSendAlarm(key, notifyConfigsList, AlarmDTOConverter.INSTANCE.toAlarmDTO(retryTask1)); - } - }); - } catch (InterruptedException e) { - LogUtils.info(log, "retry task fail more alarm stop"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - 
LogUtils.error(log, "RetryTaskFailMoreThresholdAlarmListener queue poll Exception", e); - } - } + return AlarmInfoConverter.INSTANCE.retryTaskToAlarmInfo(lists); } @Override @@ -115,39 +77,29 @@ public class RetryTaskFailMoreThresholdAlarmListener extends } @Override - protected AlarmContext buildAlarmContext(AlarmDTO alarmDTO, NotifyConfig notifyConfig) { + protected AlarmContext buildAlarmContext(RetryAlarmInfo retryAlarmInfo, NotifyConfigInfo notifyConfig) { + // 预警 return AlarmContext.build().text(retryTaskFailMoreThresholdMessagesFormatter, - EnvironmentUtils.getActiveProfile(), - alarmDTO.getGroupName(), - alarmDTO.getExecutorName(), - alarmDTO.getSceneName(), - alarmDTO.getRetryCount(), - alarmDTO.getArgsStr(), - DateUtils.format(alarmDTO.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) - .title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", alarmDTO.getGroupName(), alarmDTO.getSceneName()) - .notifyAttribute(notifyConfig.getNotifyAttribute()); + EnvironmentUtils.getActiveProfile(), + retryAlarmInfo.getGroupName(), + retryAlarmInfo.getExecutorName(), + retryAlarmInfo.getSceneName(), + retryAlarmInfo.getRetryCount(), + retryAlarmInfo.getArgsStr(), + DateUtils.format(retryAlarmInfo.getCreateDt(), DateUtils.NORM_DATETIME_PATTERN)) + .title("组:[{}] 场景:[{}] 环境重试任务失败数量超过阈值", retryAlarmInfo.getGroupName(), + retryAlarmInfo.getSceneName()) + .notifyAttribute(notifyConfig.getNotifyAttribute()); } - @NotNull - private static Map, List> getRetryTaskMap( - final List list, final Set namespaceIds, - final Set groupNames, final Set sceneNames) { - - return list.stream() - .collect(Collectors.groupingBy(retryTask -> { - String namespaceId = retryTask.getNamespaceId(); - String groupName = retryTask.getGroupName(); - String sceneName = retryTask.getSceneName(); - - namespaceIds.add(namespaceId); - groupNames.add(groupName); - sceneNames.add(sceneName); - - return ImmutableTriple.of(namespaceId, groupName, sceneName); - })); - + @Override + protected void startLog() { + LogUtils.info(log, 
"RetryTaskFailMoreThresholdAlarmListener started"); } - + @Override + protected int getNotifyScene() { + return NotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene(); + } }