feat: 3.1.0

1. 优化工作流重试逻辑
This commit is contained in:
byteblogs168 2024-02-26 23:25:44 +08:00
parent b5c93f991e
commit be12ed7e78
3 changed files with 36 additions and 7 deletions

View File

@ -1,13 +1,16 @@
package com.aizuda.easy.retry.server.job.task.support.stop;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@ -24,6 +27,8 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
protected abstract void doStop(TaskStopJobContext context);
@ -40,6 +45,11 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
List<JobTask> jobTasks = jobTaskMapper.selectList(queryWrapper);
if (CollectionUtils.isEmpty(jobTasks)) {
// 若没有任务项直接变更状态为已停止
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(context.getTaskBatchId());
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
jobTaskBatchMapper.updateById(jobTaskBatch);
return;
}

View File

@ -1,11 +1,15 @@
package com.aizuda.easy.retry.server.web.service.handler;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
@ -24,6 +28,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -60,16 +65,27 @@ public class JobHandler {
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.in(JobTask::getTaskStatus, Lists.newArrayList(
JobTaskStatusEnum.FAIL.getStatus(),
JobTaskStatusEnum.STOP.getStatus(),
JobTaskStatusEnum.CANCEL.getStatus()
)
)
.eq(JobTask::getTaskBatchId, taskBatchId));
Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty."));
// 若任务项为空则生成
if (CollectionUtils.isEmpty(jobTasks)) {
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(taskBatchId);
taskExecuteDTO.setJobId(jobTaskBatch.getJobId());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
taskExecuteDTO.setWorkflowNodeId(workflowNodeId);
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
return Boolean.TRUE;
}
for (JobTask jobTask : jobTasks) {
if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()) {
continue;
}
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0,
() -> new EasyRetryServerException("update job task to running failed."));

View File

@ -30,6 +30,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -49,6 +50,7 @@ public class WorkflowNodeServiceImpl implements WorkflowNodeService {
private final JobHandler jobHandler;
@Override
@Transactional
public Boolean stop(Long nodeId, Long workflowTaskBatchId) {
// 调用JOB的停止接口
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(
@ -78,6 +80,7 @@ public class WorkflowNodeServiceImpl implements WorkflowNodeService {
}
@Override
@Transactional
public Boolean retry(Long nodeId, Long workflowTaskBatchId) {
// 调用JOB的停止接口