diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobTaskConfig.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobTaskConfig.java
index 574fa6fb..21b31d77 100644
--- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobTaskConfig.java
+++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/dto/JobTaskConfig.java
@@ -15,7 +15,10 @@ public class JobTaskConfig {
     /**
      * Task ID
      */
-    @NotNull(message = "任务ID不能为空")
     private Long jobId;
 
+    /**
+     * Task name
+     */
+    private String jobName;
 }
diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
index 5b660dbf..1820ae6e 100644
--- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
+++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanJobTaskActor.java
@@ -20,10 +20,13 @@ import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
 import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTaskDTO;
 import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
 import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
 import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -32,6 +35,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;

 import java.util.*;
+import java.util.stream.Collectors;


 /**
@@ -44,12 +48,12 @@ import java.util.*;
 @Component(ActorGenerator.SCAN_JOB_ACTOR)
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 @Slf4j
+@RequiredArgsConstructor
 public class ScanJobTaskActor extends AbstractActor {

-    @Autowired
-    private JobMapper jobMapper;
-    @Autowired
-    private SystemProperties systemProperties;
+    private final JobMapper jobMapper;
+    private final SystemProperties systemProperties;
+    private final GroupConfigMapper groupConfigMapper;

     @Override
     public Receive createReceive() {
@@ -71,7 +75,7 @@ public class ScanJobTaskActor extends AbstractActor {
         }

         long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
-            this::processJobPartitionTasks, 0);
+                this::processJobPartitionTasks, 0);

         log.info("job scan end. total:[{}]", total);
total:[{}]", total); } @@ -97,7 +101,7 @@ public class ScanJobTaskActor extends AbstractActor { } private void processJob(JobPartitionTaskDTO partitionTask, final List waitUpdateJobs, - final List waitExecJobs, long now) { + final List waitExecJobs, long now) { CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId()); Job job = new Job(); @@ -113,7 +117,7 @@ public class ScanJobTaskActor extends AbstractActor { triggerTask = Objects.isNull(nextTriggerAt); // 若出现常驻任务时间为null或者常驻任务的内存时间长期未更新, 刷新为now if (Objects.isNull(nextTriggerAt) - || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { + || (nextTriggerAt + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) < now) { nextTriggerAt = now; } } @@ -158,19 +162,30 @@ public class ScanJobTaskActor extends AbstractActor { } List jobs = jobMapper.selectPage(new PageDTO<>(0, systemProperties.getJobPullPageSize()), - new LambdaQueryWrapper() - .select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType, - Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident, - Job::getId, Job::getNamespaceId) - .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) - .eq(Job::getDeleted, StatusEnum.NO.getStatus()) - .ne(Job::getTriggerType,TriggerTypeEnum.WORKFLOW.getType()) - .in(Job::getBucketIndex, scanTask.getBuckets()) - .le(Job::getNextTriggerAt, DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) - .ge(Job::getId, startId) - .orderByAsc(Job::getId) + new LambdaQueryWrapper() + .select(Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType, + Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident, + Job::getId, Job::getNamespaceId) + .eq(Job::getJobStatus, StatusEnum.YES.getStatus()) + .eq(Job::getDeleted, StatusEnum.NO.getStatus()) + .ne(Job::getTriggerType, TriggerTypeEnum.WORKFLOW.getType()) + .in(Job::getBucketIndex, scanTask.getBuckets()) + .le(Job::getNextTriggerAt, + DateUtils.toNowMilli() + DateUtils.toEpochMilli(SystemConstants.SCHEDULE_PERIOD)) + .ge(Job::getId, startId) + .orderByAsc(Job::getId) ).getRecords(); + // 过滤已关闭的组 + if (!CollectionUtils.isEmpty(jobs)) { + List groupConfigs = groupConfigMapper.selectList(new LambdaQueryWrapper() + .select(GroupConfig::getGroupName) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + .eq(GroupConfig::getGroupName, jobs.stream().map(Job::getGroupName).collect(Collectors.toList()))) + .stream().map(GroupConfig::getGroupName).collect(Collectors.toList()); + jobs = jobs.stream().filter(job -> groupConfigs.contains(job.getGroupName())).collect(Collectors.toList()); + } + return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs); } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java index b9814a6c..9ca818a5 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/ScanWorkflowTaskActor.java @@ -18,7 +18,10 @@ import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import 
 import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
 import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
 import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
@@ -32,6 +35,7 @@ import org.springframework.util.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;

 /**
  * @author xiaowoniu
@@ -45,6 +49,7 @@ import java.util.List;
 public class ScanWorkflowTaskActor extends AbstractActor {
     private final WorkflowMapper workflowMapper;
     private final SystemProperties systemProperties;
+    private final GroupConfigMapper groupConfigMapper;

     @Override
     public Receive createReceive() {
@@ -134,6 +139,16 @@ public class ScanWorkflowTaskActor extends AbstractActor {
                 .orderByAsc(Workflow::getId)
         ).getRecords();

+        // Filter out workflows whose group has been disabled
+        if (!CollectionUtils.isEmpty(workflows)) {
+            List<String> groupConfigs = groupConfigMapper.selectList(new LambdaQueryWrapper<GroupConfig>()
+                    .select(GroupConfig::getGroupName)
+                    .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
+                    .in(GroupConfig::getGroupName, workflows.stream().map(Workflow::getGroupName).collect(Collectors.toList())))
+                    .stream().map(GroupConfig::getGroupName).collect(Collectors.toList());
+            workflows = workflows.stream().filter(workflow -> groupConfigs.contains(workflow.getGroupName())).collect(Collectors.toList());
+        }
+
         return WorkflowTaskConverter.INSTANCE.toWorkflowPartitionTaskList(workflows);
     }
 }
diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java
index b77976c7..987d039f 100644
--- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java
+++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowBatchServiceImpl.java
@@ -23,10 +23,12 @@ import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
 import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
 import com.aizuda.easy.retry.template.datasource.persistence.dataobject.WorkflowBatchQueryDO;
 import com.aizuda.easy.retry.template.datasource.persistence.dataobject.WorkflowBatchResponseDO;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
 import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
 import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
@@ -62,6 +64,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
     private final JobTaskBatchMapper jobTaskBatchMapper;
     private final WorkflowHandler workflowHandler;
     private final WorkflowBatchHandler workflowBatchHandler;
+    private final JobMapper jobMapper;

     @Override
     public PageResult<List<WorkflowBatchResponseVO>> listPage(WorkflowBatchQueryVO queryVO) {
@@ -102,7 +105,10 @@

     @Override
     public WorkflowDetailResponseVO getWorkflowBatchDetail(Long id) {
-        WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(id);
+        WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
+                new LambdaQueryWrapper<WorkflowTaskBatch>()
+                        .eq(WorkflowTaskBatch::getId, id)
+                        .eq(WorkflowTaskBatch::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()));
         if (Objects.isNull(workflowTaskBatch)) {
             return null;
         }
@@ -114,6 +120,12 @@
                 .eq(WorkflowNode::getDeleted, StatusEnum.NO.getStatus())
                 .eq(WorkflowNode::getWorkflowId, workflow.getId()));

+        List<Long> jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toList());
+        List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
+                .in(Job::getId, new HashSet<>(jobIds)));
+
+        Map<Long, Job> jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, job -> job));
+
         List<JobTaskBatch> alJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
                 .eq(JobTaskBatch::getWorkflowTaskBatchId, id).orderByDesc(JobTaskBatch::getId));

@@ -127,6 +139,12 @@
         Set<Long> allNoOperationNode = Sets.newHashSet();
         Map<Long, WorkflowDetailResponseVO.NodeInfo> workflowNodeMap = nodeInfos.stream()
                 .peek(nodeInfo -> {
+
+                    JobTaskConfig jobTask = nodeInfo.getJobTask();
+                    if (Objects.nonNull(jobTask)) {
+                        jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName());
+                    }
+
                     List<JobTaskBatch> jobTaskBatchList = jobTaskBatchMap.get(nodeInfo.getId());
                     if (!CollectionUtils.isEmpty(jobTaskBatchList)) {
                         nodeInfo.setJobBatchList(JobBatchResponseVOConverter.INSTANCE.jobTaskBatchToJobBatchResponseVOs(jobTaskBatchList));
diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java
index a5aa40f1..7e34d5d8 100644
--- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java
+++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java
@@ -8,6 +8,7 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
 import com.aizuda.easy.retry.common.core.util.JsonUtil;
 import com.aizuda.easy.retry.server.common.WaitStrategy;
 import com.aizuda.easy.retry.server.common.config.SystemProperties;
+import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
 import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
 import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
 import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
@@ -30,8 +31,10 @@ import com.aizuda.easy.retry.server.web.service.WorkflowService;
 import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
 import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
 import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
+import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
 import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
+import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
 import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
 import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -66,6 +69,8 @@ public class WorkflowServiceImpl implements WorkflowService {
     private final WorkflowHandler workflowHandler;
     @Lazy
     private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
+    private final JobMapper jobMapper;
+

     @Override
     @Transactional
@@ -80,7 +85,8 @@
         workflow.setVersion(1);
         workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
         workflow.setFlowInfo(StrUtil.EMPTY);
-        workflow.setBucketIndex(HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
+        workflow.setBucketIndex(
+                HashUtil.bkdrHash(workflowRequestVO.getGroupName() + workflowRequestVO.getWorkflowName())
                 % systemProperties.getBucketTotal());
         workflow.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
         Assert.isTrue(1 == workflowMapper.insert(workflow), () -> new EasyRetryServerException("新增工作流失败"));
@@ -90,10 +96,10 @@
         // Recursively build the graph
         workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT),
-            new LinkedBlockingDeque<>(),
-            workflowRequestVO.getGroupName(),
-            workflow.getId(), nodeConfig, graph,
-            workflow.getVersion());
+                new LinkedBlockingDeque<>(),
+                workflowRequestVO.getGroupName(),
+                workflow.getId(), nodeConfig, graph,
+                workflow.getVersion());

graph:[{}]", graph); // 保存图信息 @@ -115,8 +121,8 @@ public class WorkflowServiceImpl implements WorkflowService { private static void checkExecuteInterval(WorkflowRequestVO requestVO) { if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(), - WaitStrategies.WaitStrategyEnum.RANDOM.getType()) - .contains(requestVO.getTriggerType())) { + WaitStrategies.WaitStrategyEnum.RANDOM.getType()) + .contains(requestVO.getTriggerType())) { if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) { throw new EasyRetryServerException("触发间隔不得小于10"); } @@ -130,29 +136,45 @@ public class WorkflowServiceImpl implements WorkflowService { @Override public WorkflowDetailResponseVO getWorkflowDetail(Long id) { - Workflow workflow = workflowMapper.selectById(id); + Workflow workflow = workflowMapper.selectOne( + new LambdaQueryWrapper() + .eq(Workflow::getId, id) + .eq(Workflow::getNamespaceId, UserSessionUtils.currentUserSession().getNamespaceId()) + ); if (Objects.isNull(workflow)) { return null; } WorkflowDetailResponseVO responseVO = WorkflowConverter.INSTANCE.toWorkflowDetailResponseVO(workflow); List workflowNodes = workflowNodeMapper.selectList(new LambdaQueryWrapper() - .eq(WorkflowNode::getDeleted, 0) - .eq(WorkflowNode::getVersion, workflow.getVersion()) - .eq(WorkflowNode::getWorkflowId, id) - .orderByAsc(WorkflowNode::getPriorityLevel)); + .eq(WorkflowNode::getDeleted, 0) + .eq(WorkflowNode::getVersion, workflow.getVersion()) + .eq(WorkflowNode::getWorkflowId, id) + .orderByAsc(WorkflowNode::getPriorityLevel)); + + List jobIds = workflowNodes.stream().map(WorkflowNode::getJobId).collect(Collectors.toList()); + List jobs = jobMapper.selectList(new LambdaQueryWrapper() + .in(Job::getId, new HashSet<>(jobIds))); + + Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, job -> job)); List nodeInfos = WorkflowConverter.INSTANCE.toNodeInfo(workflowNodes); Map workflowNodeMap = nodeInfos.stream() - .collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); + .peek(nodeInfo -> { + JobTaskConfig jobTask = nodeInfo.getJobTask(); + if (Objects.nonNull(jobTask)) { + jobTask.setJobName(jobMap.getOrDefault(jobTask.getJobId(), new Job()).getJobName()); + } + }).collect(Collectors.toMap(WorkflowDetailResponseVO.NodeInfo::getId, i -> i)); String flowInfo = workflow.getFlowInfo(); try { MutableGraph graph = GraphUtils.deserializeJsonToGraph(flowInfo); // 反序列化构建图 - WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, new HashMap<>(), - workflowNodeMap); + WorkflowDetailResponseVO.NodeConfig config = workflowHandler.buildNodeConfig(graph, SystemConstants.ROOT, + new HashMap<>(), + workflowNodeMap); responseVO.setNodeConfig(config); } catch (Exception e) { log.error("反序列化失败. json:[{}]", flowInfo, e); @@ -197,7 +219,7 @@ public class WorkflowServiceImpl implements WorkflowService { int version = workflow.getVersion(); // 递归构建图 workflowHandler.buildGraph(Lists.newArrayList(SystemConstants.ROOT), new LinkedBlockingDeque<>(), - workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1); + workflowRequestVO.getGroupName(), workflowRequestVO.getId(), nodeConfig, graph, version + 1); log.info("图构建完成. 
graph:[{}]", graph); @@ -208,8 +230,8 @@ public class WorkflowServiceImpl implements WorkflowService { workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli())); workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph))); Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper() - .eq(Workflow::getId, workflow.getId()) - .eq(Workflow::getVersion, version) + .eq(Workflow::getId, workflow.getId()) + .eq(Workflow::getVersion, version) ) > 0, () -> new EasyRetryServerException("更新失败")); return Boolean.TRUE; @@ -218,9 +240,9 @@ public class WorkflowServiceImpl implements WorkflowService { @Override public Boolean updateStatus(Long id) { Workflow workflow = workflowMapper.selectOne( - new LambdaQueryWrapper() - .select(Workflow::getId, Workflow::getWorkflowStatus) - .eq(Workflow::getId, id)); + new LambdaQueryWrapper() + .select(Workflow::getId, Workflow::getWorkflowStatus) + .eq(Workflow::getId, id)); Assert.notNull(workflow, () -> new EasyRetryServerException("工作流不存在")); if (Objects.equals(workflow.getWorkflowStatus(), StatusEnum.NO.getStatus())) {