feat(1.4.0-beta1): 1.支持到达最大重试数量的任务告警通知

This commit is contained in:
opensnail 2025-02-24 22:18:44 +08:00
parent 7f350e9a96
commit c2661a988f
9 changed files with 59 additions and 16 deletions

View File

@ -29,8 +29,6 @@ public interface AlarmInfoConverter {
AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class);
List<RetryAlarmInfo> deadLetterToAlarmInfo(List<RetryDeadLetter> retryDeadLetters);
List<NotifyConfigInfo> retryToNotifyConfigInfos(List<NotifyConfig> notifyConfigs);
@Mappings({

View File

@ -0,0 +1,28 @@
package com.aizuda.snailjob.server.retry.task.dto;
import lombok.Data;
/**
* author: zhangshuguang
* date: 2025-02-24
*/
@Data
public class RetryTaskFailDeadLetterAlarmEventDTO {
private String namespaceId;
private String groupName;
private String sceneName;
private String idempotentId;
private String bizNo;
private String executorName;
private String argsStr;
private String extAttrs;
}

View File

@ -175,4 +175,8 @@ public interface RetryTaskConverter {
RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry);
RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO);
List<RetryTaskFailDeadLetterAlarmEventDTO> toRetryTaskFailDeadLetterAlarmEventDTO(List<RetryDeadLetter> retryDeadLetters);
List<RetryAlarmInfo> toRetryAlarmInfos(List<RetryTaskFailDeadLetterAlarmEventDTO> letterAlarmEventDTOS);
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailDeadLetterAlarmEventDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
@ -16,9 +17,9 @@ import java.util.List;
@Getter
public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent {
private List<RetryDeadLetter> retryDeadLetters;
private List<RetryTaskFailDeadLetterAlarmEventDTO> retryDeadLetters;
public RetryTaskFailDeadLetterAlarmEvent(List<RetryDeadLetter> retryDeadLetters) {
public RetryTaskFailDeadLetterAlarmEvent(List<RetryTaskFailDeadLetterAlarmEventDTO> retryDeadLetters) {
super(retryDeadLetters);
this.retryDeadLetters = retryDeadLetters;
}

View File

@ -12,6 +12,8 @@ import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailDeadLetterAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.google.common.collect.Lists;
@ -39,7 +41,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends
/**
* 死信告警数据
*/
private final LinkedBlockingQueue<List<RetryDeadLetter>> queue = new LinkedBlockingQueue<>(1000);
private final LinkedBlockingQueue<List<RetryTaskFailDeadLetterAlarmEventDTO>> queue = new LinkedBlockingQueue<>(1000);
private static final String retryTaskDeadTextMessagesFormatter =
"<font face=\"微软雅黑\" color=#ff0000 size=4>{}环境 重试任务失败进入死信队列</font> \n" +
@ -58,12 +60,12 @@ public class RetryTaskFailDeadLetterAlarmListener extends
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {
// 无数据时阻塞线程
List<RetryDeadLetter> allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS);
List<RetryTaskFailDeadLetterAlarmEventDTO> allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS);
if (CollUtil.isEmpty(allRetryDeadLetterList)) {
return Lists.newArrayList();
}
return AlarmInfoConverter.INSTANCE.deadLetterToAlarmInfo(allRetryDeadLetterList);
return RetryTaskConverter.INSTANCE.toRetryAlarmInfos(allRetryDeadLetterList);
}
@Override

View File

@ -98,8 +98,8 @@ public class RetryFailureHandler extends AbstractRetryResultHandler {
RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO(
retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene());
SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO));
return null;
return null;
});
}
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -17,6 +18,7 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
@ -217,6 +219,9 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle {
.in(Retry::getId, StreamUtils.toList(retries, RetryPartitionTask::getId))),
() -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries)));
SnailSpringContext.getContext().publishEvent(new RetryTaskFailDeadLetterAlarmEvent(
RetryTaskConverter.INSTANCE.toRetryTaskFailDeadLetterAlarmEventDTO(retryDeadLetters)
));
}

View File

@ -5,12 +5,14 @@ import com.aizuda.snailjob.common.core.alarm.Alarm;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO;
import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
@ -55,14 +57,17 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarm
return;
}
// x分钟内x组x场景进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
long count = retryDeadLetterAccess.count(new LambdaQueryWrapper<RetryDeadLetter>().
between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now)
.eq(RetryDeadLetter::getNamespaceId, partitionTask.getNamespaceId())
.eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName())
.eq(RetryDeadLetter::getSceneName, partitionTask.getSceneName()));
// x分钟内x组x场景进入任务到达最大重试次数的数据量
long count = accessTemplate.getRetryAccess()
.count(new LambdaQueryWrapper<Retry>()
.eq(Retry::getNamespaceId, partitionTask.getNamespaceId())
.between(Retry::getUpdateDt, now.minusMinutes(30), now)
.eq(Retry::getGroupName, partitionTask.getGroupName())
.eq(Retry::getSceneName, partitionTask.getSceneName())
.eq(Retry::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus())
);
for (Long notifyId : partitionTask.getNotifyIds()) {
NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId);

View File

@ -49,7 +49,7 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmS
public void close() {
}
@Override
protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map<Long, NotifyConfigDTO> notifyConfigInfo) {
// x分钟内x组x场景进入重试任务的数据量