diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index 03c531c1..5fc06c35 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -18,6 +18,8 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; @@ -43,7 +45,7 @@ public class JobTaskBatchHandler { List jobTasks = jobTaskMapper.selectList( new LambdaQueryWrapper() - .select(JobTask::getTaskStatus, JobTask::getResultMessage) + .select(JobTask::getTaskStatus, JobTask::getResultMessage) .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); JobTaskBatch jobTaskBatch = new JobTaskBatch(); @@ -76,27 +78,34 @@ public class JobTaskBatchHandler { jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason()); } - if (Objects.nonNull(completeJobBatchDTO.getWorkflowNodeId()) && Objects.nonNull(completeJobBatchDTO.getWorkflowTaskBatchId())) { - // 若是工作流则开启下一个任务 - try { - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId()); - taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); - taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId()); - // 这里取第一个的任务执行结果 - taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId()); - ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - } catch (Exception e) { - log.error("任务调度执行失败", e); - } + if (Objects.nonNull(completeJobBatchDTO.getWorkflowNodeId()) && Objects.nonNull( + completeJobBatchDTO.getWorkflowTaskBatchId())) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + // 若是工作流则开启下一个任务 + try { + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId()); + taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); + taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId()); + // 这里取第一个的任务执行结果 + taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId()); + ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); + actorRef.tell(taskExecuteDTO, actorRef); + } catch (Exception e) { + log.error("任务调度执行失败", e); + } + } + }); + } jobTaskBatch.setUpdateDt(LocalDateTime.now()); return 1 == jobTaskBatchMapper.update(jobTaskBatch, - new LambdaUpdateWrapper() - .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE) + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE) ); } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 61800c8d..748de144 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -26,10 +26,13 @@ import com.aizuda.easy.retry.server.web.service.JobService; import com.aizuda.easy.retry.server.web.service.convert.JobConverter; import com.aizuda.easy.retry.server.web.service.convert.JobResponseVOConverter; import com.aizuda.easy.retry.server.web.util.UserSessionUtils; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -47,16 +50,14 @@ import java.util.Optional; */ @Service @Slf4j +@RequiredArgsConstructor public class JobServiceImpl implements JobService { - @Autowired - private SystemProperties systemProperties; - @Autowired - private JobMapper jobMapper; - @Autowired - @Qualifier("terminalJobPrepareHandler") + private final SystemProperties systemProperties; + private final JobMapper jobMapper; @Lazy - private JobPrePareHandler jobPrePareHandler; + private final JobPrePareHandler terminalJobPrepareHandler; + private final AccessTemplate accessTemplate; @Override public PageResult> getJobPage(JobQueryVO queryVO) { @@ -108,7 +109,7 @@ public class JobServiceImpl implements JobService { public List getJobNameList(String keywords, Long jobId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .select(Job::getId, Job::getJobName); + .select(Job::getId, Job::getJobName); if (StrUtil.isNotBlank(keywords)) { queryWrapper.like(Job::getJobName, keywords.trim() + "%"); } @@ -128,7 +129,7 @@ public class JobServiceImpl implements JobService { // 判断常驻任务 Job job = updateJobResident(jobRequestVO); job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) - % systemProperties.getBucketTotal()); + % systemProperties.getBucketTotal()); job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); job.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId()); return 1 == jobMapper.insert(job); @@ -149,18 +150,19 @@ public class JobServiceImpl implements JobService { if (Objects.equals(jobRequestVO.getTriggerType(), TriggerTypeEnum.WORKFLOW.getType())) { job.setNextTriggerAt(0L); // 非常驻任务 > 非常驻任务 - } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), - StatusEnum.NO.getStatus())) { + } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( + updateJob.getResident(), + StatusEnum.NO.getStatus())) { updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals( - updateJob.getResident(), StatusEnum.NO.getStatus())) { + updateJob.getResident(), StatusEnum.NO.getStatus())) { // 常驻任务的触发时间 long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())) - .orElse(DateUtils.toNowMilli()); + .orElse(DateUtils.toNowMilli()); updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time)); // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间 } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( - updateJob.getResident(), StatusEnum.YES.getStatus())) { + updateJob.getResident(), StatusEnum.YES.getStatus())) { updateJob.setNextTriggerAt(DateUtils.toNowMilli()); } @@ -227,12 +229,19 @@ public class JobServiceImpl implements JobService { Job job = jobMapper.selectById(jobId); Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); + long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, job.getGroupName()) + .eq(GroupConfig::getNamespaceId, job.getNamespaceId()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", job.getGroupName())); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); // 设置now表示立即执行 jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); // 创建批次 - jobPrePareHandler.handler(jobTaskPrepare); + terminalJobPrepareHandler.handler(jobTaskPrepare); return Boolean.TRUE; } @@ -241,10 +250,10 @@ public class JobServiceImpl implements JobService { public List getJobList(String groupName) { String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); List jobs = jobMapper.selectList(new LambdaQueryWrapper() - .select(Job::getId, Job::getJobName) - .eq(Job::getNamespaceId, namespaceId) - .eq(Job::getGroupName, groupName) - .orderByDesc(Job::getCreateDt)); + .select(Job::getId, Job::getJobName) + .eq(Job::getNamespaceId, namespaceId) + .eq(Job::getGroupName, groupName) + .orderByDesc(Job::getCreateDt)); List jobResponseList = JobResponseVOConverter.INSTANCE.toJobResponseVOs(jobs); return jobResponseList; } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java index be708e95..1d13f730 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/RetryTaskServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; +import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.WaitStrategy; @@ -41,6 +42,7 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.TaskAccess; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; @@ -349,10 +351,19 @@ public class RetryTaskServiceImpl implements RetryTaskService { @Override public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) { + String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + + long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, requestVO.getGroupName()) + .eq(GroupConfig::getNamespaceId, namespaceId) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName())); List uniqueIds = requestVO.getUniqueIds(); - String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + List list = accessTemplate.getRetryTaskAccess().list( requestVO.getGroupName(), namespaceId, new LambdaQueryWrapper() @@ -377,6 +388,13 @@ public class RetryTaskServiceImpl implements RetryTaskService { List uniqueIds = requestVO.getUniqueIds(); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); + long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, requestVO.getGroupName()) + .eq(GroupConfig::getNamespaceId, namespaceId) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName())); List list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), namespaceId, new LambdaQueryWrapper() diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java index bcd8a07e..efab2083 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowServiceImpl.java @@ -31,9 +31,11 @@ import com.aizuda.easy.retry.server.web.service.WorkflowService; import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter; import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler; import com.aizuda.easy.retry.server.web.util.UserSessionUtils; +import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; @@ -70,7 +72,7 @@ public class WorkflowServiceImpl implements WorkflowService { @Lazy private final WorkflowPrePareHandler terminalWorkflowPrepareHandler; private final JobMapper jobMapper; - + private final AccessTemplate accessTemplate; @Override @Transactional @@ -280,6 +282,14 @@ public class WorkflowServiceImpl implements WorkflowService { Workflow workflow = workflowMapper.selectById(id); Assert.notNull(workflow, () -> new EasyRetryServerException("workflow can not be null.")); + long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper() + .eq(GroupConfig::getGroupName, workflow.getGroupName()) + .eq(GroupConfig::getNamespaceId, workflow.getNamespaceId()) + .eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus()) + ); + + Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName())); + WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow); // 设置now表示立即执行 prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());