diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/AlarmInfoConverter.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/AlarmInfoConverter.java index 8497892ec..74377c2d7 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/AlarmInfoConverter.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/convert/AlarmInfoConverter.java @@ -29,8 +29,6 @@ public interface AlarmInfoConverter { AlarmInfoConverter INSTANCE = Mappers.getMapper(AlarmInfoConverter.class); - List deadLetterToAlarmInfo(List retryDeadLetters); - List retryToNotifyConfigInfos(List notifyConfigs); @Mappings({ diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskFailDeadLetterAlarmEventDTO.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskFailDeadLetterAlarmEventDTO.java new file mode 100644 index 000000000..804e3e189 --- /dev/null +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/dto/RetryTaskFailDeadLetterAlarmEventDTO.java @@ -0,0 +1,28 @@ +package com.aizuda.snailjob.server.retry.task.dto; + +import lombok.Data; + +/** + * author: zhangshuguang + * date: 2025-02-24 + */ +@Data +public class RetryTaskFailDeadLetterAlarmEventDTO { + + private String namespaceId; + + private String groupName; + + private String sceneName; + + private String idempotentId; + + private String bizNo; + + private String executorName; + + private String argsStr; + + private String extAttrs; + +} diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java index 3eec3de81..6c73eead0 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/RetryTaskConverter.java @@ -175,4 +175,8 @@ public interface RetryTaskConverter { RequestCallbackExecutorDTO toRequestCallbackExecutorDTO(RetrySceneConfig retrySceneConfig, Retry retry); RetryCallbackRequest toRetryCallbackDTO(RequestCallbackExecutorDTO executorDTO); + + List toRetryTaskFailDeadLetterAlarmEventDTO(List retryDeadLetters); + + List toRetryAlarmInfos(List letterAlarmEventDTOS); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java index 31e6ca697..a1891c033 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/event/RetryTaskFailDeadLetterAlarmEvent.java @@ -1,5 +1,6 @@ package com.aizuda.snailjob.server.retry.task.support.event; +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailDeadLetterAlarmEventDTO; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import lombok.Getter; import org.springframework.context.ApplicationEvent; @@ -16,9 +17,9 @@ import java.util.List; @Getter public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent { - private List retryDeadLetters; + private List retryDeadLetters; - public RetryTaskFailDeadLetterAlarmEvent(List retryDeadLetters) { + public RetryTaskFailDeadLetterAlarmEvent(List retryDeadLetters) { super(retryDeadLetters); this.retryDeadLetters = retryDeadLetters; } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java index e6c7e1684..e6db67bf4 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/listener/RetryTaskFailDeadLetterAlarmListener.java @@ -12,6 +12,8 @@ import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo; import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo; import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.snailjob.server.common.util.DateUtils; +import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailDeadLetterAlarmEventDTO; +import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.google.common.collect.Lists; @@ -39,7 +41,7 @@ public class RetryTaskFailDeadLetterAlarmListener extends /** * 死信告警数据 */ - private final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(1000); + private final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(1000); private static final String retryTaskDeadTextMessagesFormatter = "{}环境 重试任务失败进入死信队列 \n" + @@ -58,12 +60,12 @@ public class RetryTaskFailDeadLetterAlarmListener extends @Override protected List poll() throws InterruptedException { // 无数据时阻塞线程 - List allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS); + List allRetryDeadLetterList = queue.poll(100, TimeUnit.MILLISECONDS); if (CollUtil.isEmpty(allRetryDeadLetterList)) { return Lists.newArrayList(); } - return AlarmInfoConverter.INSTANCE.deadLetterToAlarmInfo(allRetryDeadLetterList); + return RetryTaskConverter.INSTANCE.toRetryAlarmInfos(allRetryDeadLetterList); } @Override diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java index 50b26fc01..0b1b9a294 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/result/RetryFailureHandler.java @@ -98,8 +98,8 @@ public class RetryFailureHandler extends AbstractRetryResultHandler { RetryTaskConverter.INSTANCE.toRetryTaskFailAlarmEventDTO( retry, context.getExceptionMsg(), RETRY_TASK_FAIL_ERROR.getNotifyScene()); SnailSpringContext.getContext().publishEvent(new RetryTaskFailAlarmEvent(retryTaskFailAlarmEventDTO)); - return null; + return null; }); } } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java index 0bffe846b..190e00bbf 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/CleanerSchedule.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.schedule; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; +import com.aizuda.snailjob.common.core.context.SnailSpringContext; import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.util.JsonUtil; @@ -17,6 +18,7 @@ import com.aizuda.snailjob.server.common.util.PartitionTaskUtils; import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask; import com.aizuda.snailjob.server.retry.task.service.RetryDeadLetterConverter; import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter; +import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent; import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.TaskAccess; import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper; @@ -217,6 +219,9 @@ public class CleanerSchedule extends AbstractSchedule implements Lifecycle { .in(Retry::getId, StreamUtils.toList(retries, RetryPartitionTask::getId))), () -> new SnailJobServerException("删除重试数据失败 [{}]", JsonUtil.toJsonString(retries))); + SnailSpringContext.getContext().publishEvent(new RetryTaskFailDeadLetterAlarmEvent( + RetryTaskConverter.INSTANCE.toRetryTaskFailDeadLetterAlarmEventDTO(retryDeadLetters) + )); } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java index cddef00be..fccae3a1c 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java @@ -5,12 +5,14 @@ import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; +import com.aizuda.snailjob.common.core.enums.RetryStatusEnum; import com.aizuda.snailjob.common.core.util.EnvironmentUtils; import com.aizuda.snailjob.server.common.Lifecycle; import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.retry.task.dto.NotifyConfigDTO; import com.aizuda.snailjob.server.retry.task.dto.RetrySceneConfigPartitionTask; import com.aizuda.snailjob.template.datasource.access.TaskAccess; +import com.aizuda.snailjob.template.datasource.persistence.po.Retry; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; @@ -55,14 +57,17 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarm return; } - // x分钟内、x组、x场景进入死信队列的数据量 LocalDateTime now = LocalDateTime.now(); - TaskAccess retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess(); - long count = retryDeadLetterAccess.count(new LambdaQueryWrapper(). - between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now) - .eq(RetryDeadLetter::getNamespaceId, partitionTask.getNamespaceId()) - .eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName()) - .eq(RetryDeadLetter::getSceneName, partitionTask.getSceneName())); + + // x分钟内、x组、x场景进入任务到达最大重试次数的数据量 + long count = accessTemplate.getRetryAccess() + .count(new LambdaQueryWrapper() + .eq(Retry::getNamespaceId, partitionTask.getNamespaceId()) + .between(Retry::getUpdateDt, now.minusMinutes(30), now) + .eq(Retry::getGroupName, partitionTask.getGroupName()) + .eq(Retry::getSceneName, partitionTask.getSceneName()) + .eq(Retry::getRetryStatus, RetryStatusEnum.MAX_COUNT.getStatus()) + ); for (Long notifyId : partitionTask.getNotifyIds()) { NotifyConfigDTO notifyConfigDTO = notifyConfigInfo.get(notifyId); diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java index c454c88b7..dd51bc95f 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java @@ -49,7 +49,7 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractRetryTaskAlarmS public void close() { } - + @Override protected void doSendAlarm(RetrySceneConfigPartitionTask partitionTask, Map notifyConfigInfo) { // x分钟内、x组、x场景进入重试任务的数据量