feat(sj_1.0.0): 工作流导入导出导入优化

This commit is contained in:
opensnail 2024-05-30 17:12:53 +08:00
parent b80da2d616
commit 553379b078
6 changed files with 171 additions and 115 deletions

View File

@ -7,6 +7,7 @@ import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.web.annotation.LoginRequired; import com.aizuda.snailjob.server.web.annotation.LoginRequired;
import com.aizuda.snailjob.server.web.annotation.RoleEnum; import com.aizuda.snailjob.server.web.annotation.RoleEnum;
import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.ExportWorkflowVO;
import com.aizuda.snailjob.server.web.model.request.WorkflowQueryVO; import com.aizuda.snailjob.server.web.model.request.WorkflowQueryVO;
import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO; import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO;
import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO; import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO;
@ -96,10 +97,6 @@ public class WorkflowController {
@LoginRequired @LoginRequired
@PostMapping(value = "/import", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) @PostMapping(value = "/import", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public void importScene(@RequestPart("file") MultipartFile file) throws IOException { public void importScene(@RequestPart("file") MultipartFile file) throws IOException {
if (file.isEmpty()) {
throw new SnailJobCommonException("请选择一个文件上传");
}
// 写入数据 // 写入数据
workflowService.importWorkflowTask(ImportUtils.parseList(file, WorkflowRequestVO.class)); workflowService.importWorkflowTask(ImportUtils.parseList(file, WorkflowRequestVO.class));
} }
@ -107,8 +104,8 @@ public class WorkflowController {
@LoginRequired @LoginRequired
@PostMapping("/export") @PostMapping("/export")
@OriginalControllerReturnValue @OriginalControllerReturnValue
public ResponseEntity<String> export(@RequestBody Set<Long> workflowIds) { public ResponseEntity<String> export(@RequestBody ExportWorkflowVO exportWorkflowVO) {
return ExportUtils.doExport(workflowService.exportWorkflowTask(workflowIds)); return ExportUtils.doExport(workflowService.exportWorkflowTask(exportWorkflowVO));
} }
} }

View File

@ -0,0 +1,22 @@
package com.aizuda.snailjob.server.web.model.request;
import lombok.Data;
import java.util.Set;
/**
* @author: opensnail
* @date : 2024-05-30
* @since : sj_1.0.0
*/
@Data
public class ExportWorkflowVO {
private Set<Long> workflowIds;
private String groupName;
private String workflowName;
private Integer workflowStatus;
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.web.service;
import cn.hutool.core.lang.Pair; import cn.hutool.core.lang.Pair;
import com.aizuda.snailjob.server.common.dto.DecisionConfig; import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.ExportWorkflowVO;
import com.aizuda.snailjob.server.web.model.request.WorkflowQueryVO; import com.aizuda.snailjob.server.web.model.request.WorkflowQueryVO;
import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO; import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO;
import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO; import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO;
@ -12,7 +13,6 @@ import jakarta.validation.constraints.NotEmpty;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -41,5 +41,6 @@ public interface WorkflowService {
void importWorkflowTask(@Valid @NotEmpty(message = "导入数据不能为空") List<WorkflowRequestVO> requests); void importWorkflowTask(@Valid @NotEmpty(message = "导入数据不能为空") List<WorkflowRequestVO> requests);
String exportWorkflowTask(Set<Long> workflowIds); String exportWorkflowTask(ExportWorkflowVO exportWorkflowVO
);
} }

View File

@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.config.SystemProperties; import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.DecisionConfig; import com.aizuda.snailjob.server.common.dto.DecisionConfig;
import com.aizuda.snailjob.server.common.dto.JobTaskConfig; import com.aizuda.snailjob.server.common.dto.JobTaskConfig;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum; import com.aizuda.snailjob.server.common.enums.ExpressionTypeEnum;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum; import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
@ -23,12 +24,13 @@ import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.CronUtils; import com.aizuda.snailjob.server.common.util.CronUtils;
import com.aizuda.snailjob.server.common.util.DateUtils; import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.GraphUtils; import com.aizuda.snailjob.server.common.util.GraphUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO; import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler; import com.aizuda.snailjob.server.job.task.support.WorkflowPrePareHandler;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter; import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler; import com.aizuda.snailjob.server.job.task.support.expression.ExpressionInvocationHandler;
import com.aizuda.snailjob.server.web.model.base.PageResult; import com.aizuda.snailjob.server.web.model.base.PageResult;
import com.aizuda.snailjob.server.web.model.request.SceneConfigRequestVO; import com.aizuda.snailjob.server.web.model.request.ExportWorkflowVO;
import com.aizuda.snailjob.server.web.model.request.UserSessionVO; import com.aizuda.snailjob.server.web.model.request.UserSessionVO;
import com.aizuda.snailjob.server.web.model.request.WorkflowQueryVO; import com.aizuda.snailjob.server.web.model.request.WorkflowQueryVO;
import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO; import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO;
@ -36,12 +38,10 @@ import com.aizuda.snailjob.server.web.model.request.WorkflowRequestVO.NodeConfig
import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO; import com.aizuda.snailjob.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.snailjob.server.web.model.response.WorkflowResponseVO; import com.aizuda.snailjob.server.web.model.response.WorkflowResponseVO;
import com.aizuda.snailjob.server.web.service.WorkflowService; import com.aizuda.snailjob.server.web.service.WorkflowService;
import com.aizuda.snailjob.server.web.service.convert.SceneConfigConverter;
import com.aizuda.snailjob.server.web.service.convert.WorkflowConverter; import com.aizuda.snailjob.server.web.service.convert.WorkflowConverter;
import com.aizuda.snailjob.server.web.service.handler.WorkflowHandler; import com.aizuda.snailjob.server.web.service.handler.WorkflowHandler;
import com.aizuda.snailjob.server.web.util.UserSessionUtils; import com.aizuda.snailjob.server.web.util.UserSessionUtils;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate; import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.access.ConfigAccess;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper;
@ -53,13 +53,15 @@ import com.google.common.collect.Sets;
import com.google.common.graph.ElementOrder; import com.google.common.graph.ElementOrder;
import com.google.common.graph.GraphBuilder; import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph; import com.google.common.graph.MutableGraph;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -95,8 +97,8 @@ public class WorkflowServiceImpl implements WorkflowService {
private static void checkExecuteInterval(WorkflowRequestVO requestVO) { private static void checkExecuteInterval(WorkflowRequestVO requestVO) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(), if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(),
WaitStrategies.WaitStrategyEnum.RANDOM.getType()) WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getTriggerType())) { .contains(requestVO.getTriggerType())) {
if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) { if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) {
throw new SnailJobServerException("触发间隔不得小于10"); throw new SnailJobServerException("触发间隔不得小于10");
} }
@ -122,8 +124,8 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli())); workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(StrUtil.EMPTY); workflow.setFlowInfo(StrUtil.EMPTY);
workflow.setBucketIndex( workflow.setBucketIndex(
HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName()) HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
% systemProperties.getBucketTotal()); % systemProperties.getBucketTotal());
workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId()); workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new SnailJobServerException("新增工作流失败")); Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new SnailJobServerException("新增工作流失败"));
@ -132,10 +134,10 @@ public class WorkflowServiceImpl implements WorkflowService {
// 递归构建图 // 递归构建图
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT),
new LinkedBlockingDeque<>(), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflowRequestVO.getGroupName(),
workflow.getId(), nodeConfig, graph, workflow.getId(), nodeConfig, graph,
workflow.getVersion()); workflow.getVersion());
log.info("图构建完成. graph:[{}]", graph); log.info("图构建完成. graph:[{}]", graph);
// 保存图信息 // 保存图信息
@ -147,61 +149,25 @@ public class WorkflowServiceImpl implements WorkflowService {
private MutableGraph<Long> createGraph() { private MutableGraph<Long> createGraph() {
return GraphBuilder.directed() return GraphBuilder.directed()
.nodeOrder(ElementOrder.sorted(Long::compare)) .nodeOrder(ElementOrder.sorted(Long::compare))
.incidentEdgeOrder(ElementOrder.stable()) .incidentEdgeOrder(ElementOrder.stable())
.allowsSelfLoops(false) .allowsSelfLoops(false)
.build(); .build();
} }
@Override @Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) { public WorkflowDetailResponseVO getWorkflowDetail(Long id) {
Workflow workflow = workflowMapper.selectOne( Workflow workflow = workflowMapper.selectOne(
new LambdaQueryWrapper<Workflow>() new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, id) .eq(Workflow::getId, id)
.eq(Workflow::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()) .eq(Workflow::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
); );
if (Objects.isNull(workflow)) { if (Objects.isNull(workflow)) {
return null; return null;
} }
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.convert(workflow); return doGetWorkflowDetail(workflow);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getVersion, workflow.getVersion())
.eq(WorkflowNode::getWorkflowId, id)
.orderByAsc(WorkflowNode::getPriorityLevel));
List<Long> jobIds = StreamUtils.toList(workflowNodes, WorkflowNode::getJobId);
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
.in(Job::getId, new HashSet<>(jobIds)));
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.convertList(workflowNodes);
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> {
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
}
}).collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
String flowInfo = workflow.getFlowInfo();
try {
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT,
new HashMap<>(),
workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
throw new SnailJobServerException("查询工作流详情失败");
}
return responseVO;
} }
@Override @Override
@ -210,15 +176,18 @@ public class WorkflowServiceImpl implements WorkflowService {
UserSessionVO userSessionVO = UserSessionUtils.currentUserSession(); UserSessionVO userSessionVO = UserSessionUtils.currentUserSession();
PageDTO<Workflow> page = workflowMapper.selectPage(pageDTO, PageDTO<Workflow> page = workflowMapper.selectPage(pageDTO,
new LambdaQueryWrapper<Workflow>() new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus()) .eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
.eq(Workflow::getNamespaceId, userSessionVO.getNamespaceId()) .eq(Workflow::getNamespaceId, userSessionVO.getNamespaceId())
.eq(StrUtil.isNotBlank(queryVO.getGroupName()), Workflow::getGroupName, queryVO.getGroupName()) .eq(StrUtil.isNotBlank(queryVO.getGroupName()), Workflow::getGroupName, queryVO.getGroupName())
.like(StrUtil.isNotBlank(queryVO.getWorkflowName()), Workflow::getWorkflowName, queryVO.getWorkflowName()) .like(StrUtil.isNotBlank(queryVO.getWorkflowName()), Workflow::getWorkflowName,
.eq(Objects.nonNull(queryVO.getWorkflowStatus()), Workflow::getWorkflowStatus, queryVO.getWorkflowStatus()) queryVO.getWorkflowName())
.orderByDesc(Workflow::getId)); .eq(Objects.nonNull(queryVO.getWorkflowStatus()), Workflow::getWorkflowStatus,
queryVO.getWorkflowStatus())
.orderByDesc(Workflow::getId));
List<WorkflowResponseVO> jobResponseList = WorkflowConverter.INSTANCE.convertListToWorkflowList(page.getRecords()); List<WorkflowResponseVO> jobResponseList = WorkflowConverter.INSTANCE.convertListToWorkflowList(
page.getRecords());
return new PageResult<>(pageDTO, jobResponseList); return new PageResult<>(pageDTO, jobResponseList);
} }
@ -243,7 +212,7 @@ public class WorkflowServiceImpl implements WorkflowService {
int version = workflow.getVersion(); int version = workflow.getVersion();
// 递归构建图 // 递归构建图
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(), workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1); workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1);
log.info("图构建完成. graph:[{}]", graph); log.info("图构建完成. graph:[{}]", graph);
@ -254,11 +223,11 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli())); workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph))); workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue( Assert.isTrue(
workflowMapper.update(workflow, workflowMapper.update(workflow,
new LambdaQueryWrapper<Workflow>() new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflow.getId()) .eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, version)) > 0, .eq(Workflow::getVersion, version)) > 0,
() -> new SnailJobServerException("更新失败")); () -> new SnailJobServerException("更新失败"));
return Boolean.TRUE; return Boolean.TRUE;
} }
@ -266,9 +235,9 @@ public class WorkflowServiceImpl implements WorkflowService {
@Override @Override
public Boolean updateStatus(Long id) { public Boolean updateStatus(Long id) {
Workflow workflow = workflowMapper.selectOne( Workflow workflow = workflowMapper.selectOne(
new LambdaQueryWrapper<Workflow>() new LambdaQueryWrapper<Workflow>()
.select(Workflow::getId, Workflow::getWorkflowStatus) .select(Workflow::getId, Workflow::getWorkflowStatus)
.eq(Workflow::getId, id)); .eq(Workflow::getId, id));
Assert.notNull(workflow, () -> new SnailJobServerException("工作流不存在")); Assert.notNull(workflow, () -> new SnailJobServerException("工作流不存在"));
if (Objects.equals(workflow.getWorkflowStatus(), StatusEnum.NO.getStatus())) { if (Objects.equals(workflow.getWorkflowStatus(), StatusEnum.NO.getStatus())) {
@ -294,13 +263,14 @@ public class WorkflowServiceImpl implements WorkflowService {
Assert.notNull(workflow, () -> new SnailJobServerException("workflow can not be null.")); Assert.notNull(workflow, () -> new SnailJobServerException("workflow can not be null."));
long count = accessTemplate.getGroupConfigAccess().count( long count = accessTemplate.getGroupConfigAccess().count(
new LambdaQueryWrapper<GroupConfig>() new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, workflow.getGroupName()) .eq(GroupConfig::getGroupName, workflow.getGroupName())
.eq(GroupConfig::getNamespaceId, workflow.getNamespaceId()) .eq(GroupConfig::getNamespaceId, workflow.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
); );
Assert.isTrue(count > 0, () -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName())); Assert.isTrue(count > 0,
() -> new SnailJobServerException("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName()));
WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow); WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
// 设置now表示立即执行 // 设置now表示立即执行
@ -315,13 +285,13 @@ public class WorkflowServiceImpl implements WorkflowService {
@Override @Override
public List<WorkflowResponseVO> getWorkflowNameList(String keywords, Long workflowId) { public List<WorkflowResponseVO> getWorkflowNameList(String keywords, Long workflowId) {
PageDTO<Workflow> selectPage = workflowMapper.selectPage( PageDTO<Workflow> selectPage = workflowMapper.selectPage(
new PageDTO<>(1, 20), new PageDTO<>(1, 20),
new LambdaQueryWrapper<Workflow>() new LambdaQueryWrapper<Workflow>()
.select(Workflow::getId, Workflow::getWorkflowName) .select(Workflow::getId, Workflow::getWorkflowName)
.likeRight(StrUtil.isNotBlank(keywords), Workflow::getWorkflowName, StrUtil.trim(keywords)) .likeRight(StrUtil.isNotBlank(keywords), Workflow::getWorkflowName, StrUtil.trim(keywords))
.eq(Objects.nonNull(workflowId), Workflow::getId, workflowId) .eq(Objects.nonNull(workflowId), Workflow::getId, workflowId)
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus()) .eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
.orderByAsc(Workflow::getId)); .orderByAsc(Workflow::getId));
return WorkflowConverter.INSTANCE.convertListToWorkflowList(selectPage.getRecords()); return WorkflowConverter.INSTANCE.convertListToWorkflowList(selectPage.getRecords());
} }
@ -349,33 +319,48 @@ public class WorkflowServiceImpl implements WorkflowService {
} }
@Override @Override
public String exportWorkflowTask(Set<Long> workflowIds) { public String exportWorkflowTask(ExportWorkflowVO exportVO) {
List<Workflow> workflowList = workflowMapper.selectList(new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
// TODO 若导出全部需要分页查询避免一次拉取太多数据
.in(CollUtil.isNotEmpty(workflowIds), Workflow::getId, workflowIds)
);
List<WorkflowDetailResponseVO> workflowDetailResponseVOList = workflowList.stream().map(i -> getWorkflowDetail(i.getId())).collect(Collectors.toList()); List<WorkflowDetailResponseVO> resultList = new ArrayList<>();
return JsonUtil.toJsonString(workflowDetailResponseVOList); PartitionTaskUtils.process(startId -> {
List<Workflow> workflowList = workflowMapper.selectPage(new PageDTO<>(0, 100),
new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
.eq(Workflow::getDeleted, StatusEnum.NO.getStatus())
.eq(StrUtil.isNotBlank(exportVO.getGroupName()), Workflow::getGroupName, exportVO.getGroupName())
.eq(Objects.nonNull(exportVO.getWorkflowStatus()), Workflow::getWorkflowStatus, exportVO.getWorkflowStatus())
.likeRight(StrUtil.isNotBlank(exportVO.getWorkflowName()), Workflow::getWorkflowName, exportVO.getWorkflowName())
.in(CollUtil.isNotEmpty(exportVO.getWorkflowIds()), Workflow::getId, exportVO.getWorkflowIds())
.ge(Workflow::getId, startId)
.orderByAsc(Workflow::getId)
).getRecords();
return workflowList.stream()
.map(this::doGetWorkflowDetail)
.map(WorkflowPartitionTask::new)
.collect(Collectors.toList());
}, partitionTasks -> {
List<WorkflowPartitionTask> workflowPartitionTasks = (List<WorkflowPartitionTask>) partitionTasks;
resultList.addAll(StreamUtils.toList(workflowPartitionTasks, WorkflowPartitionTask::getResponseVO));
}, 0);
return JsonUtil.toJsonString(resultList);
} }
private void batchSaveWorkflowTask(final List<WorkflowRequestVO> workflowRequestVOList, final String namespaceId) { private void batchSaveWorkflowTask(final List<WorkflowRequestVO> workflowRequestVOList, final String namespaceId) {
Set<String> groupNameSet = workflowRequestVOList.stream().map(i -> i.getGroupName()).collect(Collectors.toSet()); Set<String> groupNameSet =StreamUtils.toSet(workflowRequestVOList, WorkflowRequestVO::getGroupName);
List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess() List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess()
.list(new LambdaQueryWrapper<GroupConfig>() .list(new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName) .select(GroupConfig::getGroupName)
.eq(GroupConfig::getNamespaceId, namespaceId) .eq(GroupConfig::getNamespaceId, namespaceId)
.in(GroupConfig::getGroupName, groupNameSet) .in(GroupConfig::getGroupName, groupNameSet)
); );
Sets.SetView<String> notExistedGroupNameSet = Sets.difference(groupNameSet, Sets.SetView<String> notExistedGroupNameSet = Sets.difference(groupNameSet,
StreamUtils.toSet(groupConfigs, GroupConfig::getGroupName)); StreamUtils.toSet(groupConfigs, GroupConfig::getGroupName));
Assert.isTrue(CollUtil.isEmpty(notExistedGroupNameSet), Assert.isTrue(CollUtil.isEmpty(notExistedGroupNameSet),
() -> new SnailJobServerException("导入失败. 原因: 组{}不存在", notExistedGroupNameSet)); () -> new SnailJobServerException("导入失败. 原因: 组{}不存在", notExistedGroupNameSet));
for (final WorkflowRequestVO workflowRequestVO : workflowRequestVOList) { for (final WorkflowRequestVO workflowRequestVO : workflowRequestVOList) {
checkExecuteInterval(workflowRequestVO); checkExecuteInterval(workflowRequestVO);
@ -383,4 +368,54 @@ public class WorkflowServiceImpl implements WorkflowService {
saveWorkflow(workflowRequestVO); saveWorkflow(workflowRequestVO);
} }
} }
private WorkflowDetailResponseVO doGetWorkflowDetail(final Workflow workflow) {
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.convert(workflow);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getVersion, workflow.getVersion())
.eq(WorkflowNode::getWorkflowId, workflow.getId())
.orderByAsc(WorkflowNode::getPriorityLevel));
List<Long> jobIds = StreamUtils.toList(workflowNodes, WorkflowNode::getJobId);
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
.in(Job::getId, new HashSet<>(jobIds)));
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.convertList(workflowNodes);
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> {
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
}
}).collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
String flowInfo = workflow.getFlowInfo();
try {
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT,
new HashMap<>(),
workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
throw new SnailJobServerException("查询工作流详情失败");
}
return responseVO;
}
@EqualsAndHashCode(callSuper = true)
@Getter
private static class WorkflowPartitionTask extends PartitionTask {
private final WorkflowDetailResponseVO responseVO;
public WorkflowPartitionTask(@NotNull WorkflowDetailResponseVO responseVO) {
this.responseVO = responseVO;
setId(responseVO.getId());
}
}
} }

View File

@ -6,7 +6,9 @@ import org.springframework.http.ResponseEntity;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
public class ExportUtils {
public final class ExportUtils {
private ExportUtils() {}
public static ResponseEntity<String> doExport(String json) { public static ResponseEntity<String> doExport(String json) {
HttpHeaders headers = new HttpHeaders(); HttpHeaders headers = new HttpHeaders();

View File

@ -10,9 +10,10 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
public class ImportUtils { public final class ImportUtils {
private static final List<String> FILE_EXTENSIONS = List.of("json"); private static final List<String> FILE_EXTENSIONS = List.of("json");
private ImportUtils() {}
public static @NotNull <VO> List<VO> parseList(MultipartFile file, Class<VO> clazz) throws IOException { public static @NotNull <VO> List<VO> parseList(MultipartFile file, Class<VO> clazz) throws IOException {
if (file.isEmpty()) { if (file.isEmpty()) {
@ -26,9 +27,7 @@ public class ImportUtils {
} }
JsonNode node = JsonUtil.toJson(file.getBytes()); JsonNode node = JsonUtil.toJson(file.getBytes());
List<VO> requestList = JsonUtil.parseList(JsonUtil.toJsonString(node), clazz); return JsonUtil.parseList(JsonUtil.toJsonString(node), clazz);
return requestList;
} }
} }