feat(sj_1.1.0-beta3): 新增组、命名空间删除功能。优化定时任务批次、工作流批次、重试任务删除逻辑

This commit is contained in:
opensnail 2024-07-09 23:48:11 +08:00
parent 7321b6fb79
commit a202e23b09
31 changed files with 324 additions and 179 deletions

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.common.core.enums;
import lombok.Getter;
import org.springframework.lang.NonNull;
import java.util.List;
import java.util.Objects;
/**
@ -36,6 +37,9 @@ public enum RetryStatusEnum {
private final Integer status;
public static final List<Integer> ALLOW_DELETE_STATUS =
List.of(RetryStatusEnum.FINISH.getStatus(), RetryStatusEnum.MAX_COUNT.status, RetryStatusEnum.SUSPEND.status);
RetryStatusEnum(int status) {
this.status = status;
}

View File

@ -107,8 +107,8 @@ public class GroupConfigController {
}
@LoginRequired(role = RoleEnum.ADMIN)
@DeleteMapping("{id}")
public boolean deleteByIds(@PathVariable("id") Long id) {
return groupConfigService.deleteByIds(id);
@DeleteMapping("{groupName}")
public boolean deleteByGroupName(@PathVariable("groupName") String groupName) {
return groupConfigService.deleteByGroupName(groupName);
}
}

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.web.model.response.JobBatchResponseVO;
import com.aizuda.snailjob.server.web.service.JobBatchService;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@ -51,7 +52,10 @@ public class JobBatchController {
@DeleteMapping("/ids")
@LoginRequired
public Boolean deleteJobBatchById(@RequestBody @Valid @NotEmpty(message = "ids不能为空") Set<Long> ids) {
return jobBatchService.deleteJobBatchById(ids);
public Boolean deleteJobBatchByIds(@RequestBody @Valid
@NotEmpty(message = "ids不能为空")
@Size(max = 100, message = "最多删除5个")
Set<Long> ids) {
return jobBatchService.deleteJobBatchByIds(ids);
}
}

View File

@ -73,13 +73,6 @@ public class JobController {
return jobService.updateJobStatus(jobRequestVO);
}
@DeleteMapping("{id}")
@LoginRequired
@Deprecated
public Boolean deleteJobById(@PathVariable("id") Long id) {
return jobService.deleteJobById(id);
}
@DeleteMapping("/ids")
@LoginRequired
public Boolean deleteJobById(@RequestBody @Valid @NotEmpty(message = "ids不能为空") Set<Long> ids) {

View File

@ -37,9 +37,4 @@ public class JobTaskController {
return jobTaskService.getTreeJobTask(jobTaskQueryVO);
}
@DeleteMapping("/ids")
@LoginRequired
public Boolean deleteJobTaskById(@RequestBody @Valid @NotEmpty(message = "ids不能为空") Set<Long> ids) {
return jobTaskService.deleteJobTaskById(ids);
}
}

View File

@ -44,9 +44,9 @@ public class NamespaceController {
}
@LoginRequired(role = RoleEnum.ADMIN)
@DeleteMapping("{id}")
public Boolean deleteNamespace(@PathVariable("id") Long id) {
return namespaceService.deleteNamespace(id);
@DeleteMapping("{uniqueId}")
public Boolean deleteByUniqueId(@PathVariable("uniqueId") String uniqueId) {
return namespaceService.deleteByUniqueId(uniqueId);
}
@LoginRequired(role = RoleEnum.ADMIN)

View File

@ -47,7 +47,7 @@ public class RetryDeadLetterController {
@LoginRequired
@DeleteMapping("/batch")
public int batchDelete(@RequestBody @Validated BatchDeleteRetryDeadLetterVO deadLetterVO) {
public boolean batchDelete(@RequestBody @Validated BatchDeleteRetryDeadLetterVO deadLetterVO) {
return retryDeadLetterService.batchDelete(deadLetterVO);
}
}

View File

@ -64,8 +64,8 @@ public class RetryTaskController {
@LoginRequired
@DeleteMapping("/batch")
public Integer deleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) {
return retryTaskService.deleteRetryTask(requestVO);
public boolean batchDeleteRetryTask(@RequestBody @Validated BatchDeleteRetryTaskVO requestVO) {
return retryTaskService.batchDeleteRetryTask(requestVO);
}
@LoginRequired

View File

@ -1,15 +1,20 @@
package com.aizuda.snailjob.server.web.controller;
import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.annotation.RoleEnum;
import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.WorkflowBatchQueryVO;
import com.aizuda.snailjob.server.web.model.response.WorkflowBatchResponseVO;
import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.snailjob.server.web.service.WorkflowBatchService;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -39,4 +44,13 @@ public class WorkflowBatchController {
public Boolean stop(@PathVariable("id") Long id) {
return workflowBatchService.stop(id);
}
@DeleteMapping("/ids")
@LoginRequired(role = RoleEnum.USER)
public Boolean deleteByIds(@RequestBody @Valid
@NotEmpty(message = "ids不能为空")
@Size(max = 100, message = "最多删除5个")
Set<Long> ids) {
return workflowBatchService.deleteByIds(ids);
}
}

View File

@ -15,6 +15,8 @@ import com.aizuda.snailjob.server.web.model.response.WorkflowResponseVO;
import com.aizuda.snailjob.server.web.service.WorkflowService;
import com.aizuda.snailjob.server.web.util.ExportUtils;
import com.aizuda.snailjob.server.web.util.ImportUtils;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@ -24,6 +26,7 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -67,10 +70,10 @@ public class WorkflowController {
return workflowService.updateStatus(id);
}
@DeleteMapping("/{id}")
@DeleteMapping("/ids")
@LoginRequired(role = RoleEnum.USER)
public Boolean deleteById(@PathVariable("id") Long id) {
return workflowService.deleteById(id);
public Boolean deleteByIds(@RequestBody @Valid @NotEmpty(message = "ids不能为空") Set<Long> ids) {
return workflowService.deleteByIds(ids);
}
@PostMapping("/trigger/{id}")

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.model.request;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import jakarta.validation.constraints.Size;
import lombok.Data;
import java.util.List;
@ -27,5 +28,6 @@ public class BatchDeleteRetryTaskVO {
* 重试表id
*/
@NotEmpty(message = "至少选择一项")
@Size(max = 100, message = "最多只能删除100条")
private List<Long> ids;
}

View File

@ -39,5 +39,5 @@ public interface GroupConfigService {
String exportGroup(ExportGroupVO exportGroupVO);
boolean deleteByIds(Long id);
boolean deleteByGroupName(String groupName);
}

View File

@ -22,5 +22,5 @@ public interface JobBatchService {
Boolean retry(Long taskBatchId);
Boolean deleteJobBatchById(Set<Long> ids);
Boolean deleteJobBatchByIds(Set<Long> ids);
}

View File

@ -28,8 +28,6 @@ public interface JobService {
Boolean updateJobStatus(JobStatusUpdateRequestVO jobRequestVO);
Boolean deleteJobById(Long id);
List<String> getTimeByCron(String cron);
List<JobResponseVO> getJobNameList(String keywords, Long jobId, String groupName);

View File

@ -19,5 +19,4 @@ public interface JobTaskService {
List<JobTaskResponseVO> getTreeJobTask(JobTaskQueryVO jobTaskQueryVO);
Boolean deleteJobTaskById(Set<Long> ids);
}

View File

@ -20,7 +20,7 @@ public interface NamespaceService {
PageResult<List<NamespaceResponseVO>> getNamespacePage(NamespaceQueryVO queryVO);
Boolean deleteNamespace(Long id);
Boolean deleteByUniqueId(String id);
List<NamespaceResponseVO> getAllNamespace();

View File

@ -20,5 +20,5 @@ public interface RetryDeadLetterService {
int rollback(BatchRollBackRetryDeadLetterVO rollBackRetryDeadLetterVO);
int batchDelete(BatchDeleteRetryDeadLetterVO deadLetterVO);
boolean batchDelete(BatchDeleteRetryDeadLetterVO deadLetterVO);
}

View File

@ -62,7 +62,7 @@ public interface RetryTaskService {
* @param requestVO 批量删除重试数据
* @return
*/
Integer deleteRetryTask(BatchDeleteRetryTaskVO requestVO);
boolean batchDeleteRetryTask(BatchDeleteRetryTaskVO requestVO);
/**
* 解析日志

View File

@ -6,6 +6,7 @@ import com.aizuda.snailjob.server.web.model.response.WorkflowBatchResponseVO;
import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -19,4 +20,6 @@ public interface WorkflowBatchService {
WorkflowDetailResponseVO getWorkflowBatchDetail(Long id);
Boolean stop(Long id);
Boolean deleteByIds(Set<Long> ids);
}

View File

@ -14,6 +14,7 @@ import jakarta.validation.constraints.NotEmpty;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
* @author xiaowoniu
@ -32,8 +33,6 @@ public interface WorkflowService {
Boolean updateStatus(Long id);
Boolean deleteById(Long id);
Boolean trigger(Long id);
List<WorkflowResponseVO> getWorkflowNameList(String keywords, Long workflowId, String groupName);
@ -44,4 +43,6 @@ public interface WorkflowService {
String exportWorkflowTask(ExportWorkflowVO exportWorkflowVO
);
Boolean deleteByIds(Set<Long> ids);
}

View File

@ -22,21 +22,17 @@ import com.aizuda.snailjob.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimeoutCheckTask;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* @author: xiaowoniu
@ -51,7 +47,7 @@ public class JobHandler {
private final JobMapper jobMapper;
private final JobTaskMapper jobTaskMapper;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final JobLogMessageMapper jobLogMessageMapper;
public Boolean retry(Long taskBatchId) {
return retry(taskBatchId, null, null);
@ -171,4 +167,32 @@ public class JobHandler {
return workflowTaskBatch.getWfContext();
}
/**
* 批次删除定时任务批次
*
* @param ids 任务批次id
* @param namespaceId 命名空间
*/
@Transactional
public void deleteJobTaskBatchByIds(Set<Long> ids, String namespaceId) {
Assert.isTrue(ids.size() == jobTaskBatchMapper.delete(
new LambdaQueryWrapper<JobTaskBatch>()
.in(JobTaskBatch::getId, ids)
), () -> new SnailJobServerException("删除任务批次失败"));
Assert.isTrue(ids.size() == jobTaskMapper.delete(
new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getNamespaceId, namespaceId)
.in(JobTask::getId, ids)
), () -> new SnailJobServerException("删除任务批次失败"));
// 删除日志信息
jobLogMessageMapper.delete(new LambdaQueryWrapper<JobLogMessage>()
.eq(JobLogMessage::getNamespaceId, namespaceId)
.in(JobLogMessage::getTaskId, ids)
);
}
}

View File

@ -28,9 +28,7 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.enums.DbTypeEnum;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NamespaceMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.SequenceAllocMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.ServerNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.aizuda.snailjob.template.datasource.utils.DbUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -75,6 +73,9 @@ public class GroupConfigServiceImpl implements GroupConfigService {
private final SystemProperties systemProperties;
private final JdbcTemplate jdbcTemplate;
private final NamespaceMapper namespaceMapper;
private final JobMapper jobMapper;
private final WorkflowMapper workflowMapper;
private final SystemUserPermissionMapper systemUserPermissionMapper;
@Override
@Transactional
@ -446,21 +447,41 @@ public class GroupConfigServiceImpl implements GroupConfigService {
}
@Override
public boolean deleteByIds(Long id) {
public boolean deleteByGroupName(String groupName) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
// 前置检查
// 1. 定时任务是否删除
Assert.isTrue(CollUtil.isEmpty(jobMapper.selectList(new PageDTO<>(1, 1), new LambdaQueryWrapper<Job>()
.eq(Job::getNamespaceId, namespaceId)
.eq(Job::getGroupName, groupName).orderByAsc(Job::getId))),
() -> new SnailJobServerException("存在未删除的定时任务. 请先删除当前组的定时任务后再重试删除"));
// 2. 工作流是否删除
Assert.isTrue(CollUtil.isEmpty(workflowMapper.selectList(new PageDTO<>(1, 1), new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getNamespaceId, namespaceId)
.eq(Workflow::getGroupName, groupName).orderByAsc(Workflow::getId))),
() -> new SnailJobServerException("存在未删除的工作流任务. 请先删除当前组的工作流任务后再重试删除"));
// 3. 重试场景是否删除
Assert.isTrue(CollUtil.isEmpty(accessTemplate.getSceneConfigAccess().listPage(new PageDTO<>(1, 1), new LambdaQueryWrapper<RetrySceneConfig>()
.eq(RetrySceneConfig::getNamespaceId, namespaceId)
.eq(RetrySceneConfig::getGroupName, groupName).orderByAsc(RetrySceneConfig::getId)).getRecords()),
() -> new SnailJobServerException("存在未删除的重试场景. 请先删除当前组的重试场景后再重试删除"));
// 4. 是否存在已分配的权限
Assert.isTrue(CollUtil.isEmpty(systemUserPermissionMapper.selectList(new PageDTO<>(1, 1), new LambdaQueryWrapper<SystemUserPermission>()
.eq(SystemUserPermission::getNamespaceId, namespaceId)
.eq(SystemUserPermission::getGroupName, groupName).orderByAsc(SystemUserPermission::getId))),
() -> new SnailJobServerException("存在已分配组权限. 请先删除已分配的组权限后再重试删除"));
// 5. 检查是否存活的客户端节点
Assert.isTrue(CollUtil.isEmpty(serverNodeMapper.selectList(new PageDTO<>(1, 1), new LambdaQueryWrapper<ServerNode>()
.eq(ServerNode::getNamespaceId, namespaceId)
.eq(ServerNode::getGroupName, groupName).orderByAsc(ServerNode::getId))),
() -> new SnailJobServerException("存在存活中客户端节点."));
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Assert.isTrue(1 == accessTemplate.getGroupConfigAccess().delete(
new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.NO.getStatus())
.eq(GroupConfig::getId, id)),
() -> new SnailJobServerException("删除组失败, 请检查状态是否关闭状态"));
new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.NO.getStatus())
.eq(GroupConfig::getGroupName, groupName)),
() -> new SnailJobServerException("删除组失败, 请检查状态是否关闭状态"));
return Boolean.TRUE;
}

View File

@ -124,14 +124,9 @@ public class JobBatchServiceImpl implements JobBatchService {
}
@Override
public Boolean deleteJobBatchById(Set<Long> ids) {
public Boolean deleteJobBatchByIds(Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Assert.isTrue(ids.size() == jobTaskBatchMapper.delete(
new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getNamespaceId, namespaceId)
.in(JobTaskBatch::getId, ids)
), () -> new SnailJobServerException("删除任务批次失败"));
jobHandler.deleteJobTaskBatchByIds(ids, namespaceId);
return Boolean.TRUE;
}
}

View File

@ -208,13 +208,6 @@ public class JobServiceImpl implements JobService {
return 1 == jobMapper.updateById(job);
}
@Override
public Boolean deleteJobById(Long id) {
Job job = new Job();
job.setId(id);
job.setDeleted(StatusEnum.YES.getStatus());
return 1 == jobMapper.updateById(job);
}
@Override
public boolean trigger(Long jobId) {

View File

@ -66,25 +66,6 @@ public class JobTaskServiceImpl implements JobTaskService {
return convertJobTaskList(taskList);
}
@Override
public Boolean deleteJobTaskById(Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Assert.isTrue(ids.size() == jobTaskMapper.delete(
new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getNamespaceId, namespaceId)
.in(JobTask::getId, ids)
), () -> new SnailJobServerException("删除任务批次失败"));
// 删除日志信息
jobLogMessageMapper.delete(new LambdaQueryWrapper<JobLogMessage>()
.eq(JobLogMessage::getNamespaceId, namespaceId)
.eq(JobLogMessage::getTaskId, ids)
);
return Boolean.TRUE;
}
private List<JobTaskResponseVO> convertJobTaskList(List<JobTask> taskList) {
if (CollUtil.isEmpty(taskList)) {
return new ArrayList<>();

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.web.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
@ -12,7 +13,9 @@ import com.aizuda.snailjob.server.web.model.request.NamespaceRequestVO;
import com.aizuda.snailjob.server.web.model.response.NamespaceResponseVO;
import com.aizuda.snailjob.server.web.service.NamespaceService;
import com.aizuda.snailjob.server.web.service.convert.NamespaceResponseVOConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NamespaceMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Namespace;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -23,6 +26,8 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.DEFAULT_NAMESPACE;
/**
* @author: xiaowoniu
* @date : 2023-11-21 15:42
@ -32,6 +37,7 @@ import java.util.regex.Pattern;
@RequiredArgsConstructor
public class NamespaceServiceImpl implements NamespaceService {
private final NamespaceMapper namespaceMapper;
private final GroupConfigMapper groupConfigMapper;
@Override
public Boolean saveNamespace(final NamespaceRequestVO namespaceRequestVO) {
@ -87,8 +93,18 @@ public class NamespaceServiceImpl implements NamespaceService {
}
@Override
public Boolean deleteNamespace(Long id) {
return 1 == namespaceMapper.deleteById(id);
public Boolean deleteByUniqueId(String uniqueId) {
Assert.isFalse(DEFAULT_NAMESPACE.equals(uniqueId), ()-> new SnailJobServerException("默认空间禁止删除"));
Assert.isTrue(CollUtil.isEmpty(groupConfigMapper.selectList(new PageDTO<>(1, 1), new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getNamespaceId, uniqueId).orderByAsc(GroupConfig::getId))),
() -> new SnailJobServerException("存在未删除的组配置任务. 请先删除当前关联的组配置再重试删除"));
Assert.isTrue(1 == namespaceMapper.delete(new LambdaQueryWrapper<Namespace>().eq(Namespace::getUniqueId, uniqueId)),
() -> new SnailJobServerException("删除命名空间失败"));
return Boolean.TRUE;
}
@Override

View File

@ -25,10 +25,8 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -42,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS;
/**
* @author: opensnail
* @date : 2022-02-28 09:46
@ -53,6 +53,8 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
private AccessTemplate accessTemplate;
@Autowired
private RetryTaskLogMapper retryTaskLogMapper;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public PageResult<List<RetryDeadLetterResponseVO>> getRetryDeadLetterPage(RetryDeadLetterQueryVO queryVO) {
@ -164,13 +166,43 @@ public class RetryDeadLetterServiceImpl implements RetryDeadLetterService {
}
@Override
public int batchDelete(BatchDeleteRetryDeadLetterVO deadLetterVO) {
public boolean batchDelete(BatchDeleteRetryDeadLetterVO deadLetterVO) {
TaskAccess<RetryDeadLetter> retryDeadLetterAccess = accessTemplate.getRetryDeadLetterAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
return retryDeadLetterAccess.delete(deadLetterVO.getGroupName(), namespaceId,
Assert.isTrue(deadLetterVO.getIds().size() == retryDeadLetterAccess.delete(deadLetterVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryDeadLetter>()
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds())),
() -> new SnailJobServerException("删除死信任务失败"));
List<RetryDeadLetter> tasks = retryDeadLetterAccess.list(deadLetterVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryDeadLetter>()
.select(RetryDeadLetter::getUniqueId)
.eq(RetryDeadLetter::getNamespaceId, namespaceId)
.eq(RetryDeadLetter::getGroupName, deadLetterVO.getGroupName())
.in(RetryDeadLetter::getId, deadLetterVO.getIds()));
.in(RetryDeadLetter::getId, deadLetterVO.getIds())
);
if (CollUtil.isEmpty(tasks)) {
return Boolean.TRUE;
}
Set<String> uniqueIds = StreamUtils.toSet(tasks, RetryDeadLetter::getUniqueId);
retryTaskLogMapper.delete(new LambdaQueryWrapper<RetryTaskLog>()
.in(RetryTaskLog::getRetryStatus, ALLOW_DELETE_STATUS)
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getUniqueId, uniqueIds));
retryTaskLogMessageMapper.delete(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, deadLetterVO.getGroupName())
.in(RetryTaskLogMessage::getUniqueId, uniqueIds));
return Boolean.TRUE;
}
}

View File

@ -31,6 +31,8 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS;
/**
* @author: opensnail
* @date : 2022-02-28 09:10
@ -192,7 +194,7 @@ public class RetryTaskLogServiceImpl implements RetryTaskLogService {
List<RetryTaskLog> retryTaskLogs = retryTaskLogMapper.selectList(
new LambdaQueryWrapper<RetryTaskLog>()
.eq(RetryTaskLog::getRetryStatus, RetryStatusEnum.FINISH.getStatus())
.in(RetryTaskLog::getRetryStatus, ALLOW_DELETE_STATUS)
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getId, ids));
Assert.notEmpty(retryTaskLogs, () -> new SnailJobServerException("数据不存在"));

View File

@ -39,10 +39,8 @@ import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.TaskAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.*;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -56,6 +54,8 @@ import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.aizuda.snailjob.common.core.enums.RetryStatusEnum.ALLOW_DELETE_STATUS;
/**
* @author opensnail
* @date 2022-02-27
@ -76,6 +76,8 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Lazy
@Autowired
private List<TaskExecutor> taskExecutors;
@Autowired
private RetryTaskLogMessageMapper retryTaskLogMessageMapper;
@Override
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
@ -251,14 +253,44 @@ public class RetryTaskServiceImpl implements RetryTaskService {
}
@Override
public Integer deleteRetryTask(final BatchDeleteRetryTaskVO requestVO) {
@Transactional
public boolean batchDeleteRetryTask(final BatchDeleteRetryTaskVO requestVO) {
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
return retryTaskAccess.delete(requestVO.getGroupName(), namespaceId,
Assert.isTrue(requestVO.getIds().size() == retryTaskAccess.delete(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
.in(RetryTask::getRetryStatus, ALLOW_DELETE_STATUS)
.in(RetryTask::getId, requestVO.getIds()))
, () -> new SnailJobServerException("删除重试任务失败, 请检查任务状态是否为已完成或者最大次数"));
List<RetryTask> tasks = retryTaskAccess.list(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>()
.select(RetryTask::getUniqueId)
.eq(RetryTask::getNamespaceId, namespaceId)
.eq(RetryTask::getGroupName, requestVO.getGroupName())
.in(RetryTask::getId, requestVO.getIds()));
.eq(RetryTask::getRetryStatus, ALLOW_DELETE_STATUS)
.in(RetryTask::getId, requestVO.getIds())
);
if (CollUtil.isEmpty(tasks)) {
return Boolean.TRUE;
}
Set<String> uniqueIds = StreamUtils.toSet(tasks, RetryTask::getUniqueId);
retryTaskLogMapper.delete(new LambdaQueryWrapper<RetryTaskLog>()
.in(RetryTaskLog::getRetryStatus, ALLOW_DELETE_STATUS)
.eq(RetryTaskLog::getNamespaceId, namespaceId)
.in(RetryTaskLog::getUniqueId, uniqueIds));
retryTaskLogMessageMapper.delete(
new LambdaQueryWrapper<RetryTaskLogMessage>()
.eq(RetryTaskLogMessage::getNamespaceId, namespaceId)
.eq(RetryTaskLogMessage::getGroupName, requestVO.getGroupName())
.in(RetryTaskLogMessage::getUniqueId, uniqueIds));
return Boolean.TRUE;
}
@Override

View File

@ -22,6 +22,7 @@ import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.snailjob.server.web.service.WorkflowBatchService;
import com.aizuda.snailjob.server.web.service.convert.JobBatchResponseVOConverter;
import com.aizuda.snailjob.server.web.service.convert.WorkflowConverter;
import com.aizuda.snailjob.server.web.service.handler.JobHandler;
import com.aizuda.snailjob.server.web.service.handler.WorkflowHandler;
import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
@ -36,6 +37,7 @@ import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.function.Function;
@ -51,7 +53,6 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private static final Integer NOT_HANDLE_STATUS = 99;
private static final Integer WORKFLOW_DECISION_FAILED_STATUS = 98;
private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
private final WorkflowMapper workflowMapper;
@ -60,10 +61,11 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private final WorkflowHandler workflowHandler;
private final WorkflowBatchHandler workflowBatchHandler;
private final JobMapper jobMapper;
private final JobHandler jobHandler;
private static boolean isNoOperation(JobTaskBatch i) {
return JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(i.getOperationReason())
|| i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus();
|| i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus();
}
@Override
@ -79,20 +81,20 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
}
QueryWrapper<WorkflowTaskBatch> wrapper = new QueryWrapper<WorkflowTaskBatch>()
.eq("batch.namespace_id", userSessionVO.getNamespaceId())
.eq(queryVO.getWorkflowId() != null, "batch.workflow_id", queryVO.getWorkflowId())
.in(CollUtil.isNotEmpty(groupNames), "batch.group_name", groupNames)
.eq(queryVO.getTaskBatchStatus() != null, "batch.task_batch_status", queryVO.getTaskBatchStatus())
.likeRight(StrUtil.isNotBlank(queryVO.getWorkflowName()), "flow.workflow_name", queryVO.getWorkflowName())
.between(ObjUtil.isNotNull(queryVO.getDatetimeRange()),
"batch.create_dt", queryVO.getStartDt(), queryVO.getEndDt())
.eq("batch.deleted", 0)
.orderByDesc("batch.id");
.eq("batch.namespace_id", userSessionVO.getNamespaceId())
.eq(queryVO.getWorkflowId() != null, "batch.workflow_id", queryVO.getWorkflowId())
.in(CollUtil.isNotEmpty(groupNames), "batch.group_name", groupNames)
.eq(queryVO.getTaskBatchStatus() != null, "batch.task_batch_status", queryVO.getTaskBatchStatus())
.likeRight(StrUtil.isNotBlank(queryVO.getWorkflowName()), "flow.workflow_name", queryVO.getWorkflowName())
.between(ObjUtil.isNotNull(queryVO.getDatetimeRange()),
"batch.create_dt", queryVO.getStartDt(), queryVO.getEndDt())
.eq("batch.deleted", 0)
.orderByDesc("batch.id");
List<WorkflowBatchResponseDO> batchResponseDOList = workflowTaskBatchMapper.selectWorkflowBatchPageList(pageDTO,
wrapper);
wrapper);
List<WorkflowBatchResponseVO> batchResponseVOList =
WorkflowConverter.INSTANCE.convertListToWorkflowBatchList(batchResponseDOList);
WorkflowConverter.INSTANCE.convertListToWorkflowBatchList(batchResponseDOList);
return new PageResult<>(pageDTO, batchResponseVOList);
}
@ -101,9 +103,9 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
public WorkflowDetailResponseVO getWorkflowBatchDetail(Long id) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, id)
.eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()));
new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, id)
.eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()));
if (Objects.isNull(workflowTaskBatch)) {
return null;
}
@ -113,22 +115,22 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.convert(workflow);
responseVO.setWorkflowBatchStatus(workflowTaskBatch.getTaskBatchStatus());
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
.eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
List<Job> jobs = jobMapper.selectList(
new LambdaQueryWrapper<Job>()
.in(Job::getId, StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)));
new LambdaQueryWrapper<Job>()
.in(Job::getId, StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId)));
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(
new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, id)
.orderByDesc(JobTaskBatch::getId));
new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, id)
.orderByDesc(JobTaskBatch::getId));
Map<Long, List<JobTaskBatch>> jobTaskBatchMap = StreamUtils.groupByKey(alJobTaskBatchList,
JobTaskBatch::getWorkflowNodeId);
JobTaskBatch::getWorkflowNodeId);
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.convertList(workflowNodes);
String flowInfo = workflowTaskBatch.getFlowInfo();
@ -136,49 +138,49 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
Set<Long> allNoOperationNode = Sets.newHashSet();
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> {
.peek(nodeInfo -> {
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
}
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
}
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
if (CollUtil.isNotEmpty(jobTaskBatchList)) {
jobTaskBatchList = jobTaskBatchList.stream()
.sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus))
.collect(Collectors.toList());
nodeInfo.setJobBatchList(
JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatchList));
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
if (CollUtil.isNotEmpty(jobTaskBatchList)) {
jobTaskBatchList = jobTaskBatchList.stream()
.sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus))
.collect(Collectors.toList());
nodeInfo.setJobBatchList(
JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatchList));
// 取第最新的一条状态
JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0);
if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason()
== jobTaskBatch.getOperationReason()) {
// 前端展示使用
nodeInfo.setTaskBatchStatus(WORKFLOW_DECISION_FAILED_STATUS);
} else {
nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus());
}
if (jobTaskBatchList.stream()
.filter(Objects::nonNull)
.anyMatch(WorkflowBatchServiceImpl::isNoOperation)) {
// 当前节点下面的所有节点都是无需处理的节点
Set<Long> allDescendants = MutableGraphCache.getAllDescendants(graph, nodeInfo.getId());
allNoOperationNode.addAll(allDescendants);
} else {
// 删除被误添加的节点
allNoOperationNode.remove(nodeInfo.getId());
}
// 取第最新的一条状态
JobTaskBatch jobTaskBatch = jobTaskBatchList.get(0);
if (JobOperationReasonEnum.WORKFLOW_DECISION_FAILED.getReason()
== jobTaskBatch.getOperationReason()) {
// 前端展示使用
nodeInfo.setTaskBatchStatus(WORKFLOW_DECISION_FAILED_STATUS);
} else {
nodeInfo.setTaskBatchStatus(jobTaskBatch.getTaskBatchStatus());
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(workflowTaskBatch.getTaskBatchStatus())) {
allNoOperationNode.add(nodeInfo.getId());
}
}
if (jobTaskBatchList.stream()
.filter(Objects::nonNull)
.anyMatch(WorkflowBatchServiceImpl::isNoOperation)) {
// 当前节点下面的所有节点都是无需处理的节点
Set<Long> allDescendants = MutableGraphCache.getAllDescendants(graph, nodeInfo.getId());
allNoOperationNode.addAll(allDescendants);
} else {
// 删除被误添加的节点
allNoOperationNode.remove(nodeInfo.getId());
}
} else {
if (JobTaskBatchStatusEnum.NOT_SUCCESS.contains(workflowTaskBatch.getTaskBatchStatus())) {
allNoOperationNode.add(nodeInfo.getId());
}
}
})
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, Function.identity()));
})
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, Function.identity()));
for (Long noOperationNodeId : allNoOperationNode) {
WorkflowDetailResponseVO.NodeInfo nodeInfo = workflowNodeMap.get(noOperationNodeId);
@ -186,10 +188,10 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
if (CollUtil.isNotEmpty(jobTaskBatches)) {
jobTaskBatches = jobTaskBatches.stream()
.sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus))
.collect(Collectors.toList());
.sorted(Comparator.comparingInt(JobTaskBatch::getTaskBatchStatus))
.collect(Collectors.toList());
nodeInfo.setJobBatchList(
JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatches));
JobBatchResponseVOConverter.INSTANCE.convertListToJobBatchList(jobTaskBatches));
} else {
JobBatchResponseVO jobBatchResponseVO = new JobBatchResponseVO();
JobTaskConfig jobTask = nodeInfo.getJobTask();
@ -207,7 +209,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
try {
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT,
new HashMap<>(), workflowNodeMap);
new HashMap<>(), workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
@ -226,4 +228,29 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
return Boolean.TRUE;
}
@Override
@Transactional
public Boolean deleteByIds(Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Assert.isTrue(ids.size() == workflowTaskBatchMapper.delete(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getNamespaceId, namespaceId)
.in(WorkflowTaskBatch::getId, ids)
), () -> new SnailJobServerException("删除工作流任务失败, 请检查任务状态是否关闭状态"));
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getNamespaceId, namespaceId)
.in(JobTaskBatch::getWorkflowTaskBatchId, ids));
if (CollUtil.isEmpty(jobTaskBatches)) {
return Boolean.TRUE;
}
Set<Long> jobTaskBatchIds = StreamUtils.toSet(jobTaskBatches, JobTaskBatch::getId);
jobHandler.deleteJobTaskBatchByIds(jobTaskBatchIds, namespaceId);
return Boolean.TRUE;
}
}

View File

@ -256,14 +256,6 @@ public class WorkflowServiceImpl implements WorkflowService {
return 1 == workflowMapper.updateById(workflow);
}
@Override
public Boolean deleteById(Long id) {
Workflow workflow = new Workflow();
workflow.setId(id);
workflow.setDeleted(StatusEnum.YES.getStatus());
return 1 == workflowMapper.updateById(workflow);
}
@Override
public Boolean trigger(Long id) {
Workflow workflow = workflowMapper.selectById(id);
@ -357,6 +349,20 @@ public class WorkflowServiceImpl implements WorkflowService {
return JsonUtil.toJsonString(resultList);
}
@Override
public Boolean deleteByIds(Set<Long> ids) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
Assert.isTrue(ids.size() == workflowMapper.delete(
new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getNamespaceId, namespaceId)
.eq(Workflow::getWorkflowStatus, StatusEnum.NO.getStatus())
.in(Workflow::getId, ids)
), () -> new SnailJobServerException("删除工作流任务失败, 请检查任务状态是否关闭状态"));
return Boolean.TRUE;
}
private void batchSaveWorkflowTask(final List<WorkflowRequestVO> workflowRequestVOList, final String namespaceId) {
Set<String> groupNameSet = StreamUtils.toSet(workflowRequestVOList, WorkflowRequestVO::getGroupName);