fix(sj_1.3.0-beta1.1):

1、修复重试场景重试数量超过阈值定时任务告警空指针异常
This commit is contained in:
zhengweilin 2025-01-11 15:09:00 +08:00 committed by opensnail
parent 829ce07989
commit 2fb450f48a
3 changed files with 55 additions and 20 deletions

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.Alarm;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
@ -25,6 +26,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipien
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
@ -71,7 +73,8 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
@Override
protected void doExecute() {
SnailJobLog.LOCAL.debug("retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(),
SnailJobLog.LOCAL.debug("retryErrorMoreThreshold time[{}] ip:[{}]",
LocalDateTime.now(),
NetUtil.getLocalIpStr());
PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0);
}
@ -84,6 +87,7 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
}
private void doSendAlarm(NotifyConfigPartitionTask partitionTask) {
// x分钟内x组x场景进入死信队列的数据量
LocalDateTime now = LocalDateTime.now();
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
@ -93,7 +97,7 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now)
.eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName())
.eq(RetryDeadLetter::getSceneName, partitionTask.getBusinessId()));
if (count >= partitionTask.getNotifyThreshold()) {
if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) {
List<RecipientInfo> recipientInfos = partitionTask.getRecipientInfos();
for (final RecipientInfo recipientInfo : recipientInfos) {
if (Objects.isNull(recipientInfo)) {
@ -120,20 +124,34 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple
}
}
// 1.拿500场景
// 2.获取通知配置ids并查询通知配置
// 3.Map<通知ID, 通知DTO>
// 4.循环场景查询死信数据量
// 5.场景对应的通知ids
private List<NotifyConfigPartitionTask> getNotifyConfigPartitions(Long startId) {
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.listPage(new PageDTO<>(0, 1000), new LambdaQueryWrapper<NotifyConfig>()
.gt(NotifyConfig::getId, startId)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene())
.orderByAsc(NotifyConfig::getId)
).getRecords();
List<RetrySceneConfig> retrySceneConfigList = accessTemplate.getSceneConfigAccess()
.listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>())
.getRecords();
if (CollUtil.isEmpty(notifyConfigs)) {
Set<Long> retryNotifyIds = new HashSet<>();
for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) {
HashSet<Long> notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
retryNotifyIds.addAll(notifyIds);
}
if (CollUtil.isEmpty(retryNotifyIds)) {
return Lists.newArrayList();
}
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.list(new LambdaQueryWrapper<NotifyConfig>()
.gt(NotifyConfig::getId, startId)
.in(NotifyConfig::getId, retryNotifyIds)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene())
.orderByAsc(NotifyConfig::getId));
Set<Long> recipientIds = notifyConfigs.stream()
.map(config -> new HashSet<>(JsonUtil.parseList(config.getRecipientIds(), Long.class)))
.reduce((a, b) -> {

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.retry.task.support.schedule;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum;
@ -22,6 +23,7 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -80,6 +82,8 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
}
private void doSendAlarm(NotifyConfigPartitionTask partitionTask) {
// x分钟内x组x场景进入重试任务的数据量
long count = accessTemplate.getRetryTaskAccess()
.count(partitionTask.getGroupName(), partitionTask.getNamespaceId(),
new LambdaQueryWrapper<RetryTask>()
@ -87,8 +91,7 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
.eq(RetryTask::getGroupName, partitionTask.getGroupName())
.eq(RetryTask::getSceneName, partitionTask.getBusinessId())
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()));
if (count >= partitionTask.getNotifyThreshold()) {
if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) {
List<RecipientInfo> recipientInfos = partitionTask.getRecipientInfos();
for (final RecipientInfo recipientInfo : recipientInfos) {
if (Objects.isNull(recipientInfo)) {
@ -115,13 +118,27 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem
private List<NotifyConfigPartitionTask> getNotifyConfigPartitions(Long startId) {
List<RetrySceneConfig> retrySceneConfigList = accessTemplate.getSceneConfigAccess()
.listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>())
.getRecords();
Set<Long> retryNotifyIds = new HashSet<>();
for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) {
HashSet<Long> notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class));
retryNotifyIds.addAll(notifyIds);
}
if (CollUtil.isEmpty(retryNotifyIds)) {
return Lists.newArrayList();
}
List<NotifyConfig> notifyConfigs = accessTemplate.getNotifyConfigAccess()
.listPage(new PageDTO<>(0, 1000), new LambdaQueryWrapper<NotifyConfig>()
.list(new LambdaQueryWrapper<NotifyConfig>()
.gt(NotifyConfig::getId, startId)
.in(NotifyConfig::getId, retryNotifyIds)
.eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus())
.eq(NotifyConfig::getSystemTaskType, SyetemTaskTypeEnum.RETRY.getType())
.eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY.getNotifyScene())
.orderByAsc(NotifyConfig::getId)) // SQLServer 分页必须 ORDER BY
.getRecords();
.orderByAsc(NotifyConfig::getId)); // SQLServer 分页必须 ORDER BY
Set<Long> recipientIds = notifyConfigs.stream()
.flatMap(config -> JsonUtil.parseList(config.getRecipientIds(), Long.class).stream())

View File

@ -96,8 +96,7 @@ public class JobServiceImpl implements JobService {
.eq(Job::getNamespaceId, userSessionVO.getNamespaceId())
.in(CollUtil.isNotEmpty(groupNames), Job::getGroupName, groupNames)
.likeRight(StrUtil.isNotBlank(queryVO.getJobName()), Job::getJobName, StrUtil.trim(queryVO.getJobName()))
.like(StrUtil.isNotBlank(queryVO.getExecutorInfo()), Job::getExecutorInfo,
StrUtil.trim(queryVO.getExecutorInfo()))
.like(StrUtil.isNotBlank(queryVO.getExecutorInfo()), Job::getExecutorInfo, StrUtil.trim(queryVO.getExecutorInfo()))
.eq(Objects.nonNull(queryVO.getJobStatus()), Job::getJobStatus, queryVO.getJobStatus())
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.eq(Objects.nonNull(queryVO.getOwnerId()), Job::getOwnerId, queryVO.getOwnerId())
@ -105,8 +104,9 @@ public class JobServiceImpl implements JobService {
List<JobResponseVO> jobResponseList = JobResponseVOConverter.INSTANCE.convertList(selectPage.getRecords());
for (JobResponseVO jobResponseVO : jobResponseList) {
SystemUser systemUser = systemUserMapper.selectById(jobResponseVO.getOwnerId());
if (Objects.nonNull(systemUser)) {
// 兼容Oracle OwnerId Null查询异常java.sql.SQLException: 无效的列类型: 1111
if (Objects.nonNull(jobResponseVO.getOwnerId())) {
SystemUser systemUser = systemUserMapper.selectById(jobResponseVO.getOwnerId());
jobResponseVO.setOwnerName(systemUser.getUsername());
}
}
@ -241,7 +241,7 @@ public class JobServiceImpl implements JobService {
// 设置now表示立即执行
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
if (StrUtil.isNotBlank(jobTrigger.getTmpArgsStr())){
if (StrUtil.isNotBlank(jobTrigger.getTmpArgsStr())) {
jobTaskPrepare.setTmpArgsStr(jobTrigger.getTmpArgsStr());
}
// 创建批次