feat(1.4.0-beta1): 1.支持到达最大重试数量的任务告警通知

This commit is contained in:
opensnail 2025-02-24 22:18:44 +08:00
parent 9e1f9ed531
commit d212ed1b94
9 changed files with 59 additions and 16 deletions

View File

@ -29,8 +29,6 @@ public interface AlarmInfoConverter {
AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class); AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class);
List<RetryAlarmInfo> deadLetterToAlarmInfo(List<RetryDeadLetter> retryDeadLetters);
List<NotifyConfigInfo> retryToNotifyConfigInfos(List<NotifyConfig> notifyConfigs); List<NotifyConfigInfo> retryToNotifyConfigInfos(List<NotifyConfig> notifyConfigs);
@Mappings({ @Mappings({

View File

@ -0,0 +1,28 @@
package com.aizuda.snailjob.server.retry.task.dto;
import lombok.Data;
/**
* author: zhangshuguang
* date: 2025-02-24
*/
@Data
public class RetryTaskFailDeadLetterAlarmEventDTO {
private String namespaceId;
private String groupName;
private String sceneName;
private String idempotentId;
private String bizNo;
private String executorName;
private String argsStr;
private String extAttrs;
}

View File

@ -175,4 +175,8 @@ public interface RetryTaskConverter {
RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry); RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry);
RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO); RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO);
List<RetryTaskFailDeadLetterAlarmEventDTO> toRetryTaskFailDeadLetterAlarmEventDTO(List<RetryDeadLetter> retryDeadLetters);
List<RetryAlarmInfo> toRetryAlarmInfos(List<RetryTaskFailDeadLetterAlarmEventDTO> letterAlarmEventDTOS);
} }

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.event; package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailDeadLetterAlarmEventDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import lombok.Getter; import lombok.Getter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
@ -16,9 +17,9 @@ import java.util.List;
@Getter @Getter
public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent { public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent {
private List<RetryDeadLetter> retryDeadLetters; private List<RetryTaskFailDeadLetterAlarmEventDTO> retryDeadLetters;
public RetryTaskFailDeadLetterAlarmEvent(List<RetryDeadLetter> retryDeadLetters) { public RetryTaskFailDeadLetterAlarmEvent(List<RetryTaskFailDeadLetterAlarmEventDTO> retryDeadLetters) {
super(retryDeadLetters); super(retryDeadLetters);
this.retryDeadLetters = retryDeadLetters; this.retryDeadLetters = retryDeadLetters;
} }

View File

@ -12,6 +12,8 @@ import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailDeadLetterAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -39,7 +41,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends
/** /**
* 死信告警数据 * 死信告警数据
*/ */
private final LinkedBlockingQueue<List<RetryDeadLetter>> queue = new LinkedBlockingQueue<>(1000); private final LinkedBlockingQueue<List<RetryTaskFailDeadLetterAlarmEventDTO>> queue = new LinkedBlockingQueue<>(1000);
private static final String retryTaskDeadTextMessagesFormatter = private static final String retryTaskDeadTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" + "<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
@ -58,12 +60,12 @@ public class RetryTaskFailDeadLetterAlarmListener extends
@Override @Override
protected List<RetryAlarmInfo> poll() throws InterruptedException { protected List<RetryAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程 // 无数据时阻塞线程
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS); List<RetryTaskFailDeadLetterAlarmEventDTO> allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS);
if (CollUtil.isEmpty(allRetryDeadLetterList)) { if (CollUtil.isEmpty(allRetryDeadLetterList)) {
return Lists.newArrayList(); return Lists.newArrayList();
} }
return AlarmInfoConverter.INSTANCE.deadLetterToAlarmInfo(allRetryDeadLetterList); return RetryTaskConverter.INSTANCE.toRetryAlarmInfos(allRetryDeadLetterList);
} }
@Override @Override

View File

@ -98,8 +98,8 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO( RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene()); retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO)); SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO));
return null;
return null;
}); });
} }
} }

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -17,6 +18,7 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter; import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
@ -217,6 +219,9 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
.in(Retry::getId, StreamUtils.toList(retries, RetryPartitionTask::getId))), .in(Retry::getId, StreamUtils.toList(retries, RetryPartitionTask::getId))),
() -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries))); () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries)));
SnailSpringContext.getContext().publishEvent(new RetryTaskFailDeadLetterAlarmEvent(
RetryTaskConverter.INSTANCE.toRetryTaskFailDeadLetterAlarmEventDTO(retryDeadLetters)
));
} }

View File

@ -5,12 +5,14 @@ import com.aizuda.snailjob.common.core.alarm.Alarm;
import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils; import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.server.common.Lifecycle; import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO; import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask; import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask;
import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -55,14 +57,17 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarm
return; return;
} }
// x分钟内x组x场景进入死信队列的数据量
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
long count = retryDeadLetterAccess.count(new LambdaQueryWrapper<RetryDeadLetter>(). // x分钟内x组x场景进入任务到达最大重试次数的数据量
between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now) long count = accessTemplate.getRetryAccess()
.eq(RetryDeadLetter::getNamespaceId, partitionTask.getNamespaceId()) .count(new LambdaQueryWrapper<Retry>()
.eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName()) .eq(Retry::getNamespaceId, partitionTask.getNamespaceId())
.eq(RetryDeadLetter::getSceneName, partitionTask.getSceneName())); .between(Retry::getUpdateDt, now.minusMinutes(30), now)
.eq(Retry::getGroupName, partitionTask.getGroupName())
.eq(Retry::getSceneName, partitionTask.getSceneName())
.eq(Retry::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus())
);
for (Long notifyId : partitionTask.getNotifyIds()) { for (Long notifyId : partitionTask.getNotifyIds()) {
NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId); NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId);

View File

@ -49,7 +49,7 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmS
public void close() { public void close() {
} }
@Override
protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map<Long, NotifyConfigDTO> notifyConfigInfo) { protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map<Long, NotifyConfigDTO> notifyConfigInfo) {
// x分钟内x组x场景进入重试任务的数据量 // x分钟内x组x场景进入重试任务的数据量