diff --git a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/intercepter/EasyRetryInterceptor.java b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/intercepter/EasyRetryInterceptor.java index d137b447..60a26f1c 100644 --- a/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/intercepter/EasyRetryInterceptor.java +++ b/easy-retry-client/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/intercepter/EasyRetryInterceptor.java @@ -80,7 +80,7 @@ public class EasyRetryInterceptor implements MethodInterceptor, AfterAdvice, Ser } else if (!RetrySiteSnapshot.existedMethodEntrance()) { RetrySiteSnapshot.setMethodEntrance(methodEntrance); } else { - EasyRetryLog.LOCAL.info("无需设置入口标志:[{}]", traceId); + EasyRetryLog.LOCAL.debug("No need to set entrance signs:[{}]", traceId); } Throwable throwable = null; diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/handler/JobHandler.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/handler/JobHandler.java new file mode 100644 index 00000000..2fbb4433 --- /dev/null +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/handler/JobHandler.java @@ -0,0 +1,112 @@ +package com.aizuda.easy.retry.server.web.service.handler; + +import cn.hutool.core.lang.Assert; +import com.aizuda.easy.retry.client.model.ExecuteResult; +import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; +import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; +import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler; +import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; +import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; +import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; +import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; +import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author: xiaowoniu + * @date : 2024-02-26 + * @since : 3.1.0 + */ +@Component +@RequiredArgsConstructor +public class JobHandler { + + private final JobTaskBatchMapper jobTaskBatchMapper; + private final JobMapper jobMapper; + private final JobTaskMapper jobTaskMapper; + + public Boolean retry(Long taskBatchId) { + return retry(taskBatchId, null, null); + } + public Boolean retry (Long taskBatchId, Long workflowNodeId, Long workflowTaskBatchId) { + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper() + .eq(JobTaskBatch::getId, taskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS) + ); + Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null.")); + + // 重置状态为运行中 + jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.RUNNING.getStatus()); + + Assert.isTrue(jobTaskBatchMapper.updateById(jobTaskBatch) > 0, + () -> new EasyRetryServerException("update job batch to running failed.")); + + Job job = jobMapper.selectById(jobTaskBatch.getJobId()); + Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); + + List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() + .in(JobTask::getTaskStatus, Lists.newArrayList( + JobTaskStatusEnum.FAIL.getStatus(), + JobTaskStatusEnum.STOP.getStatus(), + JobTaskStatusEnum.CANCEL.getStatus() + ) + ) + .eq(JobTask::getTaskBatchId, taskBatchId)); + Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty.")); + + for (JobTask jobTask : jobTasks) { + jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); + Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0, + () -> new EasyRetryServerException("update job task to running failed.")); + // 模拟失败重试 + ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType()); + ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job); + context.setTaskBatchId(jobTaskBatch.getId()); + context.setWorkflowNodeId(workflowNodeId); + context.setWorkflowTaskBatchId(workflowTaskBatchId); + context.setTaskId(jobTask.getId()); + context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); + context.setExecuteResult(ExecuteResult.failure(null, "手动重试")); + clientCallback.callback(context); + } + + return Boolean.TRUE; + } + + public Boolean stop (Long taskBatchId) { + + JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); + Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null.")); + + Job job = jobMapper.selectById(jobTaskBatch.getJobId()); + Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); + + JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); + + TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); + taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason()); + taskStopJobContext.setTaskBatchId(jobTaskBatch.getId()); + taskStopJobContext.setForceStop(Boolean.TRUE); + taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); + + jobTaskStop.stop(taskStopJobContext); + + return Boolean.TRUE; + } + +} diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java index 94d41a2b..8c4e27a9 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobBatchServiceImpl.java @@ -29,6 +29,7 @@ import com.aizuda.easy.retry.server.web.model.request.UserSessionVO; import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO; import com.aizuda.easy.retry.server.web.service.JobBatchService; import com.aizuda.easy.retry.server.web.service.convert.JobBatchResponseVOConverter; +import com.aizuda.easy.retry.server.web.service.handler.JobHandler; import com.aizuda.easy.retry.server.web.util.UserSessionUtils; import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO; import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO; @@ -64,7 +65,7 @@ public class JobBatchServiceImpl implements JobBatchService { private final JobTaskBatchMapper jobTaskBatchMapper; private final JobMapper jobMapper; private final WorkflowNodeMapper workflowNodeMapper; - private final JobTaskMapper jobTaskMapper; + private final JobHandler jobHandler; @Override public PageResult> getJobBatchPage(final JobBatchQueryVO queryVO) { @@ -138,68 +139,13 @@ public class JobBatchServiceImpl implements JobBatchService { @Override public boolean stop(Long taskBatchId) { - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId); - Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null.")); - - Job job = jobMapper.selectById(jobTaskBatch.getJobId()); - Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); - - JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); - - TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); - taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason()); - taskStopJobContext.setTaskBatchId(jobTaskBatch.getId()); - taskStopJobContext.setForceStop(Boolean.TRUE); - taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); - - jobTaskStop.stop(taskStopJobContext); - - return Boolean.TRUE; + return jobHandler.stop(taskBatchId); } @Override @Transactional public Boolean retry(Long taskBatchId) { - JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper() - .eq(JobTaskBatch::getId, taskBatchId) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS) - ); - Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null.")); - - // 重置状态为运行中 - jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.RUNNING.getStatus()); - - Assert.isTrue(jobTaskBatchMapper.updateById(jobTaskBatch) > 0, - () -> new EasyRetryServerException("update job batch to running failed.")); - - Job job = jobMapper.selectById(jobTaskBatch.getJobId()); - Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); - - List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .in(JobTask::getTaskStatus, Lists.newArrayList( - JobTaskStatusEnum.FAIL.getStatus(), - JobTaskStatusEnum.STOP.getStatus(), - JobTaskStatusEnum.CANCEL.getStatus() - ) - ) - .eq(JobTask::getTaskBatchId, taskBatchId)); - Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty.")); - - for (JobTask jobTask : jobTasks) { - jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus()); - Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0, - () -> new EasyRetryServerException("update job task to running failed.")); - // 模拟失败重试 - ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType()); - ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job); - context.setTaskBatchId(jobTaskBatch.getId()); - context.setTaskId(jobTask.getId()); - context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); - context.setExecuteResult(ExecuteResult.failure(null, "手动重试")); - clientCallback.callback(context); - } - - return Boolean.TRUE; + return jobHandler.retry(taskBatchId); } diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java index c4e4e85f..9894f47b 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/WorkflowNodeServiceImpl.java @@ -20,6 +20,7 @@ import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandle import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.web.service.WorkflowNodeService; +import com.aizuda.easy.retry.server.web.service.handler.JobHandler; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; @@ -41,19 +42,20 @@ import java.util.List; @Service @RequiredArgsConstructor public class WorkflowNodeServiceImpl implements WorkflowNodeService { + private final JobTaskBatchMapper jobTaskBatchMapper; private final JobMapper jobMapper; private final WorkflowBatchHandler workflowBatchHandler; - private final JobTaskMapper jobTaskMapper; + private final JobHandler jobHandler; @Override public Boolean stop(Long nodeId, Long workflowTaskBatchId) { // 调用JOB的停止接口 List jobTaskBatches = jobTaskBatchMapper.selectList( - new LambdaQueryWrapper() - .eq(JobTaskBatch::getWorkflowNodeId, nodeId) - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) + new LambdaQueryWrapper() + .eq(JobTaskBatch::getWorkflowNodeId, nodeId) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE) ); if (CollectionUtils.isEmpty(jobTaskBatches)) { @@ -61,19 +63,7 @@ public class WorkflowNodeServiceImpl implements WorkflowNodeService { } for (JobTaskBatch jobTaskBatch : jobTaskBatches) { - - Job job = jobMapper.selectById(jobTaskBatch.getJobId()); - Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); - - JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); - - TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job); - taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason()); - taskStopJobContext.setTaskBatchId(jobTaskBatch.getId()); - taskStopJobContext.setForceStop(Boolean.TRUE); - taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); - - jobTaskStop.stop(taskStopJobContext); + jobHandler.stop(jobTaskBatch.getId()); } // 继续执行后续的任务 @@ -92,44 +82,17 @@ public class WorkflowNodeServiceImpl implements WorkflowNodeService { // 调用JOB的停止接口 List jobTaskBatches = jobTaskBatchMapper.selectList( - new LambdaQueryWrapper() - .select(JobTaskBatch::getId) - .eq(JobTaskBatch::getWorkflowNodeId, nodeId) - .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) - .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS) + new LambdaQueryWrapper() + .select(JobTaskBatch::getId) + .eq(JobTaskBatch::getWorkflowNodeId, nodeId) + .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS) ); + Assert.notEmpty(jobTaskBatches, () -> new EasyRetryServerException("job task batch is empty.")); for (JobTaskBatch jobTaskBatch : jobTaskBatches) { - - Job job = jobMapper.selectById(jobTaskBatch.getJobId()); - Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); - - List jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper() - .select(JobTask::getId) - .eq(JobTask::getTaskBatchId, jobTaskBatch.getId())); - Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty.")); - - for (JobTask jobTask : jobTasks) { - // 模拟失败重试 - ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType()); - ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job); - context.setTaskBatchId(jobTaskBatch.getId()); - context.setTaskId(jobTask.getId()); - context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); - context.setExecuteResult(ExecuteResult.failure(null, "手动重试")); - clientCallback.callback(context); - } - -// JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType()); -// -// JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); -// context.setTaskList(jobTaRFGVTBCD67YFGVTBUE8SDsks); -// context.setTaskBatchId(jobTaskBatch.getId()); -// context.setWorkflowTaskBatchId(workflowTaskBatchId); -// context.setWorkflowNodeId(nodeId); -// jobExecutor.execute(context); - + jobHandler.retry(jobTaskBatch.getId(), nodeId, workflowTaskBatchId); } return Boolean.TRUE;