diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 3eeb378af..559819d5c 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -50,8 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { // 将任务添加到时间轮中,到期停止任务 TimerManager.add(new StopTaskTimerTask(jobContext.getTaskBatchId()), jobContext.getExecutorTimeout(), TimeUnit.SECONDS); - Map changeWfContext = Maps.newConcurrentMap(); - + jobContext.setChangeWfContext(Maps.newConcurrentMap()); // 执行任务 ListenableFuture submit = decorator.submit(() -> { JobArgs jobArgs; @@ -72,7 +71,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { } jobArgs.setWfContext(jobContext.getWfContext()); - jobArgs.setChangeWfContext(changeWfContext); + jobArgs.setChangeWfContext(jobContext.getChangeWfContext()); try { // 初始化调度信息(日志上报LogUtil) @@ -86,7 +85,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { }); FutureCache.addFuture(jobContext.getTaskBatchId(), submit); - Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext, changeWfContext), decorator); + Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator); } private void initLogContext(JobContext jobContext) { diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index 65d2a97a4..a6d55a86f 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -64,8 +64,8 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements if (Objects.isNull(method)) { throw new SnailJobMapReduceException( - "[{}] MapTask execution method not found. Please configure the @MapExecutor annotation", - mapArgs.getExecutorInfo()); + "[{}#{}] MapTask execution method not found. Please configure the @MapExecutor annotation", + mapArgs.getExecutorInfo(), mapArgs.getTaskName()); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java index 9ce04801a..878c37999 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java @@ -66,11 +66,9 @@ public class JobExecutorFutureCallback implements FutureCallback }).build(); private final JobContext jobContext; - private final Map changeWfContext; - public JobExecutorFutureCallback(final JobContext jobContext, Map changeWfContext) { + public JobExecutorFutureCallback(final JobContext jobContext) { this.jobContext = jobContext; - this.changeWfContext = changeWfContext; } @Override @@ -164,9 +162,9 @@ public class JobExecutorFutureCallback implements FutureCallback dispatchJobRequest.setTaskStatus(status); dispatchJobRequest.setRetry(jobContext.isRetry()); dispatchJobRequest.setRetryScene(jobContext.getRetryScene()); - // 传递上下文 - if (CollUtil.isNotEmpty(changeWfContext)) { - dispatchJobRequest.setWfContext(JsonUtil.toJsonString(changeWfContext)); + // 传递变更后的上下文 + if (CollUtil.isNotEmpty(jobContext.getChangeWfContext())) { + dispatchJobRequest.setWfContext(JsonUtil.toJsonString(jobContext.getChangeWfContext())); } return dispatchJobRequest; diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java index 01f11e4d1..e6644d284 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java @@ -12,12 +12,15 @@ import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; import org.springframework.util.CollectionUtils; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.List; +import java.util.Map; +import java.util.Objects; /** * @author: opensnail @@ -72,6 +75,12 @@ public final class MapInvokeHandler implements InvocationHandler { mapTaskRequest.setTaskName(nextTaskName); mapTaskRequest.setSubTask(taskList); mapTaskRequest.setParentId(jobContext.getTaskId()); + mapTaskRequest.setWorkflowTaskBatchId(jobContext.getWorkflowTaskBatchId()); + mapTaskRequest.setWorkflowNodeId(jobContext.getWorkflowNodeId()); + Map changeWfContext = jobContext.getChangeWfContext(); + if (Objects.nonNull(changeWfContext)) { + mapTaskRequest.setWfContext(JsonUtil.toJsonString(changeWfContext)); + } // 2. 同步发送请求 Result result = CLIENT.batchReportMapTask(mapTaskRequest); diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java index 7f9480a2e..5278a1e9d 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/MapTaskRequest.java @@ -6,6 +6,7 @@ import jakarta.validation.constraints.NotNull; import lombok.Data; import java.util.List; +import java.util.Map; /** * @author: opensnail @@ -27,6 +28,11 @@ public class MapTaskRequest { private Long workflowNodeId; + /** + * 当前节点变更的工作流上下文 + */ + private String wfContext; + @NotBlank(message = "taskName 不能为空") private String taskName; diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java index a0be939e7..432804dba 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java @@ -70,6 +70,11 @@ public class JobContext { */ private Map wfContext; + /** + * 新增或者改动的上下文 + */ + private Map changeWfContext; + /** * 定时任务参数 */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java index 3abac8fed..3098302b4 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/ReduceTaskDTO.java @@ -14,4 +14,5 @@ public class ReduceTaskDTO { private Long taskBatchId; private Long jobId; private Integer mrStage; + private String wfContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapClientCallbackHandler.java index 27799c7dc..cadb230b0 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/MapClientCallbackHandler.java @@ -22,6 +22,6 @@ public class MapClientCallbackHandler extends MapReduceClientCallbackHandler { @Override public void callback(ClientCallbackContext context) { - super.callback(context); + super.doCallback(context); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java index b6f2f2bf0..5eb9384d2 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.java @@ -20,8 +20,10 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import lombok.RequiredArgsConstructor; @@ -32,6 +34,7 @@ import org.springframework.stereotype.Component; import java.text.MessageFormat; import java.time.Duration; import java.util.List; +import java.util.Objects; /** * 负责生成reduce任务并执行 @@ -44,25 +47,27 @@ import java.util.List; @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @RequiredArgsConstructor public class ReduceActor extends AbstractActor { + private static final String KEY = "job_generate_reduce_{0}_{1}"; private final DistributedLockHandler distributedLockHandler; private final JobMapper jobMapper; private final JobTaskMapper jobTaskMapper; + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; @Override public Receive createReceive() { return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> { - SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask)); try { - Assert.notNull(reduceTask.getMrStage(), ()-> new SnailJobServerException("mrStage can not be null")); - Assert.notNull(reduceTask.getJobId(), ()-> new SnailJobServerException("jobId can not be null")); - Assert.notNull(reduceTask.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null")); + Assert.notNull(reduceTask.getMrStage(), () -> new SnailJobServerException("mrStage can not be null")); + Assert.notNull(reduceTask.getJobId(), () -> new SnailJobServerException("jobId can not be null")); + Assert.notNull(reduceTask.getTaskBatchId(), + () -> new SnailJobServerException("taskBatchId can not be null")); String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId()); distributedLockHandler.lockWithDisposableAndRetry(() -> { - doReduce(reduceTask); - }, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 3); + doReduce(reduceTask); + }, key, Duration.ofSeconds(1), Duration.ofSeconds(2), 3); } catch (Exception e) { SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e); } @@ -73,10 +78,10 @@ public class ReduceActor extends AbstractActor { private void doReduce(final ReduceTaskDTO reduceTask) { List jobTasks = jobTaskMapper.selectList(new PageDTO<>(1, 1), - new LambdaQueryWrapper() - .select(JobTask::getId) - .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId()) - .eq(JobTask::getMrStage, reduceTask.getMrStage()) + new LambdaQueryWrapper() + .select(JobTask::getId) + .eq(JobTask::getTaskBatchId, reduceTask.getTaskBatchId()) + .eq(JobTask::getMrStage, reduceTask.getMrStage()) ); if (CollUtil.isNotEmpty(jobTasks)) { @@ -95,25 +100,40 @@ public class ReduceActor extends AbstractActor { JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setMrStage(reduceTask.getMrStage()); + context.setWfContext(reduceTask.getWfContext()); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId()); return; } + String wfContext = null; + if (Objects.nonNull(reduceTask.getWorkflowTaskBatchId())) { + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, reduceTask.getWorkflowTaskBatchId()) + ); + wfContext = workflowTaskBatch.getWfContext(); + } + // 执行任务 JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType()); - jobExecutor.execute(buildJobExecutorContext(reduceTask, job, taskList)); + jobExecutor.execute(buildJobExecutorContext(reduceTask, job, taskList, wfContext)); } - private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job, - List taskList) { + private static JobExecutorContext buildJobExecutorContext( + ReduceTaskDTO reduceTask, + Job job, + List taskList, + String wfContext) { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskList(taskList); context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId()); context.setWorkflowNodeId(reduceTask.getWorkflowNodeId()); + context.setWfContext(wfContext); return context; } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java index cea89c8d4..7f8a9d4ee 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/generator/task/JobTaskGenerateContext.java @@ -50,4 +50,7 @@ public class JobTaskGenerateContext { * 父任务id */ private Long parentId; + + + private String wfContext; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index 7fdb2b4b5..53028cc87 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -21,8 +21,10 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator; import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper; import com.aizuda.snailjob.template.datasource.persistence.po.Job; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; +import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; @@ -30,6 +32,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Map; import java.util.Objects; @@ -43,7 +46,7 @@ import java.util.Objects; @Component @RequiredArgsConstructor public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { - + private final WorkflowTaskBatchMapper workflowTaskBatchMapper; private final JobMapper jobMapper; @Override @@ -73,10 +76,12 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { context.setNamespaceId(HttpHeaderUtil.getNamespace(headers)); context.setMrStage(MapReduceStageEnum.MAP.getStage()); context.setMapSubTask(mapTaskRequest.getSubTask()); + context.setWfContext(mapTaskRequest.getWfContext()); List taskList = taskInstance.generate(context); if (CollUtil.isEmpty(taskList)) { return JsonUtil.toJsonString( - new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE, retryRequest.getReqId())); + new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE, + retryRequest.getReqId())); } Job job = jobMapper.selectOne(new LambdaQueryWrapper() @@ -91,9 +96,19 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { retryRequest.getReqId())); } + String newWfContext = null; + if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) { + WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( + new LambdaQueryWrapper() + .select(WorkflowTaskBatch::getWfContext) + .eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId()) + ); + newWfContext = workflowTaskBatch.getWfContext(); + } + // 执行任务 JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType()); - jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList)); + jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList, newWfContext)); return JsonUtil.toJsonString( new NettyResult(StatusEnum.YES.getStatus(), "Report Map Task Processed Successfully", Boolean.TRUE, @@ -101,12 +116,13 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { } private static JobExecutorContext buildJobExecutorContext(MapTaskRequest mapTaskRequest, Job job, - List taskList) { + List taskList, String newWfContext) { JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job); context.setTaskList(taskList); context.setTaskBatchId(mapTaskRequest.getTaskBatchId()); context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId()); context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId()); + context.setWfContext(newWfContext); return context; }