fix(sj_1.1.0): reduce主流程完成

This commit is contained in:
opensnail 2024-06-13 15:20:11 +08:00
parent 93fd46001e
commit ae31c5ef87
10 changed files with 210 additions and 19 deletions

View File

@ -42,6 +42,7 @@ public class ActorGenerator {
public static final String SCAN_JOB_ACTOR = "ScanJobActor";
public static final String SCAN_WORKFLOW_ACTOR = "ScanWorkflowTaskActor";
public static final String JOB_TASK_PREPARE_ACTOR = "JobTaskPrepareActor";
public static final String JOB_REDUCE_ACTOR = "JobReduceActor";
public static final String WORKFLOW_TASK_PREPARE_ACTOR = "WorkflowTaskPrepareActor";
public static final String JOB_EXECUTOR_ACTOR = "JobExecutorActor";
public static final String WORKFLOW_EXECUTOR_ACTOR = "WorkflowExecutorActor";
@ -199,6 +200,17 @@ public class ActorGenerator {
.withDispatcher(JOB_TASK_DISPATCHER));
}
/**
* 动态分片任务处理reduce阶段actor
*
* @return actor 引用
*/
public static ActorRef jobReduceActor() {
return getJobActorSystem().actorOf(getSpringExtension().props(JOB_REDUCE_ACTOR)
.withDispatcher(JOB_TASK_EXECUTOR_DISPATCHER));
}
/**
* Job调度准备阶段actor
*

View File

@ -0,0 +1,16 @@
package com.aizuda.snailjob.server.job.task.dto;
import lombok.Data;
/**
* @author: shuguang.zhang
* @date : 2024-06-12
*/
@Data
public class ReduceTaskDTO {
private Long workflowNodeId;
private Long workflowTaskBatchId;
private Long taskBatchId;
private Long jobId;
}

View File

@ -136,6 +136,5 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(JobLogMessage jobLogMessage);
List<JobAlarmInfo> toJobAlarmInfos(List<JobTaskFailAlarmEvent> events);
ReduceTaskDTO toReduceTaskDTO(CompleteJobBatchDTO jobBatchDTO);
}

View File

@ -131,7 +131,7 @@ public class JobExecutorActor extends AbstractActor {
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
instanceGenerateContext.setTaskName("ROOT_TASK");
instanceGenerateContext.setMapName("ROOT_TASK");
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
if (CollUtil.isEmpty(taskList)) {
return;

View File

@ -0,0 +1,81 @@
package com.aizuda.snailjob.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 负责生成reduce任务并执行
*
* @author: opensnail
* @date : 2024-06-12
* @since : sj_1.1.0
*/
@Component(ActorGenerator.JOB_REDUCE_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor
public class ReduceActor extends AbstractActor {
private final JobMapper jobMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
try {
doReduce(reduceTask);
} catch (Exception e) {
SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", reduceTask, e);
}
}).build();
}
private void doReduce(final ReduceTaskDTO reduceTask) {
Job job = jobMapper.selectById(reduceTask.getJobId());
// 创建reduce任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
context.setMrStage(MapReduceStageEnum.REDUCE);
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
return;
}
// 执行任务
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType());
jobExecutor.execute(buildJobExecutorContext(reduceTask, job, taskList));
}
private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job,
List<JobTask> taskList) {
JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
context.setTaskList(taskList);
context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
return context;
}
}

View File

@ -27,13 +27,11 @@ public class MapReduceJobExecutor extends AbstractJobExecutor {
@Override
protected void doExecute(final JobExecutorContext context) {
List<JobTask> taskList = context.getTaskList();
for (int i = 0; i < taskList.size(); i++) {
JobTask jobTask = taskList.get(i);
for (final JobTask jobTask : taskList) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
}
}
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.generator.task;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import lombok.Data;
import java.util.List;
@ -38,5 +39,10 @@ public class JobTaskGenerateContext {
/**
* 任务名称
*/
private String taskName;
private String mapName;
/**
* 动态分片的阶段
*/
private MapReduceStageEnum mrStage;
}

View File

@ -6,18 +6,21 @@ import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.jetbrains.annotations.Nullable;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@ -26,7 +29,7 @@ import java.util.Optional;
import java.util.Set;
/**
* 生成Map任务
* 生成Map Reduce任务
*
* @author: opensnail
* @date : 2024-06-12
@ -37,6 +40,7 @@ import java.util.Set;
public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private final JobTaskMapper jobTaskMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.MAP_REDUCE;
@ -44,19 +48,80 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
@Override
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
// TODO 若没有客户端节点JobTask是否需要创建????
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
SnailJobLog.LOCAL.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return Lists.newArrayList();
}
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
switch (context.getMrStage()) {
case MAP -> {
// MAP任务
List<JobTask> newArrayList = createMapJobTasks(context, nodeInfoList, serverNodes);
if (newArrayList != null) {
return newArrayList;
}
}
case REDUCE -> {
createReduceJobTasks(context, nodeInfoList, serverNodes);
}
default -> throw new SnailJobServerException("Map reduce stage is not existed");
}
return Lists.newArrayList();
}
private void createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
Set<RegisterNodeInfo> serverNodes) {
// TODO reduce阶段的并行度
int reduceParallel = 10;
List<String> allMapJobTasks = StreamUtils.toList(jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
), JobTask::getResultMessage);
List<List<String>> partition = Lists.partition(allMapJobTasks, reduceParallel);
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO();
jobTaskExtAttrsDTO.setMapName(context.getMapName());
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.REDUCE.name());
List<JobTask> jobTasks = new ArrayList<>(partition.size());
for (int index = 0; index < partition.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
// 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(JsonUtil.toJsonString(partition.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
}
private @Nullable List<JobTask> createMapJobTasks(final JobTaskGenerateContext context,
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
List<?> mapSubTask = context.getMapSubTask();
if (CollUtil.isEmpty(mapSubTask)) {
return Lists.newArrayList();
}
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = new JobTaskExtAttrsDTO();
jobTaskExtAttrsDTO.setMapName(context.getMapName());
jobTaskExtAttrsDTO.setTaskType(JobTaskTypeEnum.MAP_REDUCE.getType());
jobTaskExtAttrsDTO.setMrStage(MapReduceStageEnum.MAP.name());
List<JobTask> jobTasks = new ArrayList<>(mapSubTask.size());
for (int index = 0; index < mapSubTask.size(); index++) {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(index % serverNodes.size());
@ -67,12 +132,12 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
jobTask.setArgsStr(JsonUtil.toJsonString(mapSubTask.get(index)));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new SnailJobServerException("新增任务实例失败"));
jobTask.setExtAttrs(jobTaskExtAttrsDTO.toString());
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
jobTasks.add(jobTask);
}
return jobTasks;
return Lists.newArrayList();
}
}

View File

@ -1,5 +1,6 @@
package com.aizuda.snailjob.server.job.task.support.handler;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.common.core.context.SpringContext;
@ -9,7 +10,9 @@ import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
@ -17,9 +20,11 @@ import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskExtAttrsDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
@ -94,6 +99,7 @@ public class JobTaskBatchHandler {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
// 判断是否是mapreduce任务
// todo 此处待优化
JobTask firstJobTask = jobTasks.get(0);
String extAttrs = firstJobTask.getExtAttrs();
if (StrUtil.isNotBlank(extAttrs)) {
@ -102,7 +108,13 @@ public class JobTaskBatchHandler {
Integer taskType = jobTaskExtAttrsDTO.getTaskType();
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType) {
if (isAllMapTask(jobTasks)) {
// TODO 开启reduce阶段
// 开启reduce阶段
try {
ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
} catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e);
}
}
}
}

View File

@ -11,6 +11,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.server.job.task.enums.MapReduceStageEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
@ -70,6 +71,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(mapTaskRequest);
context.setGroupName(HttpHeaderUtil.getGroupName(headers));
context.setNamespaceId(HttpHeaderUtil.getNamespace(headers));
context.setMrStage(MapReduceStageEnum.MAP);
List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) {
return JsonUtil.toJsonString(