From 0b7a45ef210c09f1e2aa2f0ccd06d5ecf870e10c Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sun, 25 Feb 2024 23:22:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=203.1.0=201.=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E5=A4=B1=E8=B4=A5=E6=89=8B?= =?UTF-8?q?=E5=8A=A8=E9=87=8D=E8=AF=95=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/task/support/JobTaskConverter.java | 17 ++- .../AbstractClientCallbackHandler.java | 4 +- .../BroadcastClientCallbackHandler.java | 46 +++++++ .../ShardingClientCallbackHandler.java | 5 - .../support/dispatch/JobExecutorActor.java | 75 ++++++----- .../executor/job/AbstractJobExecutor.java | 11 -- .../executor/job/RequestClientActor.java | 5 +- .../support/handler/JobTaskBatchHandler.java | 2 +- .../web/controller/JobBatchController.java | 7 + .../controller/WorkflowNodeController.java | 14 +- .../server/web/service/JobBatchService.java | 1 + .../web/service/WorkflowNodeService.java | 4 +- .../web/service/impl/JobBatchServiceImpl.java | 66 +++++++++- .../service/impl/WorkflowNodeServiceImpl.java | 120 +++++++++++++++++- frontend/src/api/jobApi.js | 7 + frontend/src/views/job/JobBatchList.vue | 27 +++- 16 files changed, 337 insertions(+), 74 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java index 4adb999f..56829d35 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/JobTaskConverter.java @@ -53,7 +53,10 @@ public interface JobTaskConverter { JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategyContext context); - JobTaskGenerateContext toJobTaskInstanceGenerateContext(JobExecutorContext context); + @Mappings( + @Mapping(source = "id", target = "jobId") + ) + JobTaskGenerateContext toJobTaskInstanceGenerateContext(Job job); JobTask toJobTaskInstance(JobTaskGenerateContext context); @@ -74,16 +77,17 @@ public interface JobTaskConverter { JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO); - LogMetaDTO toJobLogDTO(ClientCallbackContext context); - - LogMetaDTO toJobLogDTO(JobExecutorResultDTO resultDTO); - LogMetaDTO toJobLogDTO(BaseDTO baseDTO); ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request); ClientCallbackContext toClientCallbackContext(RealJobExecutorDTO request); + @Mappings( + @Mapping(source = "id", target = "jobId") + ) + ClientCallbackContext toClientCallbackContext(Job job); + DispatchJobRequest toDispatchJobRequest(RealJobExecutorDTO realJobExecutorDTO); @Mappings({ @@ -98,6 +102,9 @@ public interface JobTaskConverter { }) RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask); + @Mappings( + @Mapping(source = "id", target = "jobId") + ) JobExecutorContext toJobExecutorContext(Job job); JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java index 12aff79d..9f3069a7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -30,9 +30,9 @@ import java.util.Objects; public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean { @Autowired - private JobTaskMapper jobTaskMapper; + protected JobTaskMapper jobTaskMapper; @Autowired - private JobMapper jobMapper; + protected JobMapper jobMapper; @Override @Transactional diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index 176d7311..d753b1a3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -1,13 +1,25 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + /** * @author: www.byteblogs.com * @date : 2023-10-07 10:24 @@ -36,4 +48,38 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle } + @Override + protected String chooseNewClient(ClientCallbackContext context) { + Set nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); + if (CollUtil.isEmpty(nodes)) { + log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return null; + } + + JobTask jobTask = context.getJobTask(); + String clientInfo = jobTask.getClientInfo(); + String clientId = ClientInfoUtils.clientId(clientInfo); + RegisterNodeInfo serverNode = CacheRegisterTable.getServerNode(context.getGroupName(), context.getNamespaceId(), clientId); + if (Objects.isNull(serverNode)) { + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .eq(JobTask::getTaskBatchId, context.getTaskBatchId())); + + Set clientIdList = jobTasks.stream() + .map(jobTask1 -> ClientInfoUtils.clientId(jobTask1.getClientInfo())) + .collect(Collectors.toSet()); + Set remoteClientIdSet = nodes.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet()); + Sets.SetView diff = Sets.difference(remoteClientIdSet, clientIdList); + + String newClientId = CollUtil.getFirst(diff.stream().iterator()); + RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(context.getGroupName(), context.getNamespaceId(), newClientId); + if (Objects.isNull(registerNodeInfo)) { + // 如果找不到新的客户端信息,则返回原来的客户端信息 + return clientInfo; + } + + return ClientInfoUtils.generate(registerNodeInfo); + } + + return clientInfo; + } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index dcc066c4..28c52e89 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -33,11 +33,6 @@ import java.util.Set; @Slf4j public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler { - @Autowired - private JobTaskMapper jobTaskMapper; - @Autowired - private JobMapper jobMapper; - @Override public JobTaskTypeEnum getTaskInstanceType() { return JobTaskTypeEnum.SHARDING; diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index ee4dd5c7..4c7a896e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -26,6 +26,10 @@ import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache; import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent; import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorFactory; +import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext; +import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerator; +import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGeneratorFactory; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel; import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask; import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper; @@ -33,9 +37,12 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; @@ -48,6 +55,7 @@ import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; +import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -59,15 +67,13 @@ import java.util.concurrent.TimeUnit; @Component(ActorGenerator.JOB_EXECUTOR_ACTOR) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Slf4j +@RequiredArgsConstructor public class JobExecutorActor extends AbstractActor { - @Autowired - private JobMapper jobMapper; - @Autowired - private JobTaskBatchMapper jobTaskBatchMapper; - @Autowired - private TransactionTemplate transactionTemplate; - @Autowired - private GroupConfigMapper groupConfigMapper; + private final JobMapper jobMapper; + private final JobTaskBatchMapper jobTaskBatchMapper; + private final TransactionTemplate transactionTemplate; + private final GroupConfigMapper groupConfigMapper; + private final WorkflowBatchHandler workflowBatchHandler; @Override public Receive createReceive() { @@ -112,27 +118,13 @@ public class JobExecutorActor extends AbstractActor { job.getNamespaceId()))) { taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason(); - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override - public void afterCompletion(int status) { - - if (Objects.nonNull(taskExecute.getWorkflowNodeId()) && Objects.nonNull(taskExecute.getWorkflowTaskBatchId())) { - // 若是工作流则开启下一个任务 - try { - WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); - taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); - taskExecuteDTO.setTaskExecutorScene(taskExecute.getTaskExecutorScene()); - taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId()); - taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId()); - ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); - actorRef.tell(taskExecuteDTO, actorRef); - } catch (Exception e) { - log.error("任务调度执行失败", e); - } - } - } - }); + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); + taskExecuteDTO.setTaskExecutorScene(taskExecute.getTaskExecutorScene()); + taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId()); + taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId()); + workflowBatchHandler.openNextNode(taskExecuteDTO); } // 更新状态 @@ -143,14 +135,18 @@ public class JobExecutorActor extends AbstractActor { return; } + // 生成任务 + JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType()); + JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); + instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId()); + List taskList = taskInstance.generate(instanceGenerateContext); + if (CollectionUtils.isEmpty(taskList)) { + return; + } + // 执行任务 JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); - JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); - context.setTaskBatchId(taskExecute.getTaskBatchId()); - context.setJobId(job.getId()); - context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); - context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); - jobExecutor.execute(context); + jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList)); } finally { log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute)); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @@ -166,6 +162,17 @@ public class JobExecutorActor extends AbstractActor { } + @NotNull + private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List taskList) { + JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); + context.setTaskList(taskList); + context.setTaskBatchId(taskExecute.getTaskBatchId()); + context.setJobId(job.getId()); + context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); + context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); + return context; + } + private void handlerTaskBatch(TaskExecuteDTO taskExecute, int taskStatus, int operationReason) { JobTaskBatch jobTaskBatch = new JobTaskBatch(); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/AbstractJobExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/AbstractJobExecutor.java index 95ac47b7..bc5312bf 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/AbstractJobExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/AbstractJobExecutor.java @@ -22,17 +22,6 @@ public abstract class AbstractJobExecutor implements JobExecutor, InitializingBe @Override @Transactional public void execute(JobExecutorContext context) { - - // 生成任务 - JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(getTaskInstanceType().getType()); - JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(context); - List taskList = taskInstance.generate(instanceGenerateContext); - if (CollectionUtils.isEmpty(taskList)) { - return; - } - - context.setTaskList(taskList); - doExecute(context); } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java index ff4c2fea..79497cc0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java @@ -62,7 +62,10 @@ public class RequestClientActor extends AbstractActor { realJobExecutorDTO.getNamespaceId(), realJobExecutorDTO.getClientId()); if (Objects.isNull(registerNodeInfo)) { - taskExecuteFailure(realJobExecutorDTO, "无可执行的客户端"); + taskExecuteFailure(realJobExecutorDTO, "客户端不存在"); + LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); + logMetaDTO.setTimestamp( DateUtils.toNowMilli()); + EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度失败. 失败原因: 无可执行的客户端 <|>{}<|>", realJobExecutorDTO.getTaskId(), logMetaDTO); return; } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index a569e25d..907e22e0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -87,7 +87,7 @@ public class JobTaskBatchHandler { return 1 == jobTaskBatchMapper.update(jobTaskBatch, new LambdaUpdateWrapper() .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) ); } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/JobBatchController.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/JobBatchController.java index 0d1af690..06bab0c8 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/JobBatchController.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/JobBatchController.java @@ -40,4 +40,11 @@ public class JobBatchController { return jobBatchService.stop(taskBatchId); } + + @PostMapping("/retry/{taskBatchId}") + @LoginRequired + public Boolean retry(@PathVariable("taskBatchId") Long taskBatchId) { + return jobBatchService.retry(taskBatchId); + } + } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/WorkflowNodeController.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/WorkflowNodeController.java index 4f9ac484..44d649e0 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/WorkflowNodeController.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/controller/WorkflowNodeController.java @@ -18,13 +18,15 @@ import org.springframework.web.bind.annotation.RestController; public class WorkflowNodeController { private final WorkflowNodeService workflowNodeService; - @PostMapping("/stop/{id}") - public Boolean stop(@PathVariable("id") Long id) { - return workflowNodeService.stop(id); + @PostMapping("/stop/{nodeId}/{workflowTaskBatchId}") + public Boolean stop(@PathVariable("nodeId") Long nodeId, + @PathVariable("workflowTaskBatchId") Long workflowTaskBatchId) { + return workflowNodeService.stop(nodeId, workflowTaskBatchId); } - @PostMapping("/retry/{id}") - public Boolean retry(@PathVariable("id") Long id) { - return workflowNodeService.retry(id); + @PostMapping("/retry/{nodeId}/{workflowTaskBatchId}") + public Boolean retry(@PathVariable("nodeId") Long nodeId, + @PathVariable("workflowTaskBatchId") Long workflowTaskBatchId) { + return workflowNodeService.retry(nodeId, workflowTaskBatchId); } } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/JobBatchService.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/JobBatchService.java index 8e49f128..af606a32 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/JobBatchService.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/JobBatchService.java @@ -19,4 +19,5 @@ public interface JobBatchService { boolean stop(Long taskBatchId); + Boolean retry(Long taskBatchId); } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/WorkflowNodeService.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/WorkflowNodeService.java index e62e3b7e..10ede0dd 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/WorkflowNodeService.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/WorkflowNodeService.java @@ -6,7 +6,7 @@ package com.aizuda.easy.retry.server.web.service; * @since 2.6.0 */ public interface WorkflowNodeService { - Boolean stop(Long id); + Boolean stop(Long nodeId, Long workflowTaskBatchId); - Boolean retry(Long id); + Boolean retry(Long id, Long workflowTaskBatchId); } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java index 858f201f..94d41a2b 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java @@ -2,15 +2,25 @@ package com.aizuda.easy.retry.server.web.service.impl; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; +import com.aizuda.easy.retry.client.model.ExecuteResult; import com.aizuda.easy.retry.common.core.constant.SystemConstants; import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.server.common.dto.CallbackConfig; import com.aizuda.easy.retry.server.common.dto.DecisionConfig; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler; +import com.aizuda.easy.retry.server.job.task.support.JobExecutor; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; +import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory; +import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext; +import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorFactory; import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.web.model.base.PageResult; @@ -24,14 +34,19 @@ import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatch import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import java.util.List; @@ -49,6 +64,7 @@ public class JobBatchServiceImpl implements JobBatchService { private final JobTaskBatchMapper jobTaskBatchMapper; private final JobMapper jobMapper; private final WorkflowNodeMapper workflowNodeMapper; + private final JobTaskMapper jobTaskMapper; @Override public PageResult> getJobBatchPage(final JobBatchQueryVO queryVO) { @@ -80,10 +96,10 @@ public class JobBatchServiceImpl implements JobBatchService { jobBatchQueryDO.setGroupNames(groupNames); jobBatchQueryDO.setNamespaceId(userSessionVO.getNamespaceId()); List batchResponseDOList = jobTaskBatchMapper.selectJobBatchPageList(pageDTO, - jobBatchQueryDO); + jobBatchQueryDO); List batchResponseVOList = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVOs( - batchResponseDOList); + batchResponseDOList); return new PageResult<>(pageDTO, batchResponseVOList); } @@ -141,4 +157,50 @@ public class JobBatchServiceImpl implements JobBatchService { return Boolean.TRUE; } + @Override + @Transactional + public Boolean retry(Long taskBatchId) { + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper() + .eq(JobTaskBatch::getId, taskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS) + ); + Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null.")); + + // 重置状态为运行中 + jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.RUNNING.getStatus()); + + Assert.isTrue(jobTaskBatchMapper.updateById(jobTaskBatch) > 0, + () -> new EasyRetryServerException("update job batch to running failed.")); + + Job job = jobMapper.selectById(jobTaskBatch.getJobId()); + Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); + + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .in(JobTask::getTaskStatus, Lists.newArrayList( + JobTaskStatusEnum.FAIL.getStatus(), + JobTaskStatusEnum.STOP.getStatus(), + JobTaskStatusEnum.CANCEL.getStatus() + ) + ) + .eq(JobTask::getTaskBatchId, taskBatchId)); + Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty.")); + + for (JobTask jobTask : jobTasks) { + jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0, + () -> new EasyRetryServerException("update job task to running failed.")); + // 模拟失败重试 + ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType()); + ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job); + context.setTaskBatchId(jobTaskBatch.getId()); + context.setTaskId(jobTask.getId()); + context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); + context.setExecuteResult(ExecuteResult.failure(null, "手动重试")); + clientCallback.callback(context); + } + + return Boolean.TRUE; + } + + } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java index d94cdedb..c4e4e85f 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java @@ -1,7 +1,37 @@ package com.aizuda.easy.retry.server.web.service.impl; +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.client.model.ExecuteResult; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; +import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO; +import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler; +import com.aizuda.easy.retry.server.job.task.support.JobExecutor; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; +import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory; +import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext; +import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorFactory; +import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.web.service.WorkflowNodeService; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; /** * @author xiaowoniu @@ -9,17 +39,99 @@ import org.springframework.stereotype.Service; * @since 2.6.0 */ @Service +@RequiredArgsConstructor public class WorkflowNodeServiceImpl implements WorkflowNodeService { + private final JobTaskBatchMapper jobTaskBatchMapper; + private final JobMapper jobMapper; + private final WorkflowBatchHandler workflowBatchHandler; + private final JobTaskMapper jobTaskMapper; @Override - public Boolean stop(Long id) { + public Boolean stop(Long nodeId, Long workflowTaskBatchId) { // 调用JOB的停止接口 + List jobTaskBatches = jobTaskBatchMapper.selectList( + new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowNodeId, nodeId) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + ); + + if (CollectionUtils.isEmpty(jobTaskBatches)) { + return Boolean.TRUE; + } + + for (JobTaskBatch jobTaskBatch : jobTaskBatches) { + + Job job = jobMapper.selectById(jobTaskBatch.getJobId()); + Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); + + JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); + + TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); + taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason()); + taskStopJobContext.setTaskBatchId(jobTaskBatch.getId()); + taskStopJobContext.setForceStop(Boolean.TRUE); + taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); + + jobTaskStop.stop(taskStopJobContext); + } + // 继续执行后续的任务 - return null; + WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); + taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId); + taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType()); + taskExecuteDTO.setParentId(nodeId); + + workflowBatchHandler.openNextNode(taskExecuteDTO); + + return Boolean.TRUE; } @Override - public Boolean retry(Long id) { - return null; + public Boolean retry(Long nodeId, Long workflowTaskBatchId) { + + // 调用JOB的停止接口 + List jobTaskBatches = jobTaskBatchMapper.selectList( + new LambdaQueryWrapper() + .select(JobTaskBatch::getId) + .eq(JobTaskBatch::getWorkflowNodeId, nodeId) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS) + ); + Assert.notEmpty(jobTaskBatches, () -> new EasyRetryServerException("job task batch is empty.")); + + for (JobTaskBatch jobTaskBatch : jobTaskBatches) { + + Job job = jobMapper.selectById(jobTaskBatch.getJobId()); + Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); + + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .select(JobTask::getId) + .eq(JobTask::getTaskBatchId, jobTaskBatch.getId())); + Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty.")); + + for (JobTask jobTask : jobTasks) { + // 模拟失败重试 + ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType()); + ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job); + context.setTaskBatchId(jobTaskBatch.getId()); + context.setTaskId(jobTask.getId()); + context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); + context.setExecuteResult(ExecuteResult.failure(null, "手动重试")); + clientCallback.callback(context); + } + +// JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); +// +// JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); +// context.setTaskList(jobTaRFGVTBCD67YFGVTBUE8SDsks); +// context.setTaskBatchId(jobTaskBatch.getId()); +// context.setWorkflowTaskBatchId(workflowTaskBatchId); +// context.setWorkflowNodeId(nodeId); +// jobExecutor.execute(context); + + } + + return Boolean.TRUE; } } diff --git a/frontend/src/api/jobApi.js b/frontend/src/api/jobApi.js index 8d2197ce..f8148985 100644 --- a/frontend/src/api/jobApi.js +++ b/frontend/src/api/jobApi.js @@ -16,6 +16,7 @@ const jobApi = { jobBatchList: '/job/batch/list', jobBatchDetail: '/job/batch/', stop: '/job/batch/stop/', + retry: '/job/batch/retry/', // 通知 jobNotifyConfigPageList: '/job/notify/config/page/list', @@ -45,6 +46,12 @@ const jobApi = { export default jobApi +export function retry (id) { + return request({ + url: jobApi.retry + id, + method: 'post' + }) +} export function workflowNameList (parameter) { return request({ url: jobApi.workflowNameList, diff --git a/frontend/src/views/job/JobBatchList.vue b/frontend/src/views/job/JobBatchList.vue index 35346208..f6f2623e 100644 --- a/frontend/src/views/job/JobBatchList.vue +++ b/frontend/src/views/job/JobBatchList.vue @@ -100,6 +100,20 @@ + + + { + const { status } = res + if (status === 0) { + this.$message.error('重试失败') + } else { + this.$refs.table.refresh(true) + this.$message.success('重试成功') + } + }) + }, refreshTable (v) { this.$refs.table.refresh(true) },