feat(1.3.0-beta1.1):优化重试定时告警逻辑

This commit is contained in:
opensnail 2025-01-11 16:55:28 +08:00
parent 2fb450f48a
commit c6e1bc6dff
8 changed files with 312 additions and 254 deletions

View File

@ -18,12 +18,6 @@ public class NotifyConfig extends CreateUpdateDt {
private String groupName;
//
// /**
// * 业务id (scene_name或job_id或workflow_id)
// */
// private String businessId;
/**
* 任务类型 1重试任务 2回调任务 3JOB任务 4WORKFLOW任务
*/

View File

@ -0,0 +1,37 @@
package com.aizuda.snailjob.server.retry.task.dto;
import lombok.Data;
import java.util.List;
import java.util.Set;
/**
* @author opensnail
* @date 2025-01-11
* @since 1.3.0-beta1.1
*/
@Data
public class NotifyConfigDTO {
private Long id;
private Set<Long> recipientIds;
private Integer notifyThreshold;
private Integer notifyScene;
private Integer rateLimiterStatus;
private Integer rateLimiterThreshold;
private List<RecipientInfo> recipientInfos;
@Data
public static class RecipientInfo {
private Integer notifyType;
private String notifyAttribute;
}
}

View File

@ -14,6 +14,7 @@ import java.util.Set;
*/
@EqualsAndHashCode(callSuper = true)
@Data
@Deprecated
public class NotifyConfigPartitionTask extends PartitionTask {
private String namespaceId;

View File

@ -0,0 +1,28 @@
package com.aizuda.snailjob.server.retry.task.dto;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Set;
/**
* @author opensnail
* @date 2025-01-11
* @since 1.3.0-beta1.1
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RetrySceneConfigPartitionTask extends PartitionTask {
private String namespaceId;
private String groupName;
private String sceneName;
/**
* 通知告警场景配置id列表
*/
private Set<Long> notifyIds;
}

View File

@ -6,10 +6,7 @@ import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
import com.aizuda.snailjob.server.model.dto.RetryLogTaskDTO;
import com.aizuda.snailjob.server.model.dto.RetryTaskDTO;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.dto.*;
import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
@ -52,12 +49,27 @@ public interface RetryTaskConverter {
RetryTimerContext toRetryTimerContext(RetryPartitionTask retryPartitionTask);
List<NotifyConfigPartitionTask> toNotifyConfigPartitionTask(List<NotifyConfig> notifyConfigs);
List<NotifyConfigDTO> toNotifyConfigDTO(List<NotifyConfig> notifyConfigs);
List<RetrySceneConfigPartitionTask> toRetrySceneConfigPartitionTask(List<RetrySceneConfig> retrySceneConfigs);
@Mappings({
@Mapping(target = "notifyIds", expression = "java(RetryTaskConverter.toNotifyIds(retrySceneConfig.getNotifyIds()))")
})
RetrySceneConfigPartitionTask toRetrySceneConfigPartitionTask(RetrySceneConfig retrySceneConfig);
@Mappings({
@Mapping(target = "recipientIds", expression = "java(RetryTaskConverter.toNotifyRecipientIds(notifyConfig.getRecipientIds()))")
})
NotifyConfigPartitionTask toNotifyConfigPartitionTask(NotifyConfig notifyConfig);
NotifyConfigDTO toNotifyConfigDTO(NotifyConfig notifyConfig);
static Set<Long> toNotifyIds(String notifyIdsStr) {
if (StrUtil.isBlank(notifyIdsStr)) {
return new HashSet<>();
}
return new HashSet<>(JsonUtil.parseList(notifyIdsStr, Long.class));
}
static Set<Long> toNotifyRecipientIds(String notifyRecipientIdsStr) {
if (StrUtil.isBlank(notifyRecipientIdsStr)) {

View File

@ -0,0 +1,159 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* @author opensnail
* @date 2025-01-11
* @since 1.3.0-beta1.1
*/
public abstract class AbstractRetryTaskAlarmSchedule extends AbstractSchedule implements Lifecycle {
@Autowired
protected AccessTemplate accessTemplate;
@Autowired
private NotifyRecipientMapper recipientMapper;
@Override
protected void doExecute() {
PartitionTaskUtils.process(this::queryPartitionList, this::doHandler, 0);
}
/**
* 循环场景信息
*
* @param partitionTasks 需要告警的场景信息
*/
private void doHandler(List<? extends PartitionTask> partitionTasks) {
// 处理通知信息
Map<Long, NotifyConfigDTO> notifyConfigInfo = getNotifyConfigInfo((List<RetrySceneConfigPartitionTask>) partitionTasks);
if (notifyConfigInfo.isEmpty()) {
return;
}
for (PartitionTask partitionTask : partitionTasks) {
doSendAlarm((RetrySceneConfigPartitionTask) partitionTask, notifyConfigInfo);
}
}
protected abstract void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map<Long, NotifyConfigDTO> notifyConfigInfo);
/**
* 获取需要处理的配置信息
*
* @param startId 偏移id
* @return 需要处理的场景列表
*/
protected List<RetrySceneConfigPartitionTask> queryPartitionList(Long startId) {
List<RetrySceneConfig> retrySceneConfigList = accessTemplate.getSceneConfigAccess()
.listPage(new PageDTO<>(0, 500),
new LambdaQueryWrapper<RetrySceneConfig>()
.gt(RetrySceneConfig::getId, startId)
.eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus())
.orderByDesc(RetrySceneConfig::getId)
).getRecords();
return RetryTaskConverter.INSTANCE.toRetrySceneConfigPartitionTask(retrySceneConfigList);
}
/**
* 获取通知信息
* @param partitionTasks 本次需要处理的场景列表
* @return Map<Long(通知配置id), NotifyConfigDTO(配置信息)>
*/
protected Map<Long, NotifyConfigDTO> getNotifyConfigInfo(List<RetrySceneConfigPartitionTask> partitionTasks) {
// 提前通知配置id
Set<Long> retryNotifyIds = partitionTasks.stream()
.map(RetrySceneConfigPartitionTask::getNotifyIds)
.filter(CollUtil::isNotEmpty)
.reduce((a, b) -> {
HashSet<Long> set = Sets.newHashSet();
set.addAll(a);
set.addAll(b);
return set;
}).orElse(new HashSet<>());
if (CollUtil.isEmpty(retryNotifyIds)) {
return Maps.newHashMap();
}
// 从DB中获取通知配置信息
List<NotifyConfigDTO> notifyConfigs = RetryTaskConverter.INSTANCE.toNotifyConfigDTO(accessTemplate.getNotifyConfigAccess()
.list(new LambdaQueryWrapper<NotifyConfig>()
.in(NotifyConfig::getId, retryNotifyIds)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene())
.orderByAsc(NotifyConfig::getId)));
if (CollUtil.isEmpty(notifyConfigs)) {
return Maps.newHashMap();
}
// 提前通知人信息
Set<Long> recipientIds = notifyConfigs.stream()
.map(NotifyConfigDTO::getRecipientIds)
.filter(CollUtil::isNotEmpty)
.reduce((a, b) -> {
HashSet<Long> set = Sets.newHashSet();
set.addAll(a);
set.addAll(b);
return set;
}).orElse(new HashSet<>());
if (CollUtil.isEmpty(recipientIds)) {
return Maps.newHashMap();
}
// 从DB中获取通知人信息
List<NotifyRecipient> notifyRecipients = recipientMapper.selectByIds(recipientIds);
Map<Long, NotifyRecipient> recipientMap = StreamUtils.toIdentityMap(notifyRecipients, NotifyRecipient::getId);
Map<Long, NotifyConfigDTO> notifyConfigMap = Maps.newHashMap();
for (final NotifyConfigDTO notifyConfigDTO : notifyConfigs) {
List<NotifyConfigDTO.RecipientInfo> recipientList = StreamUtils.toList(notifyConfigDTO.getRecipientIds(),
recipientId -> {
NotifyRecipient notifyRecipient = recipientMap.get(recipientId);
if (Objects.isNull(notifyRecipient)) {
return null;
}
NotifyConfigDTO.RecipientInfo recipientInfo = new NotifyConfigDTO.RecipientInfo();
recipientInfo.setNotifyType(notifyRecipient.getNotifyType());
recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute());
return recipientInfo;
});
notifyConfigDTO.setRecipientInfos(recipientList);
notifyConfigMap.put(notifyConfigDTO.getId(), notifyConfigDTO);
}
return notifyConfigMap;
}
}

View File

@ -1,36 +1,17 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.Alarm;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.NetUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask.RecipientInfo;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@ -48,7 +29,7 @@ import java.util.*;
*/
@Component
@RequiredArgsConstructor
public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle {
public class RetryErrorMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmSchedule implements Lifecycle {
private static final String retryErrorMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 场景重试失败数量超过{}个</font> \n" +
@ -58,9 +39,6 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
"> 时间窗口:{} ~ {} \n" +
"> **共计:{}** \n";
private final AccessTemplate accessTemplate;
private final NotifyRecipientMapper recipientMapper;
@Override
public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M"));
@ -68,25 +46,13 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
@Override
public void close() {
}
@Override
protected void doExecute() {
SnailJobLog.LOCAL.debug("retryErrorMoreThreshold time[{}] ip:[{}]",
LocalDateTime.now(),
NetUtil.getLocalIpStr());
PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0);
}
private void doHandler(List<? extends PartitionTask> partitionTasks) {
for (PartitionTask partitionTask : partitionTasks) {
doSendAlarm((NotifyConfigPartitionTask) partitionTask);
protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map<Long, NotifyConfigDTO> notifyConfigInfo) {
if (CollUtil.isEmpty(partitionTask.getNotifyIds())) {
return;
}
}
private void doSendAlarm(NotifyConfigPartitionTask partitionTask) {
// x分钟内x组x场景进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
@ -95,102 +61,43 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
partitionTask.getNamespaceId(),
new LambdaQueryWrapper<RetryDeadLetter>().
between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now)
.eq(RetryDeadLetter::getNamespaceId, partitionTask.getNamespaceId())
.eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName())
.eq(RetryDeadLetter::getSceneName, partitionTask.getBusinessId()));
if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) {
List<RecipientInfo> recipientInfos = partitionTask.getRecipientInfos();
for (final RecipientInfo recipientInfo : recipientInfos) {
if (Objects.isNull(recipientInfo)) {
continue;
}
// 预警
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
count,
partitionTask.getNamespaceId(),
partitionTask.getGroupName(),
partitionTask.getBusinessId(),
DateUtils.format(now.minusMinutes(30),
DateUtils.NORM_DATETIME_PATTERN),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), count)
.title("{}环境 场景重试失败数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(
recipientInfo.getNotifyType());
alarmType.asyncSendMessage(context);
.eq(RetryDeadLetter::getSceneName, partitionTask.getSceneName()));
for (Long notifyId : partitionTask.getNotifyIds()) {
NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId);
if (notifyConfigDTO == null) {
continue;
}
if (notifyConfigDTO.getNotifyThreshold() > 0 && count >= notifyConfigDTO.getNotifyThreshold()) {
List<NotifyConfigDTO.RecipientInfo> recipientInfos = notifyConfigDTO.getRecipientInfos();
for (final NotifyConfigDTO.RecipientInfo recipientInfo : recipientInfos) {
if (Objects.isNull(recipientInfo)) {
continue;
}
// 预警
AlarmContext context = AlarmContext.build()
.text(retryErrorMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
count,
partitionTask.getNamespaceId(),
partitionTask.getGroupName(),
partitionTask.getSceneName(),
DateUtils.format(now.minusMinutes(30),
DateUtils.NORM_DATETIME_PATTERN),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), count)
.title("{}环境 场景重试失败数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(recipientInfo.getNotifyAttribute());
Alarm<AlarmContext> alarmType = SnailJobAlarmFactory.getAlarmType(
recipientInfo.getNotifyType());
alarmType.asyncSendMessage(context);
}
}
}
}
// 1.拿500场景
// 2.获取通知配置ids并查询通知配置
// 3.Map<通知ID, 通知DTO>
// 4.循环场景查询死信数据量
// 5.场景对应的通知ids
private List<NotifyConfigPartitionTask> getNotifyConfigPartitions(Long startId) {
List<RetrySceneConfig> retrySceneConfigList = accessTemplate.getSceneConfigAccess()
.listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>())
.getRecords();
Set<Long> retryNotifyIds = new HashSet<>();
for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) {
HashSet<Long> notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
retryNotifyIds.addAll(notifyIds);
}
if (CollUtil.isEmpty(retryNotifyIds)) {
return Lists.newArrayList();
}
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.list(new LambdaQueryWrapper<NotifyConfig>()
.gt(NotifyConfig::getId, startId)
.in(NotifyConfig::getId, retryNotifyIds)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene())
.orderByAsc(NotifyConfig::getId));
Set<Long> recipientIds = notifyConfigs.stream()
.map(config -> new HashSet<>(JsonUtil.parseList(config.getRecipientIds(), Long.class)))
.reduce((a, b) -> {
HashSet<Long> set = Sets.newHashSet();
set.addAll(a);
set.addAll(b);
return set;
}).orElse(new HashSet<>());
if (CollUtil.isEmpty(recipientIds)) {
return Lists.newArrayList();
}
List<NotifyRecipient> notifyRecipients = recipientMapper.selectBatchIds(recipientIds);
Map<Long, NotifyRecipient> recipientMap = StreamUtils.toIdentityMap(notifyRecipients, NotifyRecipient::getId);
List<NotifyConfigPartitionTask> notifyConfigPartitionTasks = RetryTaskConverter.INSTANCE.toNotifyConfigPartitionTask(
notifyConfigs);
for (final NotifyConfigPartitionTask notifyConfigPartitionTask : notifyConfigPartitionTasks) {
List<RecipientInfo> recipientList = StreamUtils.toList(notifyConfigPartitionTask.getRecipientIds(),
recipientId -> {
NotifyRecipient notifyRecipient = recipientMap.get(recipientId);
if (Objects.isNull(notifyRecipient)) {
return null;
}
RecipientInfo recipientInfo = new RecipientInfo();
recipientInfo.setNotifyType(notifyRecipient.getNotifyType());
recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute());
return recipientInfo;
});
notifyConfigPartitionTask.setRecipientInfos(recipientList);
}
return notifyConfigPartitionTasks;
}
@Override
public String lockName() {

View File

@ -1,33 +1,15 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.schedule.AbstractSchedule;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask.RecipientInfo;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -35,7 +17,6 @@ import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -48,7 +29,7 @@ import java.util.stream.Collectors;
@Component
@Slf4j
@RequiredArgsConstructor
public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle {
public class RetryTaskMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmSchedule implements Lifecycle {
private static final String retryTaskMoreThresholdTextMessageFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 场景重试数量超过{}个</font> \n" +
@ -58,9 +39,6 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
"> 告警时间:{} \n" +
"> **共计:{}** \n";
private final AccessTemplate accessTemplate;
private final NotifyRecipientMapper recipientMapper;
@Override
public void start() {
taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M"));
@ -70,18 +48,8 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
public void close() {
}
@Override
protected void doExecute() {
PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0);
}
private void doHandler(List<? extends PartitionTask> partitionTasks) {
for (PartitionTask partitionTask : partitionTasks) {
doSendAlarm((NotifyConfigPartitionTask) partitionTask);
}
}
private void doSendAlarm(NotifyConfigPartitionTask partitionTask) {
protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map<Long, NotifyConfigDTO> notifyConfigInfo) {
// x分钟内x组x场景进入重试任务的数据量
long count = accessTemplate.getRetryTaskAccess()
@ -89,87 +57,39 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, partitionTask.getNamespaceId())
.eq(RetryTask::getGroupName, partitionTask.getGroupName())
.eq(RetryTask::getSceneName, partitionTask.getBusinessId())
.eq(RetryTask::getSceneName, partitionTask.getSceneName())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) {
List<RecipientInfo> recipientInfos = partitionTask.getRecipientInfos();
for (final RecipientInfo recipientInfo : recipientInfos) {
if (Objects.isNull(recipientInfo)) {
continue;
}
// 预警
AlarmContext context = AlarmContext.build()
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
count,
partitionTask.getNamespaceId(),
partitionTask.getGroupName(),
partitionTask.getBusinessId(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN),
count)
.title("{}环境 场景重试数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(recipientInfo.getNotifyAttribute());
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType()))
.ifPresent(alarmType -> alarmType.asyncSendMessage(context));
for (Long notifyId : partitionTask.getNotifyIds()) {
NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId);
if (notifyConfigDTO == null) {
continue;
}
if (notifyConfigDTO.getNotifyThreshold() > 0 && count >= notifyConfigDTO.getNotifyThreshold()) {
List<NotifyConfigDTO.RecipientInfo> recipientInfos = notifyConfigDTO.getRecipientInfos();
for (final NotifyConfigDTO.RecipientInfo recipientInfo : recipientInfos) {
if (Objects.isNull(recipientInfo)) {
continue;
}
// 预警
AlarmContext context = AlarmContext.build()
.text(retryTaskMoreThresholdTextMessageFormatter,
EnvironmentUtils.getActiveProfile(),
count,
partitionTask.getNamespaceId(),
partitionTask.getGroupName(),
partitionTask.getSceneName(),
DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN),
count)
.title("{}环境 场景重试数量超过阈值", EnvironmentUtils.getActiveProfile())
.notifyAttribute(recipientInfo.getNotifyAttribute());
Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType()))
.ifPresent(alarmType -> alarmType.asyncSendMessage(context));
}
}
}
}
private List<NotifyConfigPartitionTask> getNotifyConfigPartitions(Long startId) {
List<RetrySceneConfig> retrySceneConfigList = accessTemplate.getSceneConfigAccess()
.listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>())
.getRecords();
Set<Long> retryNotifyIds = new HashSet<>();
for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) {
HashSet<Long> notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
retryNotifyIds.addAll(notifyIds);
}
if (CollUtil.isEmpty(retryNotifyIds)) {
return Lists.newArrayList();
}
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.list(new LambdaQueryWrapper<NotifyConfig>()
.gt(NotifyConfig::getId, startId)
.in(NotifyConfig::getId, retryNotifyIds)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getSystemTaskType, SyetemTaskTypeEnum.RETRY.getType())
.eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY.getNotifyScene())
.orderByAsc(NotifyConfig::getId)); // SQLServer 分页必须 ORDER BY
Set<Long> recipientIds = notifyConfigs.stream()
.flatMap(config -> JsonUtil.parseList(config.getRecipientIds(), Long.class).stream())
.collect(Collectors.toSet());
if (CollUtil.isEmpty(recipientIds)) {
return Lists.newArrayList();
}
Map<Long, NotifyRecipient> recipientMap = StreamUtils.toIdentityMap(
recipientMapper.selectBatchIds(recipientIds), NotifyRecipient::getId);
List<NotifyConfigPartitionTask> notifyConfigPartitionTasks = RetryTaskConverter.INSTANCE.toNotifyConfigPartitionTask(
notifyConfigs);
notifyConfigPartitionTasks.forEach(task -> {
List<RecipientInfo> recipientList = StreamUtils.toList(task.getRecipientIds(), recipientId -> {
NotifyRecipient notifyRecipient = recipientMap.get(recipientId);
if (Objects.isNull(notifyRecipient)) {
return null;
}
RecipientInfo recipientInfo = new RecipientInfo();
recipientInfo.setNotifyType(notifyRecipient.getNotifyType());
recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute());
return recipientInfo;
});
task.setRecipientInfos(recipientList);
});
return notifyConfigPartitionTasks;
}
@Override