feat(sj_1.1.0-beta2): dynamically sharded tasks in workflows

opensnail 2024-06-28 15:02:45 +08:00
parent 3bb60dc80a
commit d03ac24715
11 changed files with 87 additions and 30 deletions

AbstractJobExecutor.java

@@ -50,8 +50,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
         // Add the task to the time wheel so it is stopped when the timeout expires
         TimerManager.add(new StopTaskTimerTask(jobContext.getTaskBatchId()), jobContext.getExecutorTimeout(), TimeUnit.SECONDS);
-        Map<String, Object> changeWfContext = Maps.newConcurrentMap();
+        jobContext.setChangeWfContext(Maps.newConcurrentMap());
         // Execute the task
         ListenableFuture<ExecuteResult> submit = decorator.submit(() -> {
             JobArgs jobArgs;
@@ -72,7 +71,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
             }
             jobArgs.setWfContext(jobContext.getWfContext());
-            jobArgs.setChangeWfContext(changeWfContext);
+            jobArgs.setChangeWfContext(jobContext.getChangeWfContext());
             try {
                 // Initialize the dispatch-info log reporting (LogUtil)
@@ -86,7 +85,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
         });
         FutureCache.addFuture(jobContext.getTaskBatchId(), submit);
-        Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext, changeWfContext), decorator);
+        Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator);
     }

     private void initLogContext(JobContext jobContext) {

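Net effect of this file: the per-batch change map is no longer a local variable threaded through the callback constructor; it lives on JobContext, so the executor, the callback, and the map-task handler all see the same concurrent map. A minimal user-side sketch of how an executor would publish a context delta; the ShardCountExecutor class, the getChangeWfContext() accessor on JobArgs (mirroring the setChangeWfContext(...) call above), and ExecuteResult.success() are illustrative assumptions, not part of this diff:

    // Hypothetical executor running as one workflow node.
    public class ShardCountExecutor {
        public ExecuteResult jobExecute(JobArgs jobArgs) {
            // Read the full upstream workflow context dispatched by the server.
            Object totalRows = jobArgs.getWfContext().get("totalRows");
            // Record only the delta; it is serialized back on completion.
            jobArgs.getChangeWfContext().put("processedRows", 500);
            return ExecuteResult.success(); // assumed factory method
        }
    }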
AbstractMapExecutor.java

@@ -64,8 +64,8 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
         if (Objects.isNull(method)) {
             throw new SnailJobMapReduceException(
-                    "[{}] MapTask execution method not found. Please configure the @MapExecutor annotation",
-                    mapArgs.getExecutorInfo());
+                    "[{}#{}] MapTask execution method not found. Please configure the @MapExecutor annotation",
+                    mapArgs.getExecutorInfo(), mapArgs.getTaskName());
         }

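The added {} placeholder ties the error to the concrete map stage rather than just the executor bean. Assuming slf4j-style placeholder substitution in SnailJobMapReduceException, the message would render roughly like this (names invented):

    // executorInfo = "com.example.OrderShardExecutor", taskName = "MONTH_MAP"
    // before: [com.example.OrderShardExecutor] MapTask execution method not found. ...
    // after:  [com.example.OrderShardExecutor#MONTH_MAP] MapTask execution method not found. ...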
JobExecutorFutureCallback.java

@@ -66,11 +66,9 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
     }).build();

     private final JobContext jobContext;
-    private final Map<String, Object> changeWfContext;

-    public JobExecutorFutureCallback(final JobContext jobContext, Map<String, Object> changeWfContext) {
+    public JobExecutorFutureCallback(final JobContext jobContext) {
         this.jobContext = jobContext;
-        this.changeWfContext = changeWfContext;
     }

     @Override
@@ -164,9 +162,9 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
         dispatchJobRequest.setTaskStatus(status);
         dispatchJobRequest.setRetry(jobContext.isRetry());
         dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
-        // Pass the context along
-        if (CollUtil.isNotEmpty(changeWfContext)) {
-            dispatchJobRequest.setWfContext(JsonUtil.toJsonString(changeWfContext));
+        // Pass the changed context along
+        if (CollUtil.isNotEmpty(jobContext.getChangeWfContext())) {
+            dispatchJobRequest.setWfContext(JsonUtil.toJsonString(jobContext.getChangeWfContext()));
         }
         return dispatchJobRequest;

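The callback now reads the delta off JobContext and ships only the changed keys back to the server instead of echoing the whole context. A sketch of what crosses the wire, assuming JsonUtil serializes maps Jackson-style (key and value invented):

    Map<String, Object> delta = jobContext.getChangeWfContext();
    delta.put("processedRows", 500);                                // written by the executor
    dispatchJobRequest.setWfContext(JsonUtil.toJsonString(delta));  // {"processedRows":500}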
MapInvokeHandler.java

@@ -12,12 +12,15 @@ import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
 import com.aizuda.snailjob.common.core.model.JobContext;
 import com.aizuda.snailjob.common.core.model.NettyResult;
 import com.aizuda.snailjob.common.core.model.Result;
+import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import org.springframework.util.CollectionUtils;

 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;

 /**
  * @author: opensnail
@@ -72,6 +75,12 @@ public final class MapInvokeHandler implements InvocationHandler {
         mapTaskRequest.setTaskName(nextTaskName);
         mapTaskRequest.setSubTask(taskList);
         mapTaskRequest.setParentId(jobContext.getTaskId());
+        mapTaskRequest.setWorkflowTaskBatchId(jobContext.getWorkflowTaskBatchId());
+        mapTaskRequest.setWorkflowNodeId(jobContext.getWorkflowNodeId());
+        Map<String, Object> changeWfContext = jobContext.getChangeWfContext();
+        if (Objects.nonNull(changeWfContext)) {
+            mapTaskRequest.setWfContext(JsonUtil.toJsonString(changeWfContext));
+        }
         // 2. Send the request synchronously
         Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);

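With these hunks, a map stage that fans out the next round of sub-tasks also carries its workflow coordinates and the serialized context delta on the batch report. A condensed sketch of the request it builds; the stage name and payloads are invented, and the subTask element type is assumed to accept plain strings:

    MapTaskRequest request = new MapTaskRequest();
    request.setTaskName("REGION_MAP");                        // hypothetical next stage
    request.setSubTask(List.of("cn-north", "cn-south"));      // next round of sub-tasks
    request.setParentId(jobContext.getTaskId());
    request.setWorkflowTaskBatchId(jobContext.getWorkflowTaskBatchId());
    request.setWorkflowNodeId(jobContext.getWorkflowNodeId());
    Map<String, Object> delta = jobContext.getChangeWfContext();
    if (Objects.nonNull(delta)) {                             // defensive: may be unset on other code paths
        request.setWfContext(JsonUtil.toJsonString(delta));
    }
    Result<Boolean> result = CLIENT.batchReportMapTask(request);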
MapTaskRequest.java

@@ -6,6 +6,7 @@ import jakarta.validation.constraints.NotNull;
 import lombok.Data;

 import java.util.List;
+import java.util.Map;

 /**
  * @author: opensnail
@@ -27,6 +28,11 @@ public class MapTaskRequest {
     private Long workflowNodeId;

+    /**
+     * The workflow context changed by the current node
+     */
+    private String wfContext;
+
     @NotBlank(message = "taskName 不能为空")
     private String taskName;

JobContext.java

@@ -70,6 +70,11 @@ public class JobContext {
      */
     private Map<String, Object> wfContext;

+    /**
+     * Context entries added or changed by this node
+     */
+    private Map<String, Object> changeWfContext;
+
     /**
      * Scheduled job parameters
      */

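The two fields split read and write paths: wfContext is the merged snapshot the server dispatched with the batch, while changeWfContext accumulates only what this node adds or overrides, which keeps the completion report small and leaves the shared snapshot untouched. In two lines (keys invented):

    jobContext.getWfContext().get("upstream.total");         // read: server-provided snapshot
    jobContext.getChangeWfContext().put("node.offset", 42);  // write: this node's delta only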
ReduceTaskDTO.java

@@ -14,4 +14,5 @@ public class ReduceTaskDTO {
     private Long taskBatchId;
     private Long jobId;
     private Integer mrStage;
+    private String wfContext;
 }

MapClientCallbackHandler.java

@@ -22,6 +22,6 @@ public class MapClientCallbackHandler extends MapReduceClientCallbackHandler {
     @Override
     public void callback(ClientCallbackContext context) {
-        super.callback(context);
+        super.doCallback(context);
     }
 }

ReduceActor.java

@@ -20,8 +20,10 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
 import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
 import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
 import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.Job;
 import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
+import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
 import lombok.RequiredArgsConstructor;
@@ -32,6 +34,7 @@ import org.springframework.stereotype.Component;
 import java.text.MessageFormat;
 import java.time.Duration;
 import java.util.List;
+import java.util.Objects;

 /**
  * Responsible for generating and executing the reduce tasks
@@ -44,21 +47,23 @@
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 @RequiredArgsConstructor
 public class ReduceActor extends AbstractActor {

     private static final String KEY = "job_generate_reduce_{0}_{1}";
     private final DistributedLockHandler distributedLockHandler;
     private final JobMapper jobMapper;
     private final JobTaskMapper jobTaskMapper;
+    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;

     @Override
     public Receive createReceive() {
         return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
             SnailJobLog.LOCAL.info("执行Reduce, [{}]", JsonUtil.toJsonString(reduceTask));
             try {
-                Assert.notNull(reduceTask.getMrStage(), ()-> new SnailJobServerException("mrStage can not be null"));
-                Assert.notNull(reduceTask.getJobId(), ()-> new SnailJobServerException("jobId can not be null"));
-                Assert.notNull(reduceTask.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null"));
+                Assert.notNull(reduceTask.getMrStage(), () -> new SnailJobServerException("mrStage can not be null"));
+                Assert.notNull(reduceTask.getJobId(), () -> new SnailJobServerException("jobId can not be null"));
+                Assert.notNull(reduceTask.getTaskBatchId(),
+                        () -> new SnailJobServerException("taskBatchId can not be null"));
                 String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId());
                 distributedLockHandler.lockWithDisposableAndRetry(() -> {
                     doReduce(reduceTask);
@@ -95,25 +100,40 @@
         JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
         context.setTaskBatchId(reduceTask.getTaskBatchId());
         context.setMrStage(reduceTask.getMrStage());
+        context.setWfContext(reduceTask.getWfContext());
         List<JobTask> taskList = taskInstance.generate(context);
         if (CollUtil.isEmpty(taskList)) {
             SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());
             return;
         }

+        String wfContext = null;
+        if (Objects.nonNull(reduceTask.getWorkflowTaskBatchId())) {
+            WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
+                    new LambdaQueryWrapper<WorkflowTaskBatch>()
+                            .select(WorkflowTaskBatch::getWfContext)
+                            .eq(WorkflowTaskBatch::getId, reduceTask.getWorkflowTaskBatchId())
+            );
+            wfContext = workflowTaskBatch.getWfContext();
+        }
+
         // Execute the tasks
         JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType());
-        jobExecutor.execute(buildJobExecutorContext(reduceTask, job, taskList));
+        jobExecutor.execute(buildJobExecutorContext(reduceTask, job, taskList, wfContext));
     }

-    private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job,
-        List<JobTask> taskList) {
+    private static JobExecutorContext buildJobExecutorContext(
+            ReduceTaskDTO reduceTask,
+            Job job,
+            List<JobTask> taskList,
+            String wfContext) {
         JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
         context.setTaskList(taskList);
         context.setTaskBatchId(reduceTask.getTaskBatchId());
         context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
         context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
+        context.setWfContext(wfContext);
         return context;
     }
 }

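Generation of the reduce stage is serialized through a disposable distributed lock, so concurrent completion reports for the same batch cannot create it twice; the lock key is just the KEY template filled with the batch and job ids. For example:

    // KEY = "job_generate_reduce_{0}_{1}"
    String key = MessageFormat.format("job_generate_reduce_{0}_{1}", 123L, 45L);
    // -> "job_generate_reduce_123_45"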
JobTaskGenerateContext.java

@@ -50,4 +50,7 @@ public class JobTaskGenerateContext {
      * Parent task id
      */
     private Long parentId;
+
+    private String wfContext;
 }

MapTaskPostHttpRequestHandler.java

@@ -21,8 +21,10 @@ import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerat
 import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
 import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
 import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
+import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
 import com.aizuda.snailjob.template.datasource.persistence.po.Job;
 import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
+import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpMethod;
@@ -30,6 +32,7 @@ import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Component;

 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
@@ -43,7 +46,7 @@
 @Component
 @RequiredArgsConstructor
 public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {

+    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
     private final JobMapper jobMapper;

     @Override
@@ -73,10 +76,12 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
         context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
         context.setMrStage(MapReduceStageEnum.MAP.getStage());
         context.setMapSubTask(mapTaskRequest.getSubTask());
+        context.setWfContext(mapTaskRequest.getWfContext());
         List<JobTask> taskList = taskInstance.generate(context);
         if (CollUtil.isEmpty(taskList)) {
             return JsonUtil.toJsonString(
-                new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE, retryRequest.getReqId()));
+                new NettyResult(StatusEnum.NO.getStatus(), "Job task is empty", Boolean.FALSE,
+                    retryRequest.getReqId()));
         }

         Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>() Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
@@ -91,9 +96,19 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
                     retryRequest.getReqId()));
         }

+        String newWfContext = null;
+        if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) {
+            WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
+                    new LambdaQueryWrapper<WorkflowTaskBatch>()
+                            .select(WorkflowTaskBatch::getWfContext)
+                            .eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId())
+            );
+            newWfContext = workflowTaskBatch.getWfContext();
+        }
+
         // Execute the tasks
         JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType());
-        jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList));
+        jobExecutor.execute(buildJobExecutorContext(mapTaskRequest, job, taskList, newWfContext));

         return JsonUtil.toJsonString(
             new NettyResult(StatusEnum.YES.getStatus(), "Report Map Task Processed Successfully", Boolean.TRUE,
@@ -101,12 +116,13 @@
     }

     private static JobExecutorContext buildJobExecutorContext(MapTaskRequest mapTaskRequest, Job job,
-            List<JobTask> taskList) {
+            List<JobTask> taskList, String newWfContext) {
         JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
         context.setTaskList(taskList);
         context.setTaskBatchId(mapTaskRequest.getTaskBatchId());
         context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
         context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
+        context.setWfContext(newWfContext);
         return context;
     }