feat: 3.1.0
1. 优化代码
This commit is contained in:
parent
0b7a45ef21
commit
b5c93f991e
@ -80,7 +80,7 @@ public class EasyRetryInterceptor implements MethodInterceptor, AfterAdvice, Ser
|
||||
} else if (!RetrySiteSnapshot.existedMethodEntrance()) {
|
||||
RetrySiteSnapshot.setMethodEntrance(methodEntrance);
|
||||
} else {
|
||||
EasyRetryLog.LOCAL.info("无需设置入口标志:[{}]", traceId);
|
||||
EasyRetryLog.LOCAL.debug("No need to set entrance signs:[{}]", traceId);
|
||||
}
|
||||
|
||||
Throwable throwable = null;
|
||||
|
@ -0,0 +1,112 @@
|
||||
package com.aizuda.easy.retry.server.web.service.handler;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import com.aizuda.easy.retry.client.model.ExecuteResult;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
|
||||
import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
|
||||
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
|
||||
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory;
|
||||
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
|
||||
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author: xiaowoniu
|
||||
* @date : 2024-02-26
|
||||
* @since : 3.1.0
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class JobHandler {
|
||||
|
||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||
private final JobMapper jobMapper;
|
||||
private final JobTaskMapper jobTaskMapper;
|
||||
|
||||
public Boolean retry(Long taskBatchId) {
|
||||
return retry(taskBatchId, null, null);
|
||||
}
|
||||
public Boolean retry (Long taskBatchId, Long workflowNodeId, Long workflowTaskBatchId) {
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getId, taskBatchId)
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS)
|
||||
);
|
||||
Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null."));
|
||||
|
||||
// 重置状态为运行中
|
||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.RUNNING.getStatus());
|
||||
|
||||
Assert.isTrue(jobTaskBatchMapper.updateById(jobTaskBatch) > 0,
|
||||
() -> new EasyRetryServerException("update job batch to running failed."));
|
||||
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
|
||||
|
||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||
.in(JobTask::getTaskStatus, Lists.newArrayList(
|
||||
JobTaskStatusEnum.FAIL.getStatus(),
|
||||
JobTaskStatusEnum.STOP.getStatus(),
|
||||
JobTaskStatusEnum.CANCEL.getStatus()
|
||||
)
|
||||
)
|
||||
.eq(JobTask::getTaskBatchId, taskBatchId));
|
||||
Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty."));
|
||||
|
||||
for (JobTask jobTask : jobTasks) {
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0,
|
||||
() -> new EasyRetryServerException("update job task to running failed."));
|
||||
// 模拟失败重试
|
||||
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType());
|
||||
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job);
|
||||
context.setTaskBatchId(jobTaskBatch.getId());
|
||||
context.setWorkflowNodeId(workflowNodeId);
|
||||
context.setWorkflowTaskBatchId(workflowTaskBatchId);
|
||||
context.setTaskId(jobTask.getId());
|
||||
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
|
||||
context.setExecuteResult(ExecuteResult.failure(null, "手动重试"));
|
||||
clientCallback.callback(context);
|
||||
}
|
||||
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
public Boolean stop (Long taskBatchId) {
|
||||
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
|
||||
Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null."));
|
||||
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
|
||||
|
||||
JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
|
||||
|
||||
TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
|
||||
taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason());
|
||||
taskStopJobContext.setTaskBatchId(jobTaskBatch.getId());
|
||||
taskStopJobContext.setForceStop(Boolean.TRUE);
|
||||
taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
|
||||
|
||||
jobTaskStop.stop(taskStopJobContext);
|
||||
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
}
|
@ -29,6 +29,7 @@ import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
|
||||
import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO;
|
||||
import com.aizuda.easy.retry.server.web.service.JobBatchService;
|
||||
import com.aizuda.easy.retry.server.web.service.convert.JobBatchResponseVOConverter;
|
||||
import com.aizuda.easy.retry.server.web.service.handler.JobHandler;
|
||||
import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
|
||||
@ -64,7 +65,7 @@ public class JobBatchServiceImpl implements JobBatchService {
|
||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||
private final JobMapper jobMapper;
|
||||
private final WorkflowNodeMapper workflowNodeMapper;
|
||||
private final JobTaskMapper jobTaskMapper;
|
||||
private final JobHandler jobHandler;
|
||||
|
||||
@Override
|
||||
public PageResult<List<JobBatchResponseVO>> getJobBatchPage(final JobBatchQueryVO queryVO) {
|
||||
@ -138,68 +139,13 @@ public class JobBatchServiceImpl implements JobBatchService {
|
||||
|
||||
@Override
|
||||
public boolean stop(Long taskBatchId) {
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectById(taskBatchId);
|
||||
Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null."));
|
||||
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
|
||||
|
||||
JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
|
||||
|
||||
TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
|
||||
taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason());
|
||||
taskStopJobContext.setTaskBatchId(jobTaskBatch.getId());
|
||||
taskStopJobContext.setForceStop(Boolean.TRUE);
|
||||
taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
|
||||
|
||||
jobTaskStop.stop(taskStopJobContext);
|
||||
|
||||
return Boolean.TRUE;
|
||||
return jobHandler.stop(taskBatchId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public Boolean retry(Long taskBatchId) {
|
||||
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getId, taskBatchId)
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS)
|
||||
);
|
||||
Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null."));
|
||||
|
||||
// 重置状态为运行中
|
||||
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.RUNNING.getStatus());
|
||||
|
||||
Assert.isTrue(jobTaskBatchMapper.updateById(jobTaskBatch) > 0,
|
||||
() -> new EasyRetryServerException("update job batch to running failed."));
|
||||
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
|
||||
|
||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||
.in(JobTask::getTaskStatus, Lists.newArrayList(
|
||||
JobTaskStatusEnum.FAIL.getStatus(),
|
||||
JobTaskStatusEnum.STOP.getStatus(),
|
||||
JobTaskStatusEnum.CANCEL.getStatus()
|
||||
)
|
||||
)
|
||||
.eq(JobTask::getTaskBatchId, taskBatchId));
|
||||
Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty."));
|
||||
|
||||
for (JobTask jobTask : jobTasks) {
|
||||
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
|
||||
Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0,
|
||||
() -> new EasyRetryServerException("update job task to running failed."));
|
||||
// 模拟失败重试
|
||||
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType());
|
||||
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job);
|
||||
context.setTaskBatchId(jobTaskBatch.getId());
|
||||
context.setTaskId(jobTask.getId());
|
||||
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
|
||||
context.setExecuteResult(ExecuteResult.failure(null, "手动重试"));
|
||||
clientCallback.callback(context);
|
||||
}
|
||||
|
||||
return Boolean.TRUE;
|
||||
return jobHandler.retry(taskBatchId);
|
||||
}
|
||||
|
||||
|
||||
|
@ -20,6 +20,7 @@ import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandle
|
||||
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
|
||||
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
|
||||
import com.aizuda.easy.retry.server.web.service.WorkflowNodeService;
|
||||
import com.aizuda.easy.retry.server.web.service.handler.JobHandler;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||
@ -41,19 +42,20 @@ import java.util.List;
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class WorkflowNodeServiceImpl implements WorkflowNodeService {
|
||||
|
||||
private final JobTaskBatchMapper jobTaskBatchMapper;
|
||||
private final JobMapper jobMapper;
|
||||
private final WorkflowBatchHandler workflowBatchHandler;
|
||||
private final JobTaskMapper jobTaskMapper;
|
||||
private final JobHandler jobHandler;
|
||||
|
||||
@Override
|
||||
public Boolean stop(Long nodeId, Long workflowTaskBatchId) {
|
||||
// 调用JOB的停止接口
|
||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(
|
||||
new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getWorkflowNodeId, nodeId)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
|
||||
new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.eq(JobTaskBatch::getWorkflowNodeId, nodeId)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
|
||||
);
|
||||
|
||||
if (CollectionUtils.isEmpty(jobTaskBatches)) {
|
||||
@ -61,19 +63,7 @@ public class WorkflowNodeServiceImpl implements WorkflowNodeService {
|
||||
}
|
||||
|
||||
for (JobTaskBatch jobTaskBatch : jobTaskBatches) {
|
||||
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
|
||||
|
||||
JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
|
||||
|
||||
TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
|
||||
taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason());
|
||||
taskStopJobContext.setTaskBatchId(jobTaskBatch.getId());
|
||||
taskStopJobContext.setForceStop(Boolean.TRUE);
|
||||
taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
|
||||
|
||||
jobTaskStop.stop(taskStopJobContext);
|
||||
jobHandler.stop(jobTaskBatch.getId());
|
||||
}
|
||||
|
||||
// 继续执行后续的任务
|
||||
@ -92,44 +82,17 @@ public class WorkflowNodeServiceImpl implements WorkflowNodeService {
|
||||
|
||||
// 调用JOB的停止接口
|
||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(
|
||||
new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.select(JobTaskBatch::getId)
|
||||
.eq(JobTaskBatch::getWorkflowNodeId, nodeId)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS)
|
||||
new LambdaQueryWrapper<JobTaskBatch>()
|
||||
.select(JobTaskBatch::getId)
|
||||
.eq(JobTaskBatch::getWorkflowNodeId, nodeId)
|
||||
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
|
||||
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS)
|
||||
);
|
||||
|
||||
Assert.notEmpty(jobTaskBatches, () -> new EasyRetryServerException("job task batch is empty."));
|
||||
|
||||
for (JobTaskBatch jobTaskBatch : jobTaskBatches) {
|
||||
|
||||
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
|
||||
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
|
||||
|
||||
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
|
||||
.select(JobTask::getId)
|
||||
.eq(JobTask::getTaskBatchId, jobTaskBatch.getId()));
|
||||
Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty."));
|
||||
|
||||
for (JobTask jobTask : jobTasks) {
|
||||
// 模拟失败重试
|
||||
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType());
|
||||
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job);
|
||||
context.setTaskBatchId(jobTaskBatch.getId());
|
||||
context.setTaskId(jobTask.getId());
|
||||
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
|
||||
context.setExecuteResult(ExecuteResult.failure(null, "手动重试"));
|
||||
clientCallback.callback(context);
|
||||
}
|
||||
|
||||
// JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
|
||||
//
|
||||
// JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
|
||||
// context.setTaskList(jobTaRFGVTBCD67YFGVTBUE8SDsks);
|
||||
// context.setTaskBatchId(jobTaskBatch.getId());
|
||||
// context.setWorkflowTaskBatchId(workflowTaskBatchId);
|
||||
// context.setWorkflowNodeId(nodeId);
|
||||
// jobExecutor.execute(context);
|
||||
|
||||
jobHandler.retry(jobTaskBatch.getId(), nodeId, workflowTaskBatchId);
|
||||
}
|
||||
|
||||
return Boolean.TRUE;
|
||||
|
Loading…
Reference in New Issue
Block a user