工作流版本管理 up
This commit is contained in:
parent
3f68c1cba8
commit
cff3edb710
@ -2,8 +2,10 @@ package com.aizuda.snailjob.template.datasource.persistence.mapper;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import java.util.List;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
|
||||
/**
|
||||
* @author zjw
|
||||
@ -12,4 +14,12 @@ import org.apache.ibatis.annotations.Mapper;
|
||||
*/
|
||||
@Mapper
|
||||
public interface WorkflowHistoryMapper extends BaseMapper<WorkflowHistory> {
|
||||
List<WorkflowHistory> selectVersionList(@Param("workflowId") Long workflowId);
|
||||
|
||||
/**
|
||||
* 查询指定workflowId的最大版本号
|
||||
*/
|
||||
@Select("SELECT MAX(version) FROM sj_workflow_history WHERE workflow_id = #{workflowId}")
|
||||
Integer selectMaxVersionByWorkflowId(@Param("workflowId") Long workflowId);
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,8 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.mapper;
|
||||
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowNodeHistory;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
|
||||
|
||||
public interface WorkflowNodeHistoryMapper extends BaseMapper<WorkflowNodeHistory> {
|
||||
}
|
||||
@ -111,4 +111,8 @@ public class Workflow extends CreateUpdateDt {
|
||||
* 通知告警场景配置id列表
|
||||
*/
|
||||
private String notifyIds;
|
||||
/**
|
||||
* 负责人id
|
||||
*/
|
||||
private Long ownerId;
|
||||
}
|
||||
|
||||
@ -17,96 +17,18 @@ import lombok.EqualsAndHashCode;
|
||||
@Data
|
||||
@TableName("sj_workflow_history")
|
||||
@EqualsAndHashCode(callSuper=true)
|
||||
public class WorkflowHistory extends CreateUpdateDt{
|
||||
public class WorkflowHistory extends Workflow{
|
||||
|
||||
/**
|
||||
* 主键
|
||||
* 原工作流ID
|
||||
*/
|
||||
@TableId(value = "id")
|
||||
private Long id;
|
||||
|
||||
private Long workflowId;
|
||||
/**
|
||||
* 工作流名称
|
||||
* // 版本标签
|
||||
*/
|
||||
private String workflowName;
|
||||
|
||||
private String versionTag;
|
||||
/**
|
||||
* 命名空间id
|
||||
*/
|
||||
private String namespaceId;
|
||||
|
||||
/**
|
||||
* 组名称
|
||||
*/
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 触发类型
|
||||
*/
|
||||
private Integer triggerType;
|
||||
|
||||
/**
|
||||
* 阻塞策略
|
||||
*/
|
||||
private Integer blockStrategy;
|
||||
|
||||
/**
|
||||
* 触发间隔
|
||||
*/
|
||||
private String triggerInterval;
|
||||
|
||||
/**
|
||||
* 执行超时时间
|
||||
*/
|
||||
private Integer executorTimeout;
|
||||
|
||||
/**
|
||||
* 工作流状态 0、关闭、1、开启
|
||||
*/
|
||||
private Integer workflowStatus;
|
||||
|
||||
/**
|
||||
* 任务执行时间
|
||||
*/
|
||||
private Long nextTriggerAt;
|
||||
|
||||
/**
|
||||
* 流程信息
|
||||
*/
|
||||
private String flowInfo;
|
||||
|
||||
/**
|
||||
* bucket
|
||||
*/
|
||||
private Integer bucketIndex;
|
||||
|
||||
/**
|
||||
* 描述
|
||||
*/
|
||||
private String description;
|
||||
|
||||
/**
|
||||
* 工作流上下文
|
||||
*/
|
||||
private String wfContext;
|
||||
|
||||
/**
|
||||
* 版本号
|
||||
*/
|
||||
private Integer version;
|
||||
|
||||
/**
|
||||
* 扩展字段
|
||||
*/
|
||||
private String extAttrs;
|
||||
|
||||
/**
|
||||
* 逻辑删除 1、删除
|
||||
*/
|
||||
private Integer deleted;
|
||||
|
||||
/**
|
||||
* 通知告警场景配置id列表
|
||||
*/
|
||||
private String notifyIds;
|
||||
是否当前版本
|
||||
* */
|
||||
private Integer isCurrent;
|
||||
}
|
||||
|
||||
@ -0,0 +1,14 @@
|
||||
package com.aizuda.snailjob.template.datasource.persistence.po;
|
||||
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
@Data
|
||||
@TableName("sj_workflow_node_history")
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class WorkflowNodeHistory extends WorkflowNode {
|
||||
private Long workflowId;
|
||||
private String versionTag;
|
||||
}
|
||||
@ -142,4 +142,12 @@ public class WorkflowRequestVO {
|
||||
* 通知告警场景配置id列表
|
||||
*/
|
||||
private Set<Long> notifyIds;
|
||||
|
||||
/**
|
||||
* 版本标签
|
||||
*/
|
||||
private String versionTag;
|
||||
|
||||
private Boolean saveAsNewVersion; // 是否保存为新版本
|
||||
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ import com.aizuda.snailjob.server.web.model.base.PageResult;
|
||||
import com.aizuda.snailjob.server.web.model.request.*;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
|
||||
import com.aizuda.snailjob.server.web.model.response.WorkflowVersionVO;
|
||||
import com.aizuda.snailjob.server.web.service.WorkflowService;
|
||||
import com.aizuda.snailjob.server.web.util.ExportUtils;
|
||||
import com.aizuda.snailjob.server.web.util.ImportUtils;
|
||||
@ -78,6 +79,39 @@ public class WorkflowController {
|
||||
return workflowService.deleteHistoryById(id,version);
|
||||
}
|
||||
|
||||
//20251124 新的版本管理请求
|
||||
/**
|
||||
* 获取工作流版本列表
|
||||
*/
|
||||
@GetMapping("/{workflowId}/versions")
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public List<WorkflowVersionVO> getVersionList(@PathVariable Long workflowId) {
|
||||
return workflowService.getVersionList(workflowId);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 切换到指定版本
|
||||
*/
|
||||
@PutMapping("/{workflowId}/switch-version")
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public Boolean switchVersion(
|
||||
@PathVariable Long workflowId,
|
||||
@RequestBody SwitchVersionRequest request) {
|
||||
return workflowService.switchVersion(workflowId, request.getVersionTag());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定版本的工作流详情
|
||||
*/
|
||||
@GetMapping("/{workflowId}/versions/{versionTag}")
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
public WorkflowDetailResponseVO getVersionDetail(
|
||||
@PathVariable Long workflowId,
|
||||
@PathVariable String versionTag) {
|
||||
return workflowService.getVersionDetail(workflowId, versionTag);
|
||||
}
|
||||
|
||||
|
||||
@PutMapping
|
||||
@LoginRequired(role = RoleEnum.USER)
|
||||
|
||||
@ -0,0 +1,10 @@
|
||||
package com.aizuda.snailjob.server.web.model.request;
|
||||
|
||||
import lombok.Data;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
|
||||
@Data
|
||||
public class SwitchVersionRequest {
|
||||
@NotBlank(message = "版本标签不能为空")
|
||||
private String versionTag;
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package com.aizuda.snailjob.server.web.model.response;
|
||||
|
||||
import lombok.Data;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
public class WorkflowVersionVO {
|
||||
private String versionTag;
|
||||
private Integer version;
|
||||
private Integer isCurrent;
|
||||
private LocalDateTime createDt;
|
||||
}
|
||||
@ -6,6 +6,7 @@ import com.aizuda.snailjob.server.web.model.base.PageResult;
|
||||
import com.aizuda.snailjob.server.web.model.request.*;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
|
||||
import com.aizuda.snailjob.server.web.model.response.WorkflowVersionVO;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowHistory;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
@ -49,4 +50,21 @@ public interface WorkflowService {
|
||||
WorkflowDetailResponseVO getWorkflowHistoryDetail(Long id, String version);
|
||||
|
||||
Boolean deleteHistoryById(Long id, String version);
|
||||
|
||||
/**
|
||||
* 获取工作流的所有版本列表
|
||||
*/
|
||||
List<WorkflowVersionVO> getVersionList(Long workflowId);
|
||||
|
||||
/**
|
||||
* 切换到指定版本
|
||||
*/
|
||||
Boolean switchVersion(Long workflowId, String versionTag);
|
||||
|
||||
/**
|
||||
* 获取指定版本的工作流详情
|
||||
*/
|
||||
|
||||
WorkflowDetailResponseVO getVersionDetail(Long workflowId, String versionTag);
|
||||
|
||||
}
|
||||
|
||||
@ -35,6 +35,7 @@ import com.aizuda.snailjob.server.common.vo.request.WorkflowRequestVO.NodeConfig
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowDetailResponseVO;
|
||||
import com.aizuda.snailjob.server.common.vo.WorkflowResponseVO;
|
||||
import com.aizuda.snailjob.server.web.model.response.GroupConfigResponseVO;
|
||||
import com.aizuda.snailjob.server.web.model.response.WorkflowVersionVO;
|
||||
import com.aizuda.snailjob.server.web.service.GroupConfigService;
|
||||
import com.aizuda.snailjob.server.web.service.WorkflowService;
|
||||
import com.aizuda.snailjob.server.common.convert.WorkflowConverter;
|
||||
@ -45,6 +46,7 @@ import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.mapper.*;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.*;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.graph.ElementOrder;
|
||||
@ -60,8 +62,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.stream.Collectors;
|
||||
@ -78,7 +78,6 @@ import java.util.stream.Collectors;
|
||||
public class WorkflowServiceImpl implements WorkflowService {
|
||||
|
||||
private final WorkflowMapper workflowMapper;
|
||||
private final WorkflowHistoryMapper workflowHistoryMapper;
|
||||
private final WorkflowNodeMapper workflowNodeMapper;
|
||||
private final SystemProperties systemProperties;
|
||||
private final WorkflowHandler workflowHandler;
|
||||
@ -88,6 +87,10 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
private final GroupHandler groupHandler;
|
||||
private final JobSummaryMapper jobSummaryMapper;
|
||||
private final GroupConfigService groupConfigService;
|
||||
|
||||
//202511024 add
|
||||
private final WorkflowHistoryMapper workflowHistoryMapper;
|
||||
private final WorkflowNodeHistoryMapper workflowNodeHistoryMapper;
|
||||
@Autowired
|
||||
private VariableReplacementUtil variableReplacementUtil;
|
||||
private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
|
||||
@ -151,13 +154,8 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
|
||||
Assert.isTrue(1 == workflowMapper.updateById(workflow), () -> new SnailJobServerException("Failed to save workflow graph"));
|
||||
|
||||
//准备数据到履历表add 20250519
|
||||
WorkflowHistory history = new WorkflowHistory();
|
||||
Workflow workflow1 = workflowMapper.selectById(workflow.getId());
|
||||
BeanUtils.copyProperties(workflow1, history);
|
||||
|
||||
Assert.isTrue(1 == workflowHistoryMapper.insert(history), () -> new SnailJobServerException("Failed to save workflowHistory graph"));
|
||||
|
||||
//20251124 历史版本修改
|
||||
saveToHistory(workflow, workflowRequestVO.getVersionTag(),1,true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -239,10 +237,35 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
// 获取DAG节点配置
|
||||
NodeConfig nodeConfig = workflowRequestVO.getNodeConfig();
|
||||
|
||||
//取履历表中 最大得版本号 + 1
|
||||
int version = workflow.getVersion();
|
||||
int version_max = workflowHistoryMapper.selectMaxVersionByWorkflowId(workflow.getId());
|
||||
|
||||
//node表版本号 当前版本
|
||||
int node_version = version;
|
||||
// 判断是否保存为新版本
|
||||
boolean saveAsNewVersion = Optional.ofNullable(workflowRequestVO.getSaveAsNewVersion()).orElse(false);
|
||||
//当前版本
|
||||
if (!saveAsNewVersion) {
|
||||
// 保存当前版本到历史表
|
||||
saveToHistory(workflow, null, version, false);
|
||||
}else{//新版本
|
||||
///先把version更成最大版本 号
|
||||
node_version = version_max + 1;
|
||||
version = version_max;
|
||||
LambdaUpdateWrapper<Workflow> updateWrapper = new LambdaUpdateWrapper<>();
|
||||
updateWrapper.eq(Workflow::getId, workflow.getId());
|
||||
updateWrapper.eq(Workflow::getVersion, workflow.getVersion());
|
||||
updateWrapper.set(Workflow::getVersion, version_max);
|
||||
Assert.isTrue(workflowMapper.update(null, updateWrapper) > 0,
|
||||
() -> new SnailJobServerException("版本更新失败"));
|
||||
workflow.setVersion(version_max);
|
||||
}
|
||||
|
||||
|
||||
// 递归构建图
|
||||
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
|
||||
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1);
|
||||
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, node_version);
|
||||
|
||||
log.info("Graph construction complete. graph:[{}]", graph);
|
||||
|
||||
@ -255,7 +278,13 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
// 保存图信息
|
||||
workflow = WorkflowConverter.INSTANCE.convert(workflowRequestVO);
|
||||
workflow.setId(workflowRequestVO.getId());
|
||||
workflow.setVersion(version);
|
||||
// workflow.setVersion(version);
|
||||
// 关键:如果保存到当前版本,将 version 设为 null
|
||||
if (!saveAsNewVersion) {
|
||||
workflow.setVersion(null); // 不更新 version 字段
|
||||
}else{
|
||||
workflow.setVersion(version_max);
|
||||
}
|
||||
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
|
||||
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
|
||||
// 不允许更新组
|
||||
@ -269,11 +298,18 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
|
||||
//工作流表更新后插入履历表
|
||||
//准备数据到履历表add 20250520
|
||||
WorkflowHistory history = new WorkflowHistory();
|
||||
Workflow workflow1 = workflowMapper.selectById(workflow.getId());
|
||||
BeanUtils.copyProperties(workflow1, history);
|
||||
history.setCreateDt(LocalDateTime.now());
|
||||
Assert.isTrue(1 == workflowHistoryMapper.insert(history), () -> new SnailJobServerException("Failed to save workflowHistory graph"));
|
||||
// WorkflowHistory history = new WorkflowHistory();
|
||||
// Workflow workflow1 = workflowMapper.selectById(workflow.getId());
|
||||
// BeanUtils.copyProperties(workflow1, history);
|
||||
// history.setCreateDt(LocalDateTime.now());
|
||||
//20251124 add
|
||||
// 如果是保存新版本,保存到历史表
|
||||
if (saveAsNewVersion) {
|
||||
// saveToHistory(workflow, workflowRequestVO.getVersionTag(), version + 1);
|
||||
Workflow updatedWorkflow = workflowMapper.selectById(workflowRequestVO.getId());
|
||||
saveToHistory(updatedWorkflow, workflowRequestVO.getVersionTag(),updatedWorkflow.getVersion(), true);
|
||||
}
|
||||
|
||||
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
@ -520,4 +556,264 @@ public class WorkflowServiceImpl implements WorkflowService {
|
||||
setId(responseVO.getId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Boolean switchVersion(Long workflowId, String versionTag) {
|
||||
// 1. 查询目标历史版本
|
||||
WorkflowHistory history = workflowHistoryMapper.selectOne(
|
||||
new LambdaQueryWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflowId)
|
||||
.eq(WorkflowHistory::getVersionTag, versionTag)
|
||||
);
|
||||
|
||||
Assert.notNull(history, () -> new SnailJobServerException("版本不存在"));
|
||||
|
||||
// 2. 获取当前工作流
|
||||
Workflow currentWorkflow = workflowMapper.selectById(workflowId);
|
||||
int currentVersion = currentWorkflow.getVersion();
|
||||
|
||||
// 3. 先保存当前版本到历史(如果还没保存)
|
||||
// String currentVersionTag = "v" + currentVersion;
|
||||
// Long existCount = workflowHistoryMapper.selectCount(
|
||||
// new LambdaQueryWrapper<WorkflowHistory>()
|
||||
// .eq(WorkflowHistory::getWorkflowId, workflowId)
|
||||
// .eq(WorkflowHistory::getVersionTag, currentVersionTag)
|
||||
// );
|
||||
//
|
||||
// if (existCount == 0) {
|
||||
// saveToHistory(currentWorkflow, currentVersionTag,currentVersion);
|
||||
// }
|
||||
|
||||
// 4. 复制历史版本的完整数据到主表
|
||||
Workflow workflow = new Workflow();
|
||||
BeanUtils.copyProperties(history, workflow);
|
||||
|
||||
// 关键: 保持ID不变,使用乐观锁更新
|
||||
workflow.setId(workflowId);
|
||||
workflow.setVersion(currentVersion); // 使用当前版本号作为乐观锁条件
|
||||
|
||||
// 5. 使用乐观锁更新工作流主表
|
||||
LambdaUpdateWrapper<Workflow> updateWrapper = new LambdaUpdateWrapper<>();
|
||||
updateWrapper.eq(Workflow::getId, workflowId);
|
||||
updateWrapper.eq(Workflow::getVersion, currentVersion);
|
||||
|
||||
// 设置要更新的字段(包括所有关键字段)
|
||||
updateWrapper.set(Workflow::getWorkflowName, history.getWorkflowName());
|
||||
updateWrapper.set(Workflow::getTriggerType, history.getTriggerType());
|
||||
updateWrapper.set(Workflow::getTriggerInterval, history.getTriggerInterval());
|
||||
updateWrapper.set(Workflow::getBlockStrategy, history.getBlockStrategy());
|
||||
updateWrapper.set(Workflow::getExecutorTimeout, history.getExecutorTimeout());
|
||||
updateWrapper.set(Workflow::getDescription, history.getDescription());
|
||||
updateWrapper.set(Workflow::getFlowInfo, history.getFlowInfo()); // 关键: 恢复流程图
|
||||
updateWrapper.set(Workflow::getWfContext, history.getWfContext()); // 关键: 恢复上下文
|
||||
updateWrapper.set(Workflow::getNotifyIds, history.getNotifyIds());
|
||||
updateWrapper.set(Workflow::getOwnerId, history.getOwnerId());
|
||||
updateWrapper.set(Workflow::getExtAttrs, history.getExtAttrs());
|
||||
updateWrapper.set(Workflow::getVersion, history.getVersion());
|
||||
|
||||
Assert.isTrue(workflowMapper.update(null, updateWrapper) > 0,
|
||||
() -> new SnailJobServerException("版本切换失败"));
|
||||
|
||||
// 6. 恢复节点数据(使用当前版本号+1作为新节点的版本)
|
||||
restoreNodesFromHistory(workflowId, versionTag,currentVersion, history.getVersion());
|
||||
|
||||
// 7. 更新历史表的当前版本标记
|
||||
workflowHistoryMapper.update(null,
|
||||
new LambdaUpdateWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflowId)
|
||||
.set(WorkflowHistory::getIsCurrent, 0)
|
||||
);
|
||||
|
||||
workflowHistoryMapper.update(null,
|
||||
new LambdaUpdateWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflowId)
|
||||
.eq(WorkflowHistory::getVersionTag, versionTag)
|
||||
.set(WorkflowHistory::getIsCurrent, 1)
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WorkflowDetailResponseVO getVersionDetail(Long workflowId, String versionTag) {
|
||||
// 1. 从历史表查询指定版本
|
||||
WorkflowHistory history = workflowHistoryMapper.selectOne(
|
||||
new LambdaQueryWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflowId)
|
||||
.eq(WorkflowHistory::getVersionTag, versionTag)
|
||||
);
|
||||
|
||||
Assert.notNull(history, () -> new SnailJobServerException("版本不存在"));
|
||||
|
||||
// 2. 转换为 Workflow 对象
|
||||
Workflow workflow = new Workflow();
|
||||
BeanUtils.copyProperties(history, workflow);
|
||||
workflow.setId(workflowId);
|
||||
|
||||
// 3. 复用现有的详情构建逻辑,但需要修改节点查询
|
||||
return doGetWorkflowDetailFromHistory(workflow, versionTag);
|
||||
}
|
||||
private WorkflowDetailResponseVO doGetWorkflowDetailFromHistory(Workflow workflow, String versionTag) {
|
||||
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.convert(workflow);
|
||||
|
||||
// 关键差异:从历史表查询节点
|
||||
List<WorkflowNodeHistory> nodeHistoryList = workflowNodeHistoryMapper.selectList(
|
||||
new LambdaQueryWrapper<WorkflowNodeHistory>()
|
||||
.eq(WorkflowNodeHistory::getWorkflowId, workflow.getId())
|
||||
.eq(WorkflowNodeHistory::getVersionTag, versionTag)
|
||||
.orderByAsc(WorkflowNodeHistory::getPriorityLevel)
|
||||
);
|
||||
List<WorkflowNode> workflowNodes = nodeHistoryList.stream()
|
||||
.map(historyNode -> {
|
||||
WorkflowNode node = new WorkflowNode();
|
||||
BeanUtils.copyProperties(historyNode, node);
|
||||
return node;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
// 后续逻辑与 doGetWorkflowDetail 相同
|
||||
// ... (Job查询、节点转换、图构建等)
|
||||
List<Long> jobIds = StreamUtils.toList(workflowNodes, WorkflowNode::getJobId);
|
||||
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
|
||||
.in(Job::getId, new HashSet<>(jobIds)));
|
||||
|
||||
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
|
||||
|
||||
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.convertList(workflowNodes);
|
||||
|
||||
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
|
||||
.peek(nodeInfo -> {
|
||||
JobTaskConfig jobTask = nodeInfo.getJobTask();
|
||||
if (Objects.nonNull(jobTask)) {
|
||||
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
|
||||
}
|
||||
}).collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
|
||||
|
||||
String flowInfo = workflow.getFlowInfo();
|
||||
try {
|
||||
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
|
||||
// 反序列化构建图
|
||||
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT,
|
||||
new HashMap<>(),
|
||||
workflowNodeMap);
|
||||
responseVO.setNodeConfig(config);
|
||||
} catch (Exception e) {
|
||||
log.error("Deserialization failed. json:[{}]", flowInfo, e);
|
||||
throw new SnailJobServerException("Failed to query workflow details");
|
||||
}
|
||||
return responseVO;
|
||||
// return doGetWorkflowDetail(workflow);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存工作流数据到历史表中
|
||||
* @param workflow
|
||||
* @param versionTag
|
||||
* @param version
|
||||
*/
|
||||
private void saveToHistory(Workflow workflow, String versionTag, Integer version,boolean flag) {
|
||||
WorkflowHistory history = new WorkflowHistory();
|
||||
BeanUtils.copyProperties(workflow, history);
|
||||
history.setCreateDt(null);
|
||||
history.setWorkflowId(workflow.getId());
|
||||
// 版本号生成逻辑
|
||||
if(flag){
|
||||
String finalVersionTag = Optional.ofNullable(versionTag)
|
||||
// 过滤空串和纯空格(trim() 处理用户误输入的空格)
|
||||
.filter(tag -> !tag.trim().isEmpty())
|
||||
// 无有效值时,生成 v+版本号(版本号兜底 1)
|
||||
.orElse("v" + Optional.ofNullable(workflow.getVersion()).orElse(1));
|
||||
|
||||
history.setVersionTag(finalVersionTag);
|
||||
// history.setVersionTag(Optional.ofNullable(versionTag).orElse("v" + Optional.ofNullable(workflow.getVersion()).orElse(1)));
|
||||
}else {
|
||||
history.setVersionTag(workflowHistoryMapper.selectOne( new LambdaUpdateWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflow.getId())
|
||||
.eq(WorkflowHistory::getVersion, version)).getVersionTag());
|
||||
}
|
||||
history.setIsCurrent(1);
|
||||
// history.setVersion(version);
|
||||
|
||||
// 将之前的版本标记为非当前
|
||||
workflowHistoryMapper.update(null,
|
||||
new LambdaUpdateWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflow.getId())
|
||||
.set(WorkflowHistory::getIsCurrent, 0)
|
||||
);
|
||||
workflowHistoryMapper.delete(
|
||||
new LambdaUpdateWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflow.getId())
|
||||
.eq(WorkflowHistory::getVersion, version));
|
||||
workflowHistoryMapper.insert(history);
|
||||
|
||||
// 同时保存节点历史
|
||||
List<WorkflowNode> nodes = workflowNodeMapper.selectList(
|
||||
new LambdaQueryWrapper<WorkflowNode>()
|
||||
.eq(WorkflowNode::getWorkflowId, workflow.getId())
|
||||
.eq(WorkflowNode::getVersion, version)
|
||||
);
|
||||
|
||||
for (WorkflowNode node : nodes) {
|
||||
WorkflowNodeHistory nodeHistory = new WorkflowNodeHistory();
|
||||
BeanUtils.copyProperties(node, nodeHistory);
|
||||
nodeHistory.setWorkflowId(workflow.getId());
|
||||
nodeHistory.setVersionTag(history.getVersionTag());
|
||||
workflowNodeHistoryMapper.insertOrUpdate(nodeHistory);
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* 从历史表中回复数据到node表中
|
||||
* @param workflowId
|
||||
* @param versionTag
|
||||
* @param newVersion
|
||||
* @param oldVersion
|
||||
*/
|
||||
private void restoreNodesFromHistory(Long workflowId, String versionTag, Integer newVersion, Integer oldVersion) {
|
||||
// 删除当前版本的节点
|
||||
workflowNodeMapper.delete(
|
||||
new LambdaQueryWrapper<WorkflowNode>()
|
||||
.eq(WorkflowNode::getWorkflowId, workflowId)
|
||||
);
|
||||
|
||||
// 从历史表恢复节点
|
||||
List<WorkflowNodeHistory> nodeHistoryList = workflowNodeHistoryMapper.selectList(
|
||||
new LambdaQueryWrapper<WorkflowNodeHistory>()
|
||||
.eq(WorkflowNodeHistory::getWorkflowId, workflowId)
|
||||
.eq(WorkflowNodeHistory::getVersion, oldVersion)
|
||||
);
|
||||
|
||||
// 遍历历史节点并插入到主表
|
||||
for (WorkflowNodeHistory nodeHistory : nodeHistoryList) {
|
||||
WorkflowNode node = new WorkflowNode();
|
||||
BeanUtils.copyProperties(nodeHistory, node);
|
||||
// node.setId(null); // 清空ID,让数据库自动生成新ID
|
||||
// node.setVersion(newVersion); // 设置新的版本号
|
||||
workflowNodeMapper.insert(node);
|
||||
}
|
||||
}
|
||||
|
||||
// 获取版本列表
|
||||
@Override
|
||||
public List<WorkflowVersionVO> getVersionList(Long workflowId) {
|
||||
List<WorkflowHistory> historyList = workflowHistoryMapper.selectList(
|
||||
new LambdaQueryWrapper<WorkflowHistory>()
|
||||
.eq(WorkflowHistory::getWorkflowId, workflowId)
|
||||
.orderByDesc(WorkflowHistory::getCreateDt)
|
||||
);
|
||||
|
||||
return historyList.stream()
|
||||
.map(this::convertToVersionVO)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// 手动转换方法
|
||||
private WorkflowVersionVO convertToVersionVO(WorkflowHistory history) {
|
||||
WorkflowVersionVO vo = new WorkflowVersionVO();
|
||||
vo.setVersionTag(history.getVersionTag());
|
||||
vo.setVersion(history.getVersion());
|
||||
vo.setIsCurrent(history.getIsCurrent());
|
||||
vo.setCreateDt(history.getCreateDt());
|
||||
return vo;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user