From 2fb450f48ad02fc88cdf18d5c46534c52fb80c48 Mon Sep 17 00:00:00 2001 From: zhengweilin Date: Sat, 11 Jan 2025 15:09:00 +0800 Subject: [PATCH] =?UTF-8?q?fix(sj=5F1.3.0-beta1.1):=201=E3=80=81=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E9=87=8D=E8=AF=95=E5=9C=BA=E6=99=AF=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=95=B0=E9=87=8F=E8=B6=85=E8=BF=87=E9=98=88=E5=80=BC=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1=E5=91=8A=E8=AD=A6=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RetryErrorMoreThresholdAlarmSchedule.java | 38 ++++++++++++++----- .../RetryTaskMoreThresholdAlarmSchedule.java | 27 ++++++++++--- .../web/service/impl/JobServiceImpl.java | 10 ++--- 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java index 0253537fe..ce7e8fc1b 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryErrorMoreThresholdAlarmSchedule.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.retry.task.support.schedule; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.alarm.Alarm; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; @@ -25,6 +26,7 @@ import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipien import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient; import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; @@ -71,7 +73,8 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple @Override protected void doExecute() { - SnailJobLog.LOCAL.debug("retryErrorMoreThreshold time[{}] ip:[{}]", LocalDateTime.now(), + SnailJobLog.LOCAL.debug("retryErrorMoreThreshold time[{}] ip:[{}]", + LocalDateTime.now(), NetUtil.getLocalIpStr()); PartitionTaskUtils.process(this::getNotifyConfigPartitions, this::doHandler, 0); } @@ -84,6 +87,7 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple } private void doSendAlarm(NotifyConfigPartitionTask partitionTask) { + // x分钟内、x组、x场景进入死信队列的数据量 LocalDateTime now = LocalDateTime.now(); TaskAccess retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess(); @@ -93,7 +97,7 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple between(RetryDeadLetter::getCreateDt, now.minusMinutes(30), now) .eq(RetryDeadLetter::getGroupName, partitionTask.getGroupName()) .eq(RetryDeadLetter::getSceneName, partitionTask.getBusinessId())); - if (count >= partitionTask.getNotifyThreshold()) { + if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) { List recipientInfos = partitionTask.getRecipientInfos(); for (final RecipientInfo recipientInfo : recipientInfos) { if (Objects.isNull(recipientInfo)) { @@ -120,20 +124,34 @@ public class RetryErrorMoreThresholdAlarmSchedule extends AbstractSchedule imple } } + // 1.拿500场景 + // 2.获取通知配置ids并查询通知配置 + // 3.Map<通知ID, 通知DTO> + // 4.循环场景查询死信数据量 + // 5.场景对应的通知ids private List getNotifyConfigPartitions(Long startId) { - List notifyConfigs = accessTemplate.getNotifyConfigAccess() - .listPage(new PageDTO<>(0, 1000), new LambdaQueryWrapper() - .gt(NotifyConfig::getId, startId) - .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) - .eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()) - .orderByAsc(NotifyConfig::getId) - ).getRecords(); + List retrySceneConfigList = accessTemplate.getSceneConfigAccess() + .listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>()) + .getRecords(); - if (CollUtil.isEmpty(notifyConfigs)) { + Set retryNotifyIds = new HashSet<>(); + for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) { + HashSet notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); + retryNotifyIds.addAll(notifyIds); + } + if (CollUtil.isEmpty(retryNotifyIds)) { return Lists.newArrayList(); } + List notifyConfigs = accessTemplate.getNotifyConfigAccess() + .list(new LambdaQueryWrapper() + .gt(NotifyConfig::getId, startId) + .in(NotifyConfig::getId, retryNotifyIds) + .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) + .eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY_ERROR.getNotifyScene()) + .orderByAsc(NotifyConfig::getId)); + Set recipientIds = notifyConfigs.stream() .map(config -> new HashSet<>(JsonUtil.parseList(config.getRecipientIds(), Long.class))) .reduce((a, b) -> { diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java index 6dbaee3c7..74c16fdc7 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/schedule/RetryTaskMoreThresholdAlarmSchedule.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.retry.task.support.schedule; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.aizuda.snailjob.common.core.alarm.AlarmContext; import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory; import com.aizuda.snailjob.common.core.enums.RetryNotifySceneEnum; @@ -22,6 +23,7 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig; import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient; +import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig; import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; @@ -80,6 +82,8 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem } private void doSendAlarm(NotifyConfigPartitionTask partitionTask) { + + // x分钟内、x组、x场景进入重试任务的数据量 long count = accessTemplate.getRetryTaskAccess() .count(partitionTask.getGroupName(), partitionTask.getNamespaceId(), new LambdaQueryWrapper() @@ -87,8 +91,7 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem .eq(RetryTask::getGroupName, partitionTask.getGroupName()) .eq(RetryTask::getSceneName, partitionTask.getBusinessId()) .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())); - if (count >= partitionTask.getNotifyThreshold()) { - + if (partitionTask.getNotifyThreshold() > 0 && count >= partitionTask.getNotifyThreshold()) { List recipientInfos = partitionTask.getRecipientInfos(); for (final RecipientInfo recipientInfo : recipientInfos) { if (Objects.isNull(recipientInfo)) { @@ -115,13 +118,27 @@ public class RetryTaskMoreThresholdAlarmSchedule extends AbstractSchedule implem private List getNotifyConfigPartitions(Long startId) { + List retrySceneConfigList = accessTemplate.getSceneConfigAccess() + .listPage(new PageDTO<>(0, 500), new LambdaQueryWrapper<>()) + .getRecords(); + + Set retryNotifyIds = new HashSet<>(); + for (RetrySceneConfig retrySceneConfig : retrySceneConfigList) { + HashSet notifyIds = StrUtil.isBlank(retrySceneConfig.getNotifyIds()) ? new HashSet<>() : new HashSet<>(JsonUtil.parseList(retrySceneConfig.getNotifyIds(), Long.class)); + retryNotifyIds.addAll(notifyIds); + } + if (CollUtil.isEmpty(retryNotifyIds)) { + return Lists.newArrayList(); + } + List notifyConfigs = accessTemplate.getNotifyConfigAccess() - .listPage(new PageDTO<>(0, 1000), new LambdaQueryWrapper() + .list(new LambdaQueryWrapper() + .gt(NotifyConfig::getId, startId) + .in(NotifyConfig::getId, retryNotifyIds) .eq(NotifyConfig::getNotifyStatus, StatusEnum.YES.getStatus()) .eq(NotifyConfig::getSystemTaskType, SyetemTaskTypeEnum.RETRY.getType()) .eq(NotifyConfig::getNotifyScene, RetryNotifySceneEnum.MAX_RETRY.getNotifyScene()) - .orderByAsc(NotifyConfig::getId)) // SQLServer 分页必须 ORDER BY - .getRecords(); + .orderByAsc(NotifyConfig::getId)); // SQLServer 分页必须 ORDER BY Set recipientIds = notifyConfigs.stream() .flatMap(config -> JsonUtil.parseList(config.getRecipientIds(), Long.class).stream()) diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java index 63893d97d..6abd18c38 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/impl/JobServiceImpl.java @@ -96,8 +96,7 @@ public class JobServiceImpl implements JobService { .eq(Job::getNamespaceId, userSessionVO.getNamespaceId()) .in(CollUtil.isNotEmpty(groupNames), Job::getGroupName, groupNames) .likeRight(StrUtil.isNotBlank(queryVO.getJobName()), Job::getJobName, StrUtil.trim(queryVO.getJobName())) - .like(StrUtil.isNotBlank(queryVO.getExecutorInfo()), Job::getExecutorInfo, - StrUtil.trim(queryVO.getExecutorInfo())) + .like(StrUtil.isNotBlank(queryVO.getExecutorInfo()), Job::getExecutorInfo, StrUtil.trim(queryVO.getExecutorInfo())) .eq(Objects.nonNull(queryVO.getJobStatus()), Job::getJobStatus, queryVO.getJobStatus()) .eq(Job::getDeleted, StatusEnum.NO.getStatus()) .eq(Objects.nonNull(queryVO.getOwnerId()), Job::getOwnerId, queryVO.getOwnerId()) @@ -105,8 +104,9 @@ public class JobServiceImpl implements JobService { List jobResponseList = JobResponseVOConverter.INSTANCE.convertList(selectPage.getRecords()); for (JobResponseVO jobResponseVO : jobResponseList) { - SystemUser systemUser = systemUserMapper.selectById(jobResponseVO.getOwnerId()); - if (Objects.nonNull(systemUser)) { + // 兼容Oracle OwnerId Null查询异常:java.sql.SQLException: 无效的列类型: 1111 + if (Objects.nonNull(jobResponseVO.getOwnerId())) { + SystemUser systemUser = systemUserMapper.selectById(jobResponseVO.getOwnerId()); jobResponseVO.setOwnerName(systemUser.getUsername()); } } @@ -241,7 +241,7 @@ public class JobServiceImpl implements JobService { // 设置now表示立即执行 jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); - if (StrUtil.isNotBlank(jobTrigger.getTmpArgsStr())){ + if (StrUtil.isNotBlank(jobTrigger.getTmpArgsStr())) { jobTaskPrepare.setTmpArgsStr(jobTrigger.getTmpArgsStr()); } // 创建批次