feat: 2.6.0

1. 修复定时任务和工作流未过滤关闭组问题
2. 修复工作流详情未过滤命名空间问题
3. 工作流详情添加定时任务名称
This commit is contained in:
byteblogs168 2024-01-15 17:36:00 +08:00
parent 09b0854184
commit b8a2322cbb
5 changed files with 114 additions and 41 deletions

View File

@ -15,7 +15,10 @@ public class JobTaskConfig {
/**
* 任务ID
*/
@NotNull(message = "任务ID不能为空")
private Long jobId;
/**
* 任务名称
*/
private String jobName;
}

View File

@ -20,10 +20,13 @@ import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -32,6 +35,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -44,12 +48,12 @@ import java.util.*;
@Component(ActorGenerator.SCAN_JOB_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class ScanJobTaskActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Autowired
private SystemProperties systemProperties;
private final JobMapper jobMapper;
private final SystemProperties systemProperties;
private final GroupConfigMapper groupConfigMapper;
@Override
public Receive createReceive() {
@ -71,7 +75,7 @@ public class ScanJobTaskActor extends AbstractActor {
}
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
this::processJobPartitionTasks, 0);
this::processJobPartitionTasks, 0);
log.info("job scan end. total:[{}]", total);
}
@ -97,7 +101,7 @@ public class ScanJobTaskActor extends AbstractActor {
}
private void processJob(JobPartitionTaskDTO partitionTask, final List<Job> waitUpdateJobs,
final List<JobTaskPrepareDTO> waitExecJobs, long now) {
final List<JobTaskPrepareDTO> waitExecJobs, long now) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
Job job = new Job();
@ -113,7 +117,7 @@ public class ScanJobTaskActor extends AbstractActor {
triggerTask = Objects.isNull(nextTriggerAt);
// 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now
if (Objects.isNull(nextTriggerAt)
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
|| (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) {
nextTriggerAt = now;
}
}
@ -158,19 +162,30 @@ public class ScanJobTaskActor extends AbstractActor {
}
List<Job> jobs = jobMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()),
new LambdaQueryWrapper<Job>()
.select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType,
Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident,
Job::getId, Job::getNamespaceId)
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.ne(Job::getTriggerType,TriggerTypeEnum.WORKFLOW.getType())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
.ge(Job::getId, startId)
.orderByAsc(Job::getId)
new LambdaQueryWrapper<Job>()
.select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType,
Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident,
Job::getId, Job::getNamespaceId)
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.ne(Job::getTriggerType, TriggerTypeEnum.WORKFLOW.getType())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt,
DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD))
.ge(Job::getId, startId)
.orderByAsc(Job::getId)
).getRecords();
// 过滤已关闭的组
if (!CollectionUtils.isEmpty(jobs)) {
List<String> groupConfigs = groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.eq(GroupConfig::getGroupName, jobs.stream().map(Job::getGroupName).collect(Collectors.toList())))
.stream().map(GroupConfig::getGroupName).collect(Collectors.toList());
jobs = jobs.stream().filter(job -> groupConfigs.contains(job.getGroupName())).collect(Collectors.toList());
}
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
}
}

View File

@ -18,7 +18,10 @@ import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@ -32,6 +35,7 @@ import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author xiaowoniu
@ -45,6 +49,7 @@ import java.util.List;
public class ScanWorkflowTaskActor extends AbstractActor {
private final WorkflowMapper workflowMapper;
private final SystemProperties systemProperties;
private final GroupConfigMapper groupConfigMapper;
@Override
public Receive createReceive() {
@ -134,6 +139,16 @@ public class ScanWorkflowTaskActor extends AbstractActor {
.orderByAsc(Workflow::getId)
).getRecords();
// 过滤已关闭的组
if (!CollectionUtils.isEmpty(workflows)) {
List<String> groupConfigs = groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
.select(GroupConfig::getGroupName)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
.eq(GroupConfig::getGroupName, workflows.stream().map(Workflow::getGroupName).collect(Collectors.toList())))
.stream().map(GroupConfig::getGroupName).collect(Collectors.toList());
workflows = workflows.stream().filter(workflow -> groupConfigs.contains(workflow.getGroupName())).collect(Collectors.toList());
}
return WorkflowTaskConverter.INSTANCE.toWorkflowPartitionTaskList(workflows);
}
}

View File

@ -23,10 +23,12 @@ import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.WorkflowBatchQueryDO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
@ -62,6 +64,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowHandler workflowHandler;
private final WorkflowBatchHandler workflowBatchHandler;
private final JobMapper jobMapper;
@Override
public PageResult<List<WorkflowBatchResponseVO>> listPage(WorkflowBatchQueryVO queryVO) {
@ -102,7 +105,10 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
@Override
public WorkflowDetailResponseVO getWorkflowBatchDetail(Long id) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(id);
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, id)
.eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()));
if (Objects.isNull(workflowTaskBatch)) {
return null;
}
@ -114,6 +120,12 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
.eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
.eq(WorkflowNode::getWorkflowId, workflow.getId()));
List<Long> jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toList());
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
.in(Job::getId, new HashSet<>(jobIds)));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, job -> job));
List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, id).orderByDesc(JobTaskBatch::getId));
@ -127,6 +139,12 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
Set<Long> allNoOperationNode = Sets.newHashSet();
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.peek(nodeInfo -> {
JobTaskConfig jobTask = nodeInfo.getJobTask();
if(Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
}
List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
@ -30,8 +31,10 @@ import com.aizuda.easy.retry.server.web.service.WorkflowService;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -66,6 +69,8 @@ public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowHandler workflowHandler;
@Lazy
private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
private final JobMapper jobMapper;
@Override
@Transactional
@ -80,7 +85,8 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setVersion(1);
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(StrUtil.EMPTY);
workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
workflow.setBucketIndex(
HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
% systemProperties.getBucketTotal());
workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
@ -90,10 +96,10 @@ public class WorkflowServiceImpl implements WorkflowService {
// 递归构建图
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT),
new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(),
workflow.getId(), nodeConfig, graph,
workflow.getVersion());
new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(),
workflow.getId(), nodeConfig, graph,
workflow.getVersion());
log.info("图构建完成. graph:[{}]", graph);
// 保存图信息
@ -115,8 +121,8 @@ public class WorkflowServiceImpl implements WorkflowService {
private static void checkExecuteInterval(WorkflowRequestVO requestVO) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(),
WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getTriggerType())) {
WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getTriggerType())) {
if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) {
throw new EasyRetryServerException("触发间隔不得小于10");
}
@ -130,29 +136,45 @@ public class WorkflowServiceImpl implements WorkflowService {
@Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) {
Workflow workflow = workflowMapper.selectById(id);
Workflow workflow = workflowMapper.selectOne(
new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, id)
.eq(Workflow::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId())
);
if (Objects.isNull(workflow)) {
return null;
}
WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.toWorkflowDetailResponseVO(workflow);
List<WorkflowNode> workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper<WorkflowNode>()
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getVersion, workflow.getVersion())
.eq(WorkflowNode::getWorkflowId, id)
.orderByAsc(WorkflowNode::getPriorityLevel));
.eq(WorkflowNode::getDeleted, 0)
.eq(WorkflowNode::getVersion, workflow.getVersion())
.eq(WorkflowNode::getWorkflowId, id)
.orderByAsc(WorkflowNode::getPriorityLevel));
List<Long> jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toList());
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
.in(Job::getId, new HashSet<>(jobIds)));
Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, job -> job));
List<WorkflowDetailResponseVO.NodeInfo> nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes);
Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
.collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
.peek(nodeInfo -> {
JobTaskConfig jobTask = nodeInfo.getJobTask();
if (Objects.nonNull(jobTask)) {
jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
}
}).collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i));
String flowInfo = workflow.getFlowInfo();
try {
MutableGraph<Long> graph = GraphUtils.deserializeJsonToGraph(flowInfo);
// 反序列化构建图
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(),
workflowNodeMap);
WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT,
new HashMap<>(),
workflowNodeMap);
responseVO.setNodeConfig(config);
} catch (Exception e) {
log.error("反序列化失败. json:[{}]", flowInfo, e);
@ -197,7 +219,7 @@ public class WorkflowServiceImpl implements WorkflowService {
int version = workflow.getVersion();
// 递归构建图
workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(),
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1);
workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1);
log.info("图构建完成. graph:[{}]", graph);
@ -208,8 +230,8 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, version)
.eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, version)
) > 0, () -> new EasyRetryServerException("更新失败"));
return Boolean.TRUE;
@ -218,9 +240,9 @@ public class WorkflowServiceImpl implements WorkflowService {
@Override
public Boolean updateStatus(Long id) {
Workflow workflow = workflowMapper.selectOne(
new LambdaQueryWrapper<Workflow>()
.select(Workflow::getId, Workflow::getWorkflowStatus)
.eq(Workflow::getId, id));
new LambdaQueryWrapper<Workflow>()
.select(Workflow::getId, Workflow::getWorkflowStatus)
.eq(Workflow::getId, id));
Assert.notNull(workflow, () -> new EasyRetryServerException("工作流不存在"));
if (Objects.equals(workflow.getWorkflowStatus(), StatusEnum.NO.getStatus())) {