fix(sj_1.1.0-beta2): 优化工作流重试时上下文丢失问题

This commit is contained in:
opensnail 2024-07-01 11:33:03 +08:00
parent 3cb31a14d5
commit b45944d74a
2 changed files with 74 additions and 6 deletions

View File

@ -11,8 +11,11 @@ import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import org.springframework.beans.factory.InitializingBean;
@ -29,9 +32,12 @@ import java.util.Objects;
public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean {
@Autowired
protected JobTaskMapper jobTaskMapper;
private JobTaskMapper jobTaskMapper;
@Autowired
protected JobMapper jobMapper;
private JobMapper jobMapper;
@Autowired
private WorkflowTaskBatchMapper workflowTaskBatchMapper;
@Override
@Transactional
@ -44,7 +50,8 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
if (updateRetryCount(context)) {
Job job = context.getJob();
JobTask jobTask = context.getJobTask();
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(
JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo()));
realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId());
realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
@ -52,6 +59,10 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
realJobExecutor.setTaskName(jobTask.getTaskName());
// 这里统一收口传递上下文
if (StrUtil.isBlank(realJobExecutor.getWfContext())) {
realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId()));
}
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
return;
@ -62,6 +73,30 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
doCallback(context);
}
/**
* 获取工作流批次
*
* @param workflowTaskBatchId 工作流批次
* @return
*/
private String getWfContext(Long workflowTaskBatchId) {
if (Objects.isNull(workflowTaskBatchId)) {
return null;
}
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, workflowTaskBatchId)
);
if (Objects.isNull(workflowTaskBatch)) {
return null;
}
return workflowTaskBatch.getWfContext();
}
private boolean updateRetryCount(ClientCallbackContext context) {
JobTask updateJobTask = new JobTask();
updateJobTask.setRetryCount(1);
@ -78,7 +113,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
LambdaUpdateWrapper<JobTask> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(JobTask::getId, context.getTaskId());
if (Objects.isNull(context.getRetryScene())
|| Objects.equals(JobRetrySceneEnum.AUTO.getRetryScene(), context.getRetryScene())) {
|| Objects.equals(JobRetrySceneEnum.AUTO.getRetryScene(), context.getRetryScene())) {
updateWrapper.lt(JobTask::getRetryCount, job.getMaxRetryTimes());
}
@ -98,8 +133,8 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
// 手动重试策略
if (Objects.nonNull(context.getRetryScene())
&& Objects.equals(JobRetrySceneEnum.MANUAL.getRetryScene(), context.getRetryScene())
&& !context.isRetry()) {
&& Objects.equals(JobRetrySceneEnum.MANUAL.getRetryScene(), context.getRetryScene())
&& !context.isRetry()) {
return Boolean.TRUE;
}

View File

@ -25,15 +25,18 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
/**
* @author: xiaowoniu
@ -47,6 +50,8 @@ public class JobHandler {
private final JobTaskBatchMapper jobTaskBatchMapper;
private final JobMapper jobMapper;
private final JobTaskMapper jobTaskMapper;
private WorkflowTaskBatchMapper workflowTaskBatchMapper;
public Boolean retry(Long taskBatchId) {
return retry(taskBatchId, null, null);
@ -88,6 +93,9 @@ public class JobHandler {
return Boolean.TRUE;
}
// 获取工作流上下文
String wfContext = getWfContext(workflowTaskBatchId);
for (JobTask jobTask : jobTasks) {
if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()) {
continue;
@ -105,6 +113,7 @@ public class JobHandler {
context.setTaskId(jobTask.getId());
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
context.setRetryScene(JobRetrySceneEnum.MANUAL.getRetryScene());
context.setWfContext(wfContext);
context.setExecuteResult(ExecuteResult.failure(null, "手动重试"));
clientCallback.callback(context);
}
@ -138,4 +147,28 @@ public class JobHandler {
return Boolean.TRUE;
}
/**
* 获取工作流批次
*
* @param workflowTaskBatchId 工作流批次
* @return
*/
private String getWfContext(Long workflowTaskBatchId) {
if (Objects.isNull(workflowTaskBatchId)) {
return null;
}
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext)
.eq(WorkflowTaskBatch::getId, workflowTaskBatchId)
);
if (Objects.isNull(workflowTaskBatch)) {
return null;
}
return workflowTaskBatch.getWfContext();
}
}