diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java index 11d6eb4e..8da80965 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -11,8 +11,11 @@ import com.aizuda.snailjob.server.job.task.support.ClientCallbackHandler; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.toolkit.SqlHelper; import org.springframework.beans.factory.InitializingBean; @@ -29,9 +32,12 @@ import java.util.Objects; public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean { @Autowired - protected JobTaskMapper jobTaskMapper; + private JobTaskMapper jobTaskMapper; @Autowired - protected JobMapper jobMapper; + private JobMapper jobMapper; + @Autowired + private WorkflowTaskBatchMapper workflowTaskBatchMapper; + @Override @Transactional @@ -44,7 +50,8 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan if (updateRetryCount(context)) { Job job = context.getJob(); JobTask jobTask = context.getJobTask(); - RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO( + JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo())); realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId()); realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); @@ -52,6 +59,10 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan realJobExecutor.setRetry(Boolean.TRUE); realJobExecutor.setRetryScene(context.getRetryScene()); realJobExecutor.setTaskName(jobTask.getTaskName()); + // 这里统一收口传递上下文 + if (StrUtil.isBlank(realJobExecutor.getWfContext())) { + realJobExecutor.setWfContext(getWfContext(realJobExecutor.getWorkflowTaskBatchId())); + } ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); actorRef.tell(realJobExecutor, actorRef); return; @@ -62,6 +73,30 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan doCallback(context); } + /** + * 获取工作流批次 + * + * @param workflowTaskBatchId 工作流批次 + * @return + */ + private String getWfContext(Long workflowTaskBatchId) { + if (Objects.isNull(workflowTaskBatchId)) { + return null; + } + + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, workflowTaskBatchId) + ); + + if (Objects.isNull(workflowTaskBatch)) { + return null; + } + + return workflowTaskBatch.getWfContext(); + } + private boolean updateRetryCount(ClientCallbackContext context) { JobTask updateJobTask = new JobTask(); updateJobTask.setRetryCount(1); @@ -78,7 +113,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); updateWrapper.eq(JobTask::getId, context.getTaskId()); if (Objects.isNull(context.getRetryScene()) - || Objects.equals(JobRetrySceneEnum.AUTO.getRetryScene(), context.getRetryScene())) { + || Objects.equals(JobRetrySceneEnum.AUTO.getRetryScene(), context.getRetryScene())) { updateWrapper.lt(JobTask::getRetryCount, job.getMaxRetryTimes()); } @@ -98,8 +133,8 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan // 手动重试策略 if (Objects.nonNull(context.getRetryScene()) - && Objects.equals(JobRetrySceneEnum.MANUAL.getRetryScene(), context.getRetryScene()) - && !context.isRetry()) { + && Objects.equals(JobRetrySceneEnum.MANUAL.getRetryScene(), context.getRetryScene()) + && !context.isRetry()) { return Boolean.TRUE; } diff --git a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java index a51365d8..5d868ec9 100644 --- a/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java +++ b/snail-job-server/snail-job-server-web/src/main/java/com/aizuda/snailjob/server/web/service/handler/JobHandler.java @@ -25,15 +25,18 @@ import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.time.Duration; import java.util.List; +import java.util.Objects; /** * @author: xiaowoniu @@ -47,6 +50,8 @@ public class JobHandler { private final JobTaskBatchMapper jobTaskBatchMapper; private final JobMapper jobMapper; private final JobTaskMapper jobTaskMapper; + private WorkflowTaskBatchMapper workflowTaskBatchMapper; + public Boolean retry(Long taskBatchId) { return retry(taskBatchId, null, null); @@ -88,6 +93,9 @@ public class JobHandler { return Boolean.TRUE; } + // 获取工作流上下文 + String wfContext = getWfContext(workflowTaskBatchId); + for (JobTask jobTask : jobTasks) { if (jobTask.getTaskStatus() == JobTaskStatusEnum.RUNNING.getStatus()) { continue; @@ -105,6 +113,7 @@ public class JobHandler { context.setTaskId(jobTask.getId()); context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); context.setRetryScene(JobRetrySceneEnum.MANUAL.getRetryScene()); + context.setWfContext(wfContext); context.setExecuteResult(ExecuteResult.failure(null, "手动重试")); clientCallback.callback(context); } @@ -138,4 +147,28 @@ public class JobHandler { return Boolean.TRUE; } + /** + * 获取工作流批次 + * + * @param workflowTaskBatchId 工作流批次 + * @return + */ + private String getWfContext(Long workflowTaskBatchId) { + if (Objects.isNull(workflowTaskBatchId)) { + return null; + } + + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, workflowTaskBatchId) + ); + + if (Objects.isNull(workflowTaskBatch)) { + return null; + } + + return workflowTaskBatch.getWfContext(); + } + }