feat(sj_1.1.0-beta3): 1. 优化重试模块删除逻辑

This commit is contained in:
opensnail 2024-07-13 08:33:13 +08:00
parent 2cd0d0ca8b
commit dec32efc2a
4 changed files with 38 additions and 18 deletions

View File

@ -194,6 +194,7 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
retryTaskLogMapper.delete(new LambdaQueryWrapper<RetryTaskLog>()
.in(RetryTaskLog::getRetryStatus, ALLOW_DELETE_STATUS)
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.eq(RetryTaskLog::getGroupName, deadLetterVO.getGroupName())
.in(RetryTaskLog::getUniqueId, uniqueIds));
retryTaskLogMessageMapper.delete(

View File

@ -173,7 +173,8 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
RetryTaskLog retryTaskLog = retryTaskLogMapper.selectOne(
new LambdaQueryWrapper<RetryTaskLog>().eq(RetryTaskLog::getRetryStatus, RetryStatusEnum.FINISH.getStatus())
new LambdaQueryWrapper<RetryTaskLog>()
.in(RetryTaskLog::getRetryStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.eq(RetryTaskLog::getId, id));
Assert.notNull(retryTaskLog, () -> new SnailJobServerException("数据删除失败"));
@ -194,7 +195,7 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService {
List<RetryTaskLog> retryTaskLogs = retryTaskLogMapper.selectList(
new LambdaQueryWrapper<RetryTaskLog>()
.in(RetryTaskLog::getRetryStatus, ALLOW_DELETE_STATUS)
.in(RetryTaskLog::getRetryStatus, List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.getStatus()))
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getId, ids));
Assert.notEmpty(retryTaskLogs, () -> new SnailJobServerException("数据不存在"));

View File

@ -44,6 +44,7 @@ import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -258,6 +259,17 @@ public class RetryTaskServiceImpl implements RetryTaskService {
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<RetryTask> tasks = retryTaskAccess.list(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
.in(RetryTask::getRetryStatus, ALLOW_DELETE_STATUS)
.in(RetryTask::getId, requestVO.getIds())
);
Assert.notEmpty(tasks,
() -> new SnailJobServerException("没有可删除的数据, 只有非【处理中】的数据可以删除"));
Assert.isTrue(requestVO.getIds().size() == retryTaskAccess.delete(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
@ -266,22 +278,10 @@ public class RetryTaskServiceImpl implements RetryTaskService {
.in(RetryTask::getId, requestVO.getIds()))
, () -> new SnailJobServerException("删除重试任务失败, 请检查任务状态是否为已完成或者最大次数"));
List<RetryTask> tasks = retryTaskAccess.list(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.select(RetryTask::getUniqueId)
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
.eq(RetryTask::getRetryStatus, ALLOW_DELETE_STATUS)
.in(RetryTask::getId, requestVO.getIds())
);
if (CollUtil.isEmpty(tasks)) {
return Boolean.TRUE;
}
Set<String> uniqueIds = StreamUtils.toSet(tasks, RetryTask::getUniqueId);
retryTaskLogMapper.delete(new LambdaQueryWrapper<RetryTaskLog>()
.in(RetryTaskLog::getRetryStatus, ALLOW_DELETE_STATUS)
.eq(RetryTaskLog::getGroupName, requestVO.getGroupName())
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getUniqueId, uniqueIds));

View File

@ -25,9 +25,12 @@ import com.aizuda.snailjob.server.web.service.handler.SyncConfigHandler;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetrySummaryMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySummary;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -226,7 +229,6 @@ public class SceneConfigServiceImpl implements SceneConfigService {
@Transactional
public boolean deleteByIds(Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
LambdaQueryWrapper<RetrySceneConfig> queryWrapper = new LambdaQueryWrapper<RetrySceneConfig>()
.select(RetrySceneConfig::getSceneName, RetrySceneConfig::getGroupName)
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
@ -236,11 +238,27 @@ public class SceneConfigServiceImpl implements SceneConfigService {
List<RetrySceneConfig> sceneConfigs = accessTemplate.getSceneConfigAccess().list(queryWrapper);
Assert.notEmpty(sceneConfigs, () -> new SnailJobServerException("删除重试场景失败, 请检查场景状态是否关闭状态"));
Set<String> sceneNames = StreamUtils.toSet(sceneConfigs, RetrySceneConfig::getSceneName);
Set<String> groupNames = StreamUtils.toSet(sceneConfigs, RetrySceneConfig::getGroupName);
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
TaskAccess<RetryDeadLetter> retryTaskTaskAccess = accessTemplate.getRetryDeadLetterAccess();
for (String groupName : groupNames) {
List<RetryTask> retryTasks = retryTaskAccess.listPage(groupName, namespaceId, new PageDTO<>(1, 1),
new LambdaQueryWrapper<RetryTask>().in(RetryTask::getSceneName, sceneNames).orderByAsc(RetryTask::getId)).getRecords();
Assert.isTrue(CollUtil.isEmpty(retryTasks),
() -> new SnailJobServerException("删除重试场景失败, 存在【重试任务】请先删除【重试任务】在重试"));
List<RetryDeadLetter> retryDeadLetters = retryTaskTaskAccess.listPage(groupName, namespaceId, new PageDTO<>(1, 1),
new LambdaQueryWrapper<RetryDeadLetter>().in(RetryDeadLetter::getSceneName, sceneNames).orderByAsc(RetryDeadLetter::getId)).getRecords();
Assert.isTrue(CollUtil.isEmpty(retryDeadLetters),
() -> new SnailJobServerException("删除重试场景失败, 存在【死信任务】请先删除【死信任务】在重试"));
}
Assert.isTrue(ids.size() == accessTemplate.getSceneConfigAccess().delete(queryWrapper),
() -> new SnailJobServerException("删除重试场景失败, 请检查场景状态是否关闭状态"));
Set<String> sceneNames = StreamUtils.toSet(sceneConfigs, RetrySceneConfig::getSceneName);
Set<String> groupNames = StreamUtils.toSet(sceneConfigs, RetrySceneConfig::getGroupName);
List<RetrySummary> retrySummaries = retrySummaryMapper.selectList(
new LambdaQueryWrapper<RetrySummary>()
.select(RetrySummary::getId)