From c6e1bc6dff5ef103e9c5f5e86bd2bf5e14c69838 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sat, 11 Jan 2025 16:55:28 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.3.0-beta1.1):=E4=BC=98=E5=8C=96=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E5=AE=9A=E6=97=B6=E5=91=8A=E8=AD=A6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/po/NotifyConfig.java | 6 - .../retry/task/dto/NotifyConfigDTO.java | 37 ++++ .../task/dto/NotifyConfigPartitionTask.java | 1 + .../dto/RetrySceneConfigPartitionTask.java | 28 +++ .../task/support/RetryTaskConverter.java | 24 ++- .../AbstractRetryTaskAlarmSchedule.java | 159 +++++++++++++++++ .../RetryErrorMoreThresholdAlarmSchedule.java | 167 ++++-------------- .../RetryTaskMoreThresholdAlarmSchedule.java | 144 ++++----------- 8 files changed, 312 insertions(+), 254 deletions(-) create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigDTO.java create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetrySceneConfigPartitionTask.java create mode 100644 snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/AbstractRetryTaskAlarmSchedule.java diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java index f4fc3ec34..ac8010e59 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/NotifyConfig.java @@ -18,12 +18,6 @@ public class NotifyConfig extends CreateUpdateDt { private String groupName; -// -// /** -// * 业务id (scene_name或job_id或workflow_id) -// */ -// private String businessId; - /** * 任务类型 1、重试任务 2、回调任务、 3、JOB任务 4、WORKFLOW任务 */ diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigDTO.java new file mode 100644 index 000000000..d218b7044 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigDTO.java @@ -0,0 +1,37 @@ +package com.aizuda.snailjob.server.retry.task.dto; + +import lombok.Data; + +import java.util.List; +import java.util.Set; + +/** + * @author opensnail + * @date 2025-01-11 + * @since 1.3.0-beta1.1 + */ +@Data +public class NotifyConfigDTO { + + private Long id; + + private Set recipientIds; + + private Integer notifyThreshold; + + private Integer notifyScene; + + private Integer rateLimiterStatus; + + private Integer rateLimiterThreshold; + + private List recipientInfos; + + @Data + public static class RecipientInfo { + + private Integer notifyType; + + private String notifyAttribute; + } +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigPartitionTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigPartitionTask.java index b007da0a2..3a268d399 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigPartitionTask.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/NotifyConfigPartitionTask.java @@ -14,6 +14,7 @@ import java.util.Set; */ @EqualsAndHashCode(callSuper = true) @Data +@Deprecated public class NotifyConfigPartitionTask extends PartitionTask { private String namespaceId; diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetrySceneConfigPartitionTask.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetrySceneConfigPartitionTask.java new file mode 100644 index 000000000..93f1b7619 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetrySceneConfigPartitionTask.java @@ -0,0 +1,28 @@ +package com.aizuda.snailjob.server.retry.task.dto; + +import com.aizuda.snailjob.server.common.dto.PartitionTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Set; + +/** + * @author opensnail + * @date 2025-01-11 + * @since 1.3.0-beta1.1 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class RetrySceneConfigPartitionTask extends PartitionTask { + + private String namespaceId; + + private String groupName; + + private String sceneName; + + /** + * 通知告警场景配置id列表 + */ + private Set notifyIds; +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java index b628db0ac..9fef331fd 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java @@ -6,10 +6,7 @@ import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO; import com.aizuda.snailjob.server.model.dto.RetryLogTaskDTO; import com.aizuda.snailjob.server.model.dto.RetryTaskDTO; -import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask; -import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskExecutorDTO; -import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO; +import com.aizuda.snailjob.server.retry.task.dto.*; import com.aizuda.snailjob.server.retry.task.generator.task.TaskContext; import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerContext; import com.aizuda.snailjob.template.datasource.persistence.po.*; @@ -52,12 +49,27 @@ public interface RetryTaskConverter { RetryTimerContext toRetryTimerContext(RetryPartitionTask retryPartitionTask); - List toNotifyConfigPartitionTask(List notifyConfigs); + List toNotifyConfigDTO(List notifyConfigs); + + List toRetrySceneConfigPartitionTask(List retrySceneConfigs); + + @Mappings({ + @Mapping(target = "notifyIds", expression = "java(RetryTaskConverter.toNotifyIds(retrySceneConfig.getNotifyIds()))") + }) + RetrySceneConfigPartitionTask toRetrySceneConfigPartitionTask(RetrySceneConfig retrySceneConfig); @Mappings({ @Mapping(target = "recipientIds", expression = "java(RetryTaskConverter.toNotifyRecipientIds(notifyConfig.getRecipientIds()))") }) - NotifyConfigPartitionTask toNotifyConfigPartitionTask(NotifyConfig notifyConfig); + NotifyConfigDTO toNotifyConfigDTO(NotifyConfig notifyConfig); + + static Set toNotifyIds(String notifyIdsStr) { + if (StrUtil.isBlank(notifyIdsStr)) { + return new HashSet<>(); + } + + return new HashSet<>(JsonUtil.parseList(notifyIdsStr, Long.class)); + } static Set toNotifyRecipientIds(String notifyRecipientIdsStr) { if (StrUtil.isBlank(notifyRecipientIdsStr)) { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/AbstractRetryTaskAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/AbstractRetryTaskAlarmSchedule.java new file mode 100644 index 000000000..9bd3f45aa --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/AbstractRetryTaskAlarmSchedule.java @@ -0,0 +1,159 @@ +package com.aizuda.snailjob.server.retry.task.support.schedule; + +import cn.hutool.core.collection.CollUtil; +import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.util.JsonUtil; +import com.aizuda.snailjob.common.core.util.StreamUtils; +import com.aizuda.snailjob.server.common.Lifecycle; +import com.aizuda.snailjob.server.common.dto.PartitionTask; +import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; +import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; +import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper; +import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; +import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.*; + +/** + * @author opensnail + * @date 2025-01-11 + * @since 1.3.0-beta1.1 + */ +public abstract class AbstractRetryTaskAlarmSchedule extends AbstractSchedule implements Lifecycle { + @Autowired + protected AccessTemplate accessTemplate; + @Autowired + private NotifyRecipientMapper recipientMapper; + + @Override + protected void doExecute() { + PartitionTaskUtils.process(this::queryPartitionList, this::doHandler, 0); + } + + /** + * 循环场景信息 + * + * @param partitionTasks 需要告警的场景信息 + */ + private void doHandler(List partitionTasks) { + + // 处理通知信息 + Map notifyConfigInfo = getNotifyConfigInfo((List) partitionTasks); + if (notifyConfigInfo.isEmpty()) { + return; + } + + for (PartitionTask partitionTask : partitionTasks) { + doSendAlarm((RetrySceneConfigPartitionTask) partitionTask, notifyConfigInfo); + } + } + + protected abstract void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map notifyConfigInfo); + + + /** + * 获取需要处理的配置信息 + * + * @param startId 偏移id + * @return 需要处理的场景列表 + */ + protected List queryPartitionList(Long startId) { + List retrySceneConfigList = accessTemplate.getSceneConfigAccess() + .listPage(new PageDTO<>(0, 500), + new LambdaQueryWrapper() + .gt(RetrySceneConfig::getId, startId) + .eq(RetrySceneConfig::getSceneStatus, StatusEnum.YES.getStatus()) + .orderByDesc(RetrySceneConfig::getId) + ).getRecords(); + return RetryTaskConverter.INSTANCE.toRetrySceneConfigPartitionTask(retrySceneConfigList); + } + + + /** + * 获取通知信息 + * @param partitionTasks 本次需要处理的场景列表 + * @return Map + */ + protected Map getNotifyConfigInfo(List partitionTasks) { + + // 提前通知配置id + Set retryNotifyIds = partitionTasks.stream() + .map(RetrySceneConfigPartitionTask::getNotifyIds) + .filter(CollUtil::isNotEmpty) + .reduce((a, b) -> { + HashSet set = Sets.newHashSet(); + set.addAll(a); + set.addAll(b); + return set; + }).orElse(new HashSet<>()); + + if (CollUtil.isEmpty(retryNotifyIds)) { + return Maps.newHashMap(); + } + + // 从DB中获取通知配置信息 + List notifyConfigs = RetryTaskConverter.INSTANCE.toNotifyConfigDTO(accessTemplate.getNotifyConfigAccess() + .list(new LambdaQueryWrapper() + .in(NotifyConfig::getId, retryNotifyIds) + .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) + .eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()) + .orderByAsc(NotifyConfig::getId))); + if (CollUtil.isEmpty(notifyConfigs)) { + return Maps.newHashMap(); + } + + // 提前通知人信息 + Set recipientIds = notifyConfigs.stream() + .map(NotifyConfigDTO::getRecipientIds) + .filter(CollUtil::isNotEmpty) + .reduce((a, b) -> { + HashSet set = Sets.newHashSet(); + set.addAll(a); + set.addAll(b); + return set; + }).orElse(new HashSet<>()); + + if (CollUtil.isEmpty(recipientIds)) { + return Maps.newHashMap(); + } + + // 从DB中获取通知人信息 + List notifyRecipients = recipientMapper.selectByIds(recipientIds); + Map recipientMap = StreamUtils.toIdentityMap(notifyRecipients, NotifyRecipient::getId); + + Map notifyConfigMap = Maps.newHashMap(); + for (final NotifyConfigDTO notifyConfigDTO : notifyConfigs) { + + List recipientList = StreamUtils.toList(notifyConfigDTO.getRecipientIds(), + recipientId -> { + NotifyRecipient notifyRecipient = recipientMap.get(recipientId); + if (Objects.isNull(notifyRecipient)) { + return null; + } + + NotifyConfigDTO.RecipientInfo recipientInfo = new NotifyConfigDTO.RecipientInfo(); + recipientInfo.setNotifyType(notifyRecipient.getNotifyType()); + recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute()); + + return recipientInfo; + }); + + notifyConfigDTO.setRecipientInfos(recipientList); + notifyConfigMap.put(notifyConfigDTO.getId(), notifyConfigDTO); + } + + return notifyConfigMap; + } + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java index ce7e8fc1b..77073b63b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java @@ -1,36 +1,17 @@ package com.aizuda.snailjob.server.retry.task.support.schedule; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; -import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.EnvironmentUtils; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.NetUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; -import com.aizuda.snailjob.common.log.SnailJobLog; import com.aizuda.snailjob.server.common.Lifecycle; -import com.aizuda.snailjob.server.common.dto.PartitionTask; -import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; -import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask; -import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask.RecipientInfo; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; +import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask; import com.aizuda.snailjob.template.datasource.access.TaskAccess; -import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; -import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; @@ -48,7 +29,7 @@ import java.util.*; */ @Component @RequiredArgsConstructor -public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle { +public class RetryErrorMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmSchedule implements Lifecycle { private static final String retryErrorMoreThresholdTextMessageFormatter = "{}环境 场景重试失败数量超过{}个 \n" + @@ -58,9 +39,6 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple "> 时间窗口:{} ~ {} \n" + "> **共计:{}** \n"; - private final AccessTemplate accessTemplate; - private final NotifyRecipientMapper recipientMapper; - @Override public void start() { taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M")); @@ -68,25 +46,13 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple @Override public void close() { - } @Override - protected void doExecute() { - SnailJobLog.LOCAL.debug("retryErrorMoreThreshold time[{}] ip:[{}]", - LocalDateTime.now(), - NetUtil.getLocalIpStr()); - PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0); - } - - private void doHandler(List partitionTasks) { - - for (PartitionTask partitionTask : partitionTasks) { - doSendAlarm((NotifyConfigPartitionTask) partitionTask); + protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map notifyConfigInfo) { + if (CollUtil.isEmpty(partitionTask.getNotifyIds())) { + return; } - } - - private void doSendAlarm(NotifyConfigPartitionTask partitionTask) { // x分钟内、x组、x场景进入死信队列的数据量 LocalDateTime now = LocalDateTime.now(); @@ -95,102 +61,43 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple partitionTask.getNamespaceId(), new LambdaQueryWrapper(). between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now) + .eq(RetryDeadLetter::getNamespaceId, partitionTask.getNamespaceId()) .eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName()) - .eq(RetryDeadLetter::getSceneName, partitionTask.getBusinessId())); - if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) { - List recipientInfos = partitionTask.getRecipientInfos(); - for (final RecipientInfo recipientInfo : recipientInfos) { - if (Objects.isNull(recipientInfo)) { - continue; - } - // 预警 - AlarmContext context = AlarmContext.build() - .text(retryErrorMoreThresholdTextMessageFormatter, - EnvironmentUtils.getActiveProfile(), - count, - partitionTask.getNamespaceId(), - partitionTask.getGroupName(), - partitionTask.getBusinessId(), - DateUtils.format(now.minusMinutes(30), - DateUtils.NORM_DATETIME_PATTERN), - DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), count) - .title("{}环境 场景重试失败数量超过阈值", EnvironmentUtils.getActiveProfile()) - .notifyAttribute(recipientInfo.getNotifyAttribute()); - Alarm alarmType = SnailJobAlarmFactory.getAlarmType( - recipientInfo.getNotifyType()); - alarmType.asyncSendMessage(context); + .eq(RetryDeadLetter::getSceneName, partitionTask.getSceneName())); + + for (Long notifyId : partitionTask.getNotifyIds()) { + NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId); + if (notifyConfigDTO == null) { + continue; } + if (notifyConfigDTO.getNotifyThreshold() > 0 && count >= notifyConfigDTO.getNotifyThreshold()) { + List recipientInfos = notifyConfigDTO.getRecipientInfos(); + for (final NotifyConfigDTO.RecipientInfo recipientInfo : recipientInfos) { + if (Objects.isNull(recipientInfo)) { + continue; + } + // 预警 + AlarmContext context = AlarmContext.build() + .text(retryErrorMoreThresholdTextMessageFormatter, + EnvironmentUtils.getActiveProfile(), + count, + partitionTask.getNamespaceId(), + partitionTask.getGroupName(), + partitionTask.getSceneName(), + DateUtils.format(now.minusMinutes(30), + DateUtils.NORM_DATETIME_PATTERN), + DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), count) + .title("{}环境 场景重试失败数量超过阈值", EnvironmentUtils.getActiveProfile()) + .notifyAttribute(recipientInfo.getNotifyAttribute()); + Alarm alarmType = SnailJobAlarmFactory.getAlarmType( + recipientInfo.getNotifyType()); + alarmType.asyncSendMessage(context); + } + } } } - // 1.拿500场景 - // 2.获取通知配置ids并查询通知配置 - // 3.Map<通知ID, 通知DTO> - // 4.循环场景查询死信数据量 - // 5.场景对应的通知ids - private List getNotifyConfigPartitions(Long startId) { - - List retrySceneConfigList = accessTemplate.getSceneConfigAccess() - .listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>()) - .getRecords(); - - Set retryNotifyIds = new HashSet<>(); - for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) { - HashSet notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); - retryNotifyIds.addAll(notifyIds); - } - if (CollUtil.isEmpty(retryNotifyIds)) { - return Lists.newArrayList(); - } - - List notifyConfigs = accessTemplate.getNotifyConfigAccess() - .list(new LambdaQueryWrapper() - .gt(NotifyConfig::getId, startId) - .in(NotifyConfig::getId, retryNotifyIds) - .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) - .eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()) - .orderByAsc(NotifyConfig::getId)); - - Set recipientIds = notifyConfigs.stream() - .map(config -> new HashSet<>(JsonUtil.parseList(config.getRecipientIds(), Long.class))) - .reduce((a, b) -> { - HashSet set = Sets.newHashSet(); - set.addAll(a); - set.addAll(b); - return set; - }).orElse(new HashSet<>()); - - if (CollUtil.isEmpty(recipientIds)) { - return Lists.newArrayList(); - } - - List notifyRecipients = recipientMapper.selectBatchIds(recipientIds); - Map recipientMap = StreamUtils.toIdentityMap(notifyRecipients, NotifyRecipient::getId); - - List notifyConfigPartitionTasks = RetryTaskConverter.INSTANCE.toNotifyConfigPartitionTask( - notifyConfigs); - for (final NotifyConfigPartitionTask notifyConfigPartitionTask : notifyConfigPartitionTasks) { - - List recipientList = StreamUtils.toList(notifyConfigPartitionTask.getRecipientIds(), - recipientId -> { - NotifyRecipient notifyRecipient = recipientMap.get(recipientId); - if (Objects.isNull(notifyRecipient)) { - return null; - } - - RecipientInfo recipientInfo = new RecipientInfo(); - recipientInfo.setNotifyType(notifyRecipient.getNotifyType()); - recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute()); - - return recipientInfo; - }); - - notifyConfigPartitionTask.setRecipientInfos(recipientList); - } - - return notifyConfigPartitionTasks; - } @Override public String lockName() { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java index 74c16fdc7..f5ab78595 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java @@ -1,33 +1,15 @@ package com.aizuda.snailjob.server.retry.task.support.schedule; -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; -import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; -import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.EnvironmentUtils; -import com.aizuda.snailjob.common.core.util.JsonUtil; -import com.aizuda.snailjob.common.core.util.StreamUtils; import com.aizuda.snailjob.server.common.Lifecycle; -import com.aizuda.snailjob.server.common.dto.PartitionTask; -import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; -import com.aizuda.snailjob.server.common.schedule.AbstractSchedule; import com.aizuda.snailjob.server.common.util.DateUtils; -import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; -import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask; -import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigPartitionTask.RecipientInfo; -import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; -import com.aizuda.snailjob.template.datasource.access.AccessTemplate; -import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; -import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient; -import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; +import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO; +import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; -import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -35,7 +17,6 @@ import org.springframework.stereotype.Component; import java.time.Duration; import java.time.Instant; import java.util.*; -import java.util.stream.Collectors; /** @@ -48,7 +29,7 @@ import java.util.stream.Collectors; @Component @Slf4j @RequiredArgsConstructor -public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implements Lifecycle { +public class RetryTaskMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmSchedule implements Lifecycle { private static final String retryTaskMoreThresholdTextMessageFormatter = "{}环境 场景重试数量超过{}个 \n" + @@ -58,9 +39,6 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem "> 告警时间:{} \n" + "> **共计:{}** \n"; - private final AccessTemplate accessTemplate; - private final NotifyRecipientMapper recipientMapper; - @Override public void start() { taskScheduler.scheduleWithFixedDelay(this::execute, Instant.now(), Duration.parse("PT10M")); @@ -70,18 +48,8 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem public void close() { } - @Override - protected void doExecute() { - PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0); - } - private void doHandler(List partitionTasks) { - for (PartitionTask partitionTask : partitionTasks) { - doSendAlarm((NotifyConfigPartitionTask) partitionTask); - } - } - - private void doSendAlarm(NotifyConfigPartitionTask partitionTask) { + protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map notifyConfigInfo) { // x分钟内、x组、x场景进入重试任务的数据量 long count = accessTemplate.getRetryTaskAccess() @@ -89,87 +57,39 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem new LambdaQueryWrapper() .eq(RetryTask::getNamespaceId, partitionTask.getNamespaceId()) .eq(RetryTask::getGroupName, partitionTask.getGroupName()) - .eq(RetryTask::getSceneName, partitionTask.getBusinessId()) + .eq(RetryTask::getSceneName, partitionTask.getSceneName()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); - if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) { - List recipientInfos = partitionTask.getRecipientInfos(); - for (final RecipientInfo recipientInfo : recipientInfos) { - if (Objects.isNull(recipientInfo)) { - continue; - } - // 预警 - AlarmContext context = AlarmContext.build() - .text(retryTaskMoreThresholdTextMessageFormatter, - EnvironmentUtils.getActiveProfile(), - count, - partitionTask.getNamespaceId(), - partitionTask.getGroupName(), - partitionTask.getBusinessId(), - DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), - count) - .title("{}环境 场景重试数量超过阈值", EnvironmentUtils.getActiveProfile()) - .notifyAttribute(recipientInfo.getNotifyAttribute()); - Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType())) - .ifPresent(alarmType -> alarmType.asyncSendMessage(context)); + for (Long notifyId : partitionTask.getNotifyIds()) { + NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId); + if (notifyConfigDTO == null) { + continue; + } + if (notifyConfigDTO.getNotifyThreshold() > 0 && count >= notifyConfigDTO.getNotifyThreshold()) { + List recipientInfos = notifyConfigDTO.getRecipientInfos(); + for (final NotifyConfigDTO.RecipientInfo recipientInfo : recipientInfos) { + if (Objects.isNull(recipientInfo)) { + continue; + } + // 预警 + AlarmContext context = AlarmContext.build() + .text(retryTaskMoreThresholdTextMessageFormatter, + EnvironmentUtils.getActiveProfile(), + count, + partitionTask.getNamespaceId(), + partitionTask.getGroupName(), + partitionTask.getSceneName(), + DateUtils.toNowFormat(DateUtils.NORM_DATETIME_PATTERN), + count) + .title("{}环境 场景重试数量超过阈值", EnvironmentUtils.getActiveProfile()) + .notifyAttribute(recipientInfo.getNotifyAttribute()); + Optional.ofNullable(SnailJobAlarmFactory.getAlarmType(recipientInfo.getNotifyType())) + .ifPresent(alarmType -> alarmType.asyncSendMessage(context)); + + } } } - } - private List getNotifyConfigPartitions(Long startId) { - - List retrySceneConfigList = accessTemplate.getSceneConfigAccess() - .listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>()) - .getRecords(); - - Set retryNotifyIds = new HashSet<>(); - for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) { - HashSet notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); - retryNotifyIds.addAll(notifyIds); - } - if (CollUtil.isEmpty(retryNotifyIds)) { - return Lists.newArrayList(); - } - - List notifyConfigs = accessTemplate.getNotifyConfigAccess() - .list(new LambdaQueryWrapper() - .gt(NotifyConfig::getId, startId) - .in(NotifyConfig::getId, retryNotifyIds) - .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) - .eq(NotifyConfig::getSystemTaskType, SyetemTaskTypeEnum.RETRY.getType()) - .eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY.getNotifyScene()) - .orderByAsc(NotifyConfig::getId)); // SQLServer 分页必须 ORDER BY - - Set recipientIds = notifyConfigs.stream() - .flatMap(config -> JsonUtil.parseList(config.getRecipientIds(), Long.class).stream()) - .collect(Collectors.toSet()); - - if (CollUtil.isEmpty(recipientIds)) { - return Lists.newArrayList(); - } - - Map recipientMap = StreamUtils.toIdentityMap( - recipientMapper.selectBatchIds(recipientIds), NotifyRecipient::getId); - - List notifyConfigPartitionTasks = RetryTaskConverter.INSTANCE.toNotifyConfigPartitionTask( - notifyConfigs); - notifyConfigPartitionTasks.forEach(task -> { - List recipientList = StreamUtils.toList(task.getRecipientIds(), recipientId -> { - NotifyRecipient notifyRecipient = recipientMap.get(recipientId); - if (Objects.isNull(notifyRecipient)) { - return null; - } - - RecipientInfo recipientInfo = new RecipientInfo(); - recipientInfo.setNotifyType(notifyRecipient.getNotifyType()); - recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute()); - - return recipientInfo; - }); - task.setRecipientInfos(recipientList); - }); - - return notifyConfigPartitionTasks; } @Override