feat(sj_1.3.0-beta1):

1、新增重试场景告警通知配置
2、新增工作流任务告警通知配置
3、重试任务新增失败告警
This commit is contained in:
zhengweilin 2024-12-15 14:11:55 +08:00
parent 01e8518a23
commit bfb8487e4b
18 changed files with 67 additions and 54 deletions

View File

@ -42,4 +42,5 @@ public class RetryTask extends CreateUpdateDt {
private Integer retryStatus;
private Integer taskType;
}

View File

@ -94,7 +94,9 @@ public class JobExecutorActor extends AbstractActor {
SnailJobLog.LOCAL.error("job executor exception. [{}]", taskExecute, e);
handleTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build()));
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskExecute.getTaskBatchId())
.build()));
} finally {
getContext().stop(getSelf());
}
@ -133,7 +135,8 @@ public class JobExecutorActor extends AbstractActor {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskExecute.getTaskBatchId())
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc()).build()));
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc())
.build()));
}
// 更新状态
@ -232,7 +235,9 @@ public class JobExecutorActor extends AbstractActor {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(taskStatus)) {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskExecute.getTaskBatchId()).build()));
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskExecute.getTaskBatchId())
.build()));
}
}

View File

@ -78,8 +78,8 @@ public class WorkflowExecutorActor extends AbstractActor {
SnailJobLog.LOCAL.error("workflow executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(),
JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
SnailSpringContext.getContext()
.publishEvent(new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId()));
SnailSpringContext.getContext().publishEvent(
new WorkflowTaskFailAlarmEvent(taskExecute.getWorkflowTaskBatchId()));
} finally {
getContext().stop(getSelf());
}

View File

@ -122,7 +122,9 @@ public class RequestClientActor extends AbstractActor {
taskExecuteFailure(realJobExecutorDTO, throwable.getMessage());
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(dispatchJobRequest.getTaskBatchId()).build()));
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(dispatchJobRequest.getTaskBatchId())
.build()));
}
}

View File

@ -89,7 +89,8 @@ public class JobTaskBatchGenerator {
SnailSpringContext.getContext().publishEvent(
new JobTaskFailNodeAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(jobTaskBatch.getId())
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc()).build()));
.reason(JobNotifySceneEnum.JOB_NO_CLIENT_NODES_ERROR.getDesc())
.build()));
}
// 非待处理状态无需进入时间轮中

View File

@ -66,7 +66,9 @@ public class RunningJobPrepareHandler extends AbstractJobPrepareHandler {
instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(
JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(prepare.getTaskBatchId()));
JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(prepare.getTaskBatchId())
.build());
}
}

View File

@ -69,7 +69,10 @@ public abstract class AbstractJobExecutorResultHandler implements JobExecutorRes
if (failCount > 0) {
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(context.getTaskBatchId()).reason(context.getMessage()).build()));
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(context.getTaskBatchId())
.reason(context.getMessage())
.build()));
doHandleFail(context);
} else if (stopCount > 0) {
taskBatchStatus = JobTaskBatchStatusEnum.STOP.getStatus();

View File

@ -69,7 +69,9 @@ public class JobTimeoutCheckTask implements TimerTask<String> {
instanceInterrupt.stop(stopJobContext);
SnailSpringContext.getContext().publishEvent(
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder().jobTaskBatchId(taskBatchId).build()));
new JobTaskFailAlarmEvent(JobTaskFailAlarmEventDTO.builder()
.jobTaskBatchId(taskBatchId)
.build()));
SnailJobLog.LOCAL.info("超时中断.taskBatchId:[{}]", taskBatchId);
}

View File

@ -4,9 +4,7 @@ import lombok.Builder;
import lombok.Data;
/**
* @author zhengweilin
* @version 1.0.0
* @date 2024/12/12
* 重试任务失败告警
*/
@Data
@Builder
@ -15,4 +13,5 @@ public class RetryTaskFailAlarmEventDTO {
private Long retryTaskId;
private String reason;
}

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.result;
import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.IdempotentStrategy;
@ -91,7 +92,7 @@ public class FailureActor extends AbstractActor {
.eq(RetryTaskLog::getUniqueId, retryTask.getUniqueId())
.eq(RetryTaskLog::getGroupName, retryTask.getGroupName()));
context.publishEvent(new RetryTaskFailMoreThresholdAlarmEvent(retryTask));
SnailSpringContext.getContext().publishEvent(new RetryTaskFailMoreThresholdAlarmEvent(retryTask));
}
});
} catch (Exception e) {

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.server.retry.task.support.dispatch.task;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.RetryLogMetaDTO;
@ -10,6 +11,8 @@ import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.idempotent.IdempotentHolder;
import com.aizuda.snailjob.server.retry.task.support.idempotent.RetryIdempotentStrategyHandler;
import com.aizuda.snailjob.server.retry.task.support.retry.RetryExecutor;
@ -65,7 +68,6 @@ public abstract class AbstractTaskExecutor implements TaskExecutor, Initializing
retryLogMetaDTO.setTimestamp(DateUtils.toNowMilli());
SnailJobLog.REMOTE.error("触发条件不满足 原因: [{}] <|>{}<|>", pair.getValue().toString(), retryLogMetaDTO);
return false;
}

View File

@ -1,12 +1,9 @@
package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.util.List;
/**
* 重试任务失败事件
*
@ -16,11 +13,11 @@ import java.util.List;
*/
@Getter
public class RetryTaskFailAlarmEvent extends ApplicationEvent {
// Typed payload of the event; Lombok's @Getter exposes it as getRetryTaskFailAlarmEventDTO().
private RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO;
/**
 * Creates the event. The DTO is handed to {@link ApplicationEvent} as the event source
 * and is additionally kept in a typed field so listeners can read it without casting.
 *
 * @param retryTaskFailAlarmEventDTO payload describing the failed retry task; it is used
 *                                   as the event source, so it must not be {@code null}
 *                                   (java.util.EventObject rejects a null source)
 */
public RetryTaskFailAlarmEvent(RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO) {
super(retryTaskFailAlarmEventDTO);
this.retryTaskFailAlarmEventDTO = retryTaskFailAlarmEventDTO;
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.util.List;
@ -12,15 +13,13 @@ import java.util.List;
* @date : 2023-11-20 21:40
* @since 2.5.0
*/
@Getter
public class RetryTaskFailDeadLetterAlarmEvent extends ApplicationEvent {
// Dead-letter records carried by this event; stored by reference, not copied.
private List<RetryDeadLetter> retryDeadLetters;
/**
 * Creates the event. The list is passed to {@link ApplicationEvent} as the event source
 * and is also kept in a typed field.
 *
 * @param retryDeadLetters dead-letter records to alarm on; used as the event source
 */
public RetryTaskFailDeadLetterAlarmEvent(List<RetryDeadLetter> retryDeadLetters) {
super(retryDeadLetters);
this.retryDeadLetters = retryDeadLetters;
}
// NOTE(review): this explicit getter duplicates what the class-level @Getter provides;
// presumably it is the removed side of this diff hunk (replaced by @Getter) — confirm.
/**
 * @return the same list instance that was passed to the constructor
 */
public List<RetryDeadLetter> getRetryDeadLetters() {
return retryDeadLetters;
}
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.event;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
@ -10,15 +11,13 @@ import org.springframework.context.ApplicationEvent;
* @date : 2023-11-20 21:40
* @since 2.5.0
*/
@Getter
public class RetryTaskFailMoreThresholdAlarmEvent extends ApplicationEvent {
// Retry task carried by this event; stored by reference.
private RetryTask retryTask;
/**
 * Creates the event. The retry task is passed to {@link ApplicationEvent} as the event
 * source and is also kept in a typed field.
 *
 * @param retryTask the retry task this alarm event refers to; used as the event source
 */
public RetryTaskFailMoreThresholdAlarmEvent(RetryTask retryTask) {
super(retryTask);
this.retryTask = retryTask;
}
// NOTE(review): this explicit getter duplicates what the class-level @Getter provides;
// presumably it is the removed side of this diff hunk (replaced by @Getter) — confirm.
/**
 * @return the retry task instance that was passed to the constructor
 */
public RetryTask getRetryTask() {
return retryTask;
}
}

View File

@ -1,6 +1,5 @@
package com.aizuda.snailjob.server.retry.task.support.listener;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
import com.aizuda.snailjob.common.core.util.EnvironmentUtils;
@ -8,26 +7,18 @@ import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.alarm.AbstractRetryAlarm;
import com.aizuda.snailjob.server.common.dto.JobAlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.dto.RetryAlarmInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailDeadLetterAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
@ -66,21 +57,17 @@ public class RetryTaskFailAlarmListener extends
/**
 * Takes the next retry-task failure alarm DTO from the queue, waiting up to 100 ms.
 *
 * @return an empty list when nothing arrived within the wait; otherwise {@code null},
 *         because the conversion of the dequeued DTO is still commented out
 * @throws InterruptedException if the thread is interrupted while waiting on the queue
 */
@Override
protected List<RetryAlarmInfo> poll() throws InterruptedException {
// Wait up to 100 ms for an element when the queue is empty
RetryTaskFailAlarmEventDTO retryTaskFailAlarmEventDTO = queue.poll(100, TimeUnit.MILLISECONDS);
if (Objects.isNull(retryTaskFailAlarmEventDTO)) {
return Lists.newArrayList();
}
// Pull up to 200 entries (the batching code below is currently commented out)
/*List<Long> retryTaskIds = Lists.newArrayList(retryTaskFailAlarmEventDTO.getRetryTaskId());
queue.drainTo(Collections.singleton(retryTaskIds), 200);
QueryWrapper<RetryTask> wrapper = new QueryWrapper<RetryTask>()
.in("batch.id", retryTaskIds)
.eq("batch.deleted", 0);
List<JobBatchResponseDO> jobTaskBatchList = jobTaskBatchMapper.selectJobBatchListByIds(wrapper);
List<JobAlarmInfo> jobAlarmInfos = AlarmInfoConverter.INSTANCE.retryTaskToAlarmInfo(jobTaskBatchList);
jobAlarmInfos.stream().forEach(i -> i.setReason(jobTaskFailAlarmEventDTO.getReason()));*/
/*List<RetryTask> lists = Lists.newArrayList(retryTask);
queue.drainTo(lists, 200);
return AlarmInfoConverter.INSTANCE.retryTaskToAlarmInfo(lists);*/
// NOTE(review): the dequeued DTO is discarded and null is returned instead of a list;
// this looks like an unfinished placeholder — confirm the caller tolerates null.
return null;
}
@ -88,7 +75,7 @@ public class RetryTaskFailAlarmListener extends
/**
 * Receives a retry-task failure alarm event and enqueues its DTO for later polling.
 * Bound to the AFTER_COMPLETION transaction phase; {@code fallbackExecution = true}
 * lets the listener also run when no transaction is active.
 * If the queue does not accept the element, the event is only logged and not retried.
 *
 * @param retryTaskFailAlarmEvent the published event whose DTO is offered to the queue
 */
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
public void doOnApplicationEvent(RetryTaskFailAlarmEvent retryTaskFailAlarmEvent) {
if (!queue.offer(retryTaskFailAlarmEvent.getRetryTaskFailAlarmEventDTO())) {
// NOTE(review): two warn calls appear here; presumably the first (dead-letter wording)
// is the removed side of this diff hunk and the second is its replacement — confirm.
SnailJobLog.LOCAL.warn("任务重试失败进入死信队列告警队列已满");
SnailJobLog.LOCAL.warn("任务重试失败告警队列已满");
}
}

View File

@ -58,7 +58,7 @@ public class RetryTaskFailMoreThresholdAlarmListener extends
return Lists.newArrayList();
}
// 拉取100条
// 拉取200条
List<RetryTask> lists = Lists.newArrayList(retryTask);
queue.drainTo(lists, 200);

View File

@ -11,6 +11,7 @@ import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.StopStrategy;
import com.aizuda.snailjob.server.retry.task.support.strategy.FilterStrategies;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

View File

@ -8,9 +8,11 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.triple.ImmutableTriple;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.retry.task.support.FilterStrategy;
import com.aizuda.snailjob.server.retry.task.support.RetryContext;
import com.aizuda.snailjob.server.retry.task.support.cache.CacheGroupRateLimiter;
import com.aizuda.snailjob.server.retry.task.support.event.RetryTaskFailAlarmEvent;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
@ -185,17 +187,27 @@ public class FilterStrategies {
RetryTask retryTask = retryContext.getRetryTask();
RegisterNodeInfo serverNode = retryContext.getServerNode();
boolean result;
StringBuilder description = new StringBuilder();
if (Objects.isNull(serverNode)) {
return Pair.of(Boolean.FALSE, description.append(MessageFormat.format("没有可执行的客户端节点. uniqueId:[{0}]", retryTask.getUniqueId())));
result = false;
description.append(MessageFormat.format("没有可执行的客户端节点. uniqueId:[{0}]", retryTask.getUniqueId()));
} else {
ServerNodeMapper serverNodeMapper = SnailSpringContext.getBeanByType(ServerNodeMapper.class);
result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId()));
if (!result) {
// 删除缓存中的失效节点
CacheRegisterTable.remove(retryTask.getGroupName(), retryTask.getNamespaceId(), serverNode.getHostId());
description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retryTask.getUniqueId()));
}
}
ServerNodeMapper serverNodeMapper = SnailSpringContext.getBeanByType(ServerNodeMapper.class);
boolean result = 1 == serverNodeMapper.selectCount(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getHostId, serverNode.getHostId()));
if (!result) {
// 删除缓存中的失效节点
CacheRegisterTable.remove(retryTask.getGroupName(), retryTask.getNamespaceId(), serverNode.getHostId());
description.append(MessageFormat.format("DB中未查询到客户端节点. hostId:[{0}] uniqueId:[{1}]", serverNode.getHostId(), retryTask.getUniqueId()));
if (result == false) {
SnailSpringContext.getContext().publishEvent(
new RetryTaskFailAlarmEvent(RetryTaskFailAlarmEventDTO.builder()
.retryTaskId(retryTask.getId())
.reason(description.toString())
.build()));
}
return Pair.of(result, description);