feat: 3.1.0

1. 新增定时任务失败手动重试功能
This commit is contained in:
byteblogs168 2024-02-25 23:22:27 +08:00
parent 4cce228d5c
commit 0b7a45ef21
16 changed files with 337 additions and 74 deletions

View File

@ -53,7 +53,10 @@ public interface JobTaskConverter {
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategyContext context);
JobTaskGenerateContext toJobTaskInstanceGenerateContext(JobExecutorContext context);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
JobTaskGenerateContext toJobTaskInstanceGenerateContext(Job job);
JobTask toJobTaskInstance(JobTaskGenerateContext context);
@ -74,16 +77,17 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO);
LogMetaDTO toJobLogDTO(ClientCallbackContext context);
LogMetaDTO toJobLogDTO(JobExecutorResultDTO resultDTO);
LogMetaDTO toJobLogDTO(BaseDTO baseDTO);
ClientCallbackContext toClientCallbackContext(DispatchJobResultRequest request);
ClientCallbackContext toClientCallbackContext(RealJobExecutorDTO request);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
ClientCallbackContext toClientCallbackContext(Job job);
DispatchJobRequest toDispatchJobRequest(RealJobExecutorDTO realJobExecutorDTO);
@Mappings({
@ -98,6 +102,9 @@ public interface JobTaskConverter {
})
RealJobExecutorDTO toRealJobExecutorDTO(JobExecutorContext context, JobTask jobTask);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
JobExecutorContext toJobExecutorContext(Job job);
JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context);

View File

@ -30,9 +30,9 @@ import java.util.Objects;
public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean {
@Autowired
private JobTaskMapper jobTaskMapper;
protected JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
protected JobMapper jobMapper;
@Override
@Transactional

View File

@ -1,13 +1,25 @@
package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author: www.byteblogs.com
* @date : 2023-10-07 10:24
@ -36,4 +48,38 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle
}
@Override
protected String chooseNewClient(ClientCallbackContext context) {
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(nodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return null;
}
JobTask jobTask = context.getJobTask();
String clientInfo = jobTask.getClientInfo();
String clientId = ClientInfoUtils.clientId(clientInfo);
RegisterNodeInfo serverNode = CacheRegisterTable.getServerNode(context.getGroupName(), context.getNamespaceId(), clientId);
if (Objects.isNull(serverNode)) {
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()));
Set<String> clientIdList = jobTasks.stream()
.map(jobTask1 -> ClientInfoUtils.clientId(jobTask1.getClientInfo()))
.collect(Collectors.toSet());
Set<String> remoteClientIdSet = nodes.stream().map(RegisterNodeInfo::getHostId).collect(Collectors.toSet());
Sets.SetView<String> diff = Sets.difference(remoteClientIdSet, clientIdList);
String newClientId = CollUtil.getFirst(diff.stream().iterator());
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(context.getGroupName(), context.getNamespaceId(), newClientId);
if (Objects.isNull(registerNodeInfo)) {
// 如果找不到新的客户端信息则返回原来的客户端信息
return clientInfo;
}
return ClientInfoUtils.generate(registerNodeInfo);
}
return clientInfo;
}
}

View File

@ -33,11 +33,6 @@ import java.util.Set;
@Slf4j
public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler {
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.SHARDING;

View File

@ -26,6 +26,10 @@ import com.aizuda.easy.retry.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.easy.retry.server.job.task.support.event.JobTaskFailAlarmEvent;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.easy.retry.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.GroupConfigMapper;
@ -33,9 +37,12 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
@ -48,6 +55,7 @@ import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -59,15 +67,13 @@ import java.util.concurrent.TimeUnit;
@Component(ActorGenerator.JOB_EXECUTOR_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
@RequiredArgsConstructor
public class JobExecutorActor extends AbstractActor {
@Autowired
private JobMapper jobMapper;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private GroupConfigMapper groupConfigMapper;
private final JobMapper jobMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final TransactionTemplate transactionTemplate;
private final GroupConfigMapper groupConfigMapper;
private final WorkflowBatchHandler workflowBatchHandler;
@Override
public Receive createReceive() {
@ -112,27 +118,13 @@ public class JobExecutorActor extends AbstractActor {
job.getNamespaceId()))) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
if (Objects.nonNull(taskExecute.getWorkflowNodeId()) && Objects.nonNull(taskExecute.getWorkflowTaskBatchId())) {
// 若是工作流则开启下一个任务
try {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
}
});
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
taskExecuteDTO.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
taskExecuteDTO.setParentId(taskExecute.getWorkflowNodeId());
taskExecuteDTO.setTaskBatchId(taskExecute.getTaskBatchId());
workflowBatchHandler.openNextNode(taskExecuteDTO);
}
// 更新状态
@ -143,14 +135,18 @@ public class JobExecutorActor extends AbstractActor {
return;
}
// 生成任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollectionUtils.isEmpty(taskList)) {
return;
}
// 执行任务
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setJobId(job.getId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
jobExecutor.execute(context);
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList));
} finally {
log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@ -166,6 +162,17 @@ public class JobExecutorActor extends AbstractActor {
}
@NotNull
private static JobExecutorContext buildJobExecutorContext(TaskExecuteDTO taskExecute, Job job, List<JobTask> taskList) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(taskExecute.getTaskBatchId());
context.setJobId(job.getId());
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
return context;
}
private void handlerTaskBatch(TaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
JobTaskBatch jobTaskBatch = new JobTaskBatch();

View File

@ -22,17 +22,6 @@ public abstract class AbstractJobExecutor implements JobExecutor, InitializingBe
@Override
@Transactional
public void execute(JobExecutorContext context) {
// 生成任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(getTaskInstanceType().getType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(context);
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollectionUtils.isEmpty(taskList)) {
return;
}
context.setTaskList(taskList);
doExecute(context);
}

View File

@ -62,7 +62,10 @@ public class RequestClientActor extends AbstractActor {
realJobExecutorDTO.getNamespaceId(),
realJobExecutorDTO.getClientId());
if (Objects.isNull(registerNodeInfo)) {
taskExecuteFailure(realJobExecutorDTO, "无可执行的客户端");
taskExecuteFailure(realJobExecutorDTO, "客户端不存在");
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
logMetaDTO.setTimestamp( DateUtils.toNowMilli());
EasyRetryLog.REMOTE.error("taskId:[{}] 任务调度失败. 失败原因: 无可执行的客户端 <|>{}<|>", realJobExecutorDTO.getTaskId(), logMetaDTO);
return;
}

View File

@ -87,7 +87,7 @@ public class JobTaskBatchHandler {
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
}

View File

@ -40,4 +40,11 @@ public class JobBatchController {
return jobBatchService.stop(taskBatchId);
}
@PostMapping("/retry/{taskBatchId}")
@LoginRequired
public Boolean retry(@PathVariable("taskBatchId") Long taskBatchId) {
return jobBatchService.retry(taskBatchId);
}
}

View File

@ -18,13 +18,15 @@ import org.springframework.web.bind.annotation.RestController;
public class WorkflowNodeController {
private final WorkflowNodeService workflowNodeService;
@PostMapping("/stop/{id}")
public Boolean stop(@PathVariable("id") Long id) {
return workflowNodeService.stop(id);
@PostMapping("/stop/{nodeId}/{workflowTaskBatchId}")
public Boolean stop(@PathVariable("nodeId") Long nodeId,
@PathVariable("workflowTaskBatchId") Long workflowTaskBatchId) {
return workflowNodeService.stop(nodeId, workflowTaskBatchId);
}
@PostMapping("/retry/{id}")
public Boolean retry(@PathVariable("id") Long id) {
return workflowNodeService.retry(id);
@PostMapping("/retry/{nodeId}/{workflowTaskBatchId}")
public Boolean retry(@PathVariable("nodeId") Long nodeId,
@PathVariable("workflowTaskBatchId") Long workflowTaskBatchId) {
return workflowNodeService.retry(nodeId, workflowTaskBatchId);
}
}

View File

@ -19,4 +19,5 @@ public interface JobBatchService {
boolean stop(Long taskBatchId);
Boolean retry(Long taskBatchId);
}

View File

@ -6,7 +6,7 @@ package com.aizuda.easy.retry.server.web.service;
* @since 2.6.0
*/
public interface WorkflowNodeService {
Boolean stop(Long id);
Boolean stop(Long nodeId, Long workflowTaskBatchId);
Boolean retry(Long id);
Boolean retry(Long id, Long workflowTaskBatchId);
}

View File

@ -2,15 +2,25 @@ package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.dto.CallbackConfig;
import com.aizuda.easy.retry.server.common.dto.DecisionConfig;
import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
@ -24,14 +34,19 @@ import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatch
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -49,6 +64,7 @@ public class JobBatchServiceImpl implements JobBatchService {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobMapper jobMapper;
private final WorkflowNodeMapper workflowNodeMapper;
private final JobTaskMapper jobTaskMapper;
@Override
public PageResult<List<JobBatchResponseVO>> getJobBatchPage(final JobBatchQueryVO queryVO) {
@ -80,10 +96,10 @@ public class JobBatchServiceImpl implements JobBatchService {
jobBatchQueryDO.setGroupNames(groupNames);
jobBatchQueryDO.setNamespaceId(userSessionVO.getNamespaceId());
List<JobBatchResponseDO> batchResponseDOList = jobTaskBatchMapper.selectJobBatchPageList(pageDTO,
jobBatchQueryDO);
jobBatchQueryDO);
List<JobBatchResponseVO> batchResponseVOList = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVOs(
batchResponseDOList);
batchResponseDOList);
return new PageResult<>(pageDTO, batchResponseVOList);
}
@ -141,4 +157,50 @@ public class JobBatchServiceImpl implements JobBatchService {
return Boolean.TRUE;
}
@Override
@Transactional
public Boolean retry(Long taskBatchId) {
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS)
);
Assert.notNull(jobTaskBatch, () -> new EasyRetryServerException("job batch can not be null."));
// 重置状态为运行中
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.RUNNING.getStatus());
Assert.isTrue(jobTaskBatchMapper.updateById(jobTaskBatch) > 0,
() -> new EasyRetryServerException("update job batch to running failed."));
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.in(JobTask::getTaskStatus, Lists.newArrayList(
JobTaskStatusEnum.FAIL.getStatus(),
JobTaskStatusEnum.STOP.getStatus(),
JobTaskStatusEnum.CANCEL.getStatus()
)
)
.eq(JobTask::getTaskBatchId, taskBatchId));
Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty."));
for (JobTask jobTask : jobTasks) {
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
Assert.isTrue(jobTaskMapper.updateById(jobTask) > 0,
() -> new EasyRetryServerException("update job task to running failed."));
// 模拟失败重试
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType());
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job);
context.setTaskBatchId(jobTaskBatch.getId());
context.setTaskId(jobTask.getId());
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
context.setExecuteResult(ExecuteResult.failure(null, "手动重试"));
clientCallback.callback(context);
}
return Boolean.TRUE;
}
}

View File

@ -1,7 +1,37 @@
package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler;
import com.aizuda.easy.retry.server.job.task.support.JobExecutor;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.web.service.WorkflowNodeService;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @author xiaowoniu
@ -9,17 +39,99 @@ import org.springframework.stereotype.Service;
* @since 2.6.0
*/
@Service
@RequiredArgsConstructor
public class WorkflowNodeServiceImpl implements WorkflowNodeService {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobMapper jobMapper;
private final WorkflowBatchHandler workflowBatchHandler;
private final JobTaskMapper jobTaskMapper;
@Override
public Boolean stop(Long id) {
public Boolean stop(Long nodeId, Long workflowTaskBatchId) {
// 调用JOB的停止接口
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(
new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowNodeId, nodeId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)
);
if (CollectionUtils.isEmpty(jobTaskBatches)) {
return Boolean.TRUE;
}
for (JobTaskBatch jobTaskBatch : jobTaskBatches) {
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason());
taskStopJobContext.setTaskBatchId(jobTaskBatch.getId());
taskStopJobContext.setForceStop(Boolean.TRUE);
taskStopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
jobTaskStop.stop(taskStopJobContext);
}
// 继续执行后续的任务
return null;
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setWorkflowTaskBatchId(workflowTaskBatchId);
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
taskExecuteDTO.setParentId(nodeId);
workflowBatchHandler.openNextNode(taskExecuteDTO);
return Boolean.TRUE;
}
@Override
public Boolean retry(Long id) {
return null;
public Boolean retry(Long nodeId, Long workflowTaskBatchId) {
// 调用JOB的停止接口
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMapper.selectList(
new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getId)
.eq(JobTaskBatch::getWorkflowNodeId, nodeId)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatchId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.NOT_SUCCESS)
);
Assert.notEmpty(jobTaskBatches, () -> new EasyRetryServerException("job task batch is empty."));
for (JobTaskBatch jobTaskBatch : jobTaskBatches) {
Job job = jobMapper.selectById(jobTaskBatch.getJobId());
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId)
.eq(JobTask::getTaskBatchId, jobTaskBatch.getId()));
Assert.notEmpty(jobTasks, () -> new EasyRetryServerException("job task is empty."));
for (JobTask jobTask : jobTasks) {
// 模拟失败重试
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(job.getTaskType());
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(job);
context.setTaskBatchId(jobTaskBatch.getId());
context.setTaskId(jobTask.getId());
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
context.setExecuteResult(ExecuteResult.failure(null, "手动重试"));
clientCallback.callback(context);
}
// JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
//
// JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
// context.setTaskList(jobTaRFGVTBCD67YFGVTBUE8SDsks);
// context.setTaskBatchId(jobTaskBatch.getId());
// context.setWorkflowTaskBatchId(workflowTaskBatchId);
// context.setWorkflowNodeId(nodeId);
// jobExecutor.execute(context);
}
return Boolean.TRUE;
}
}

View File

@ -16,6 +16,7 @@ const jobApi = {
jobBatchList: '/job/batch/list',
jobBatchDetail: '/job/batch/',
stop: '/job/batch/stop/',
retry: '/job/batch/retry/',
// 通知
jobNotifyConfigPageList: '/job/notify/config/page/list',
@ -45,6 +46,12 @@ const jobApi = {
export default jobApi
export function retry (id) {
return request({
url: jobApi.retry + id,
method: 'post'
})
}
export function workflowNameList (parameter) {
return request({
url: jobApi.workflowNameList,

View File

@ -100,6 +100,20 @@
</a-popconfirm>
</template>
</span>
<span slot="action" slot-scope="text, record">
<template>
<a @click="handleInfo(record)">详情</a>
<a-divider type="vertical" />
<a-popconfirm
title="是否重试?"
ok-text="重试"
cancel-text="取消"
@confirm="handleRetry(record)"
>
<a href="javascript:;" v-if="record.taskBatchStatus === 4 || record.taskBatchStatus === 5 || record.taskBatchStatus === 6">重试</a>
</a-popconfirm>
</template>
</span>
</s-table>
<Drawer
@ -121,7 +135,7 @@
import ATextarea from 'ant-design-vue/es/input/TextArea'
import AInput from 'ant-design-vue/es/input/Input'
import { Drawer, STable } from '@/components'
import { jobBatchList, jobNameList, stop } from '@/api/jobApi'
import { jobBatchList, jobNameList, stop, retry } from '@/api/jobApi'
import { getAllGroupNameList } from '@/api/manage'
import JobBatchInfo from '@/views/job/JobBatchInfo'
const enums = require('@/utils/jobEnum')
@ -270,6 +284,17 @@ export default {
}
})
},
handleRetry (record) {
retry(record.id).then((res) => {
const { status } = res
if (status === 0) {
this.$message.error('重试失败')
} else {
this.$refs.table.refresh(true)
this.$message.success('重试成功')
}
})
},
refreshTable (v) {
this.$refs.table.refresh(true)
},