fix(sj_1.1.0-beta2): 1. Fix MAP task failure 2. Read the configured reduce shard count

opensnail 2024-06-25 23:04:44 +08:00
parent 2d3ada1634
commit ca2ada3875
6 changed files with 56 additions and 21 deletions

View File

@@ -17,5 +17,6 @@ public class CompleteJobBatchDTO {
private Long taskBatchId;
private Integer jobOperationReason;
private Object result;
private Integer taskType;
}

View File

@@ -0,0 +1,16 @@
package com.aizuda.snailjob.server.job.task.dto;
import lombok.Data;
/**
* @author opensnail
* @date 2024-06-25 22:58:05
* @since sj_1.1.0
*/
@Data
public class MapReduceArgsStrDTO {
private Integer shardNum;
private String argsStr;
}
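
A hedged usage sketch (not part of this commit): the JSON shape below is assumed from the two fields above — shardNum configures the reduce parallelism used later in this commit, argsStr carries the params forwarded to each reduce task — and Jackson's ObjectMapper stands in for the project's JsonUtil.

// Illustration only -- assumes Lombok and Jackson on the classpath.
import com.fasterxml.jackson.databind.ObjectMapper;

class MapReduceArgsStrDemo {
    public static void main(String[] args) throws Exception {
        // assumed argsStr for a MAP_REDUCE job
        String argsStr = "{\"shardNum\": 4, \"argsStr\": \"biz-params\"}";
        MapReduceArgsStrDTO dto = new ObjectMapper().readValue(argsStr, MapReduceArgsStrDTO.class);
        System.out.println(dto.getShardNum() + " / " + dto.getArgsStr()); // prints: 4 / biz-params
    }
}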

View File

@@ -1,10 +1,15 @@
package com.aizuda.snailjob.server.job.task.support.block.job;
import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
@@ -19,6 +24,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Stream;
/**
 * Re-trigger failed tasks
@@ -29,7 +35,7 @@ import java.util.List;
*/
@Component
@RequiredArgsConstructor
public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy {
public class RecoveryBlockStrategy extends AbstracJobBlockStrategy {
private final JobTaskMapper jobTaskMapper;
private final JobMapper jobMapper;
@Override
@@ -42,18 +48,26 @@ public class DiscardRetryBlockStrategy extends AbstracJobBlockStrategy {
new LambdaQueryWrapper<JobTask>()
.select(JobTask::getId, JobTask::getTaskStatus)
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_SUCCESS)
);
// If the job task list is empty, generate the job tasks
if (CollUtil.isEmpty(jobTasks)) {
SnailJobLog.LOCAL.warn("No executable job task. taskBatchId:[{}]", context.getTaskBatchId());
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(context.getTaskBatchId());
taskExecuteDTO.setJobId(context.getJobId());
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
taskExecuteDTO.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
taskExecuteDTO.setWorkflowNodeId(context.getWorkflowNodeId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
return;
}
Job job = jobMapper.selectById(context.getJobId());
// Execute the tasks
// Execute the stopped or failed tasks
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(context.getTaskType());
jobExecutor.execute(buildJobExecutorContext(context, job, jobTasks));
jobExecutor.execute(buildJobExecutorContext(context, job,
StreamUtils.filter(jobTasks, (jobTask) -> JobTaskStatusEnum.NOT_SUCCESS.contains(jobTask.getTaskStatus()))));
}
@Override

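How the fix reads (hedged): when the batch has no task rows at all, the strategy re-dispatches the whole batch through JobTaskExecutorActor; otherwise it re-executes only the tasks whose status is still in JobTaskStatusEnum.NOT_SUCCESS. Below is a standalone sketch of that in-memory filter, assuming NOT_SUCCESS is a collection of non-success status codes and StreamUtils.filter wraps the stream API; the concrete codes are made up.

// Illustration only -- status codes and the record type are assumptions.
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class NotSuccessFilterSketch {
    static final List<Integer> NOT_SUCCESS = List.of(1, 2, 4, 5); // hypothetical codes

    static <T> List<T> filter(List<T> source, Predicate<T> predicate) {
        return source.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        record Task(long id, int status) {}
        List<Task> tasks = List.of(new Task(1L, 3), new Task(2L, 4)); // 3 = success (assumed)
        System.out.println(filter(tasks, t -> NOT_SUCCESS.contains(t.status())));
        // only Task[id=2, status=4] survives the filter
    }
}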
View File

@@ -51,6 +51,7 @@ public class JobExecutorResultActor extends AbstractActor {
Assert.notNull(result.getTaskId(), ()-> new SnailJobServerException("taskId can not be null"));
Assert.notNull(result.getJobId(), ()-> new SnailJobServerException("jobId can not be null"));
Assert.notNull(result.getTaskBatchId(), ()-> new SnailJobServerException("taskBatchId can not be null"));
Assert.notNull(result.getTaskType(), ()-> new SnailJobServerException("taskType can not be null"));
JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus());
@@ -63,9 +64,6 @@
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
() -> new SnailJobServerException("更新任务实例失败"));
// Update the workflow's global context; if a concurrent update fails, spin-retry the update
// workflowBatchHandler.mergeWorkflowContextAndRetry(result.getWorkflowTaskBatchId(), result.getWfContext());
// All tasks other than MAP and MAP_REDUCE are leaf nodes
if (Objects.nonNull(result.getIsLeaf()) && StatusEnum.NO.getStatus().equals(result.getIsLeaf())) {
return;

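The new non-null check on taskType means every producer of this result message must now set it; that is why CompleteJobBatchDTO (the first file in this commit) gained the same field. A hypothetical caller-side sketch, assuming the DTO is Lombok @Data like MapReduceArgsStrDTO above (so the setters exist) and that its import path matches the dto package shown in this diff:

// Hypothetical caller sketch -- not from the commit; package inferred, not verified.
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;

class CompleteJobBatchCallerSketch {
    static CompleteJobBatchDTO build(Long taskBatchId, Integer taskType) {
        CompleteJobBatchDTO dto = new CompleteJobBatchDTO();
        dto.setTaskBatchId(taskBatchId);
        dto.setTaskType(taskType); // leaving this null now trips the actor's assertion
        return dto;
    }
}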
View File

@@ -16,6 +16,7 @@ import com.aizuda.snailjob.server.common.cache.CacheRegisterTable;
import com.aizuda.snailjob.server.common.dto.RegisterNodeInfo;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.ClientInfoUtils;
import com.aizuda.snailjob.server.job.task.dto.MapReduceArgsStrDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
@@ -43,6 +44,8 @@ import java.util.*;
@RequiredArgsConstructor
public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
private static final String MERGE_REDUCE_TASK = "MERGE_REDUCE_TASK";
private static final String REDUCE_TASK = "REDUCE_TASK";
private final JobTaskMapper jobTaskMapper;
private final TransactionTemplate transactionTemplate;
@@ -53,7 +56,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
@Override
protected List<JobTask> doGenerate(final JobTaskGenerateContext context) {
// TODO: If there are no client nodes, does the JobTask still need to be created?
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(),
context.getNamespaceId());
if (CollUtil.isEmpty(serverNodes)) {
@@ -64,9 +66,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
List<RegisterNodeInfo> nodeInfoList = new ArrayList<>(serverNodes);
MapReduceStageEnum mapReduceStageEnum = MapReduceStageEnum.ofStage(context.getMrStage());
Assert.notNull(mapReduceStageEnum, () -> new SnailJobServerException("Map reduce stage is not existed"));
// todo: needs optimization
switch (mapReduceStageEnum) {
switch (Objects.requireNonNull(mapReduceStageEnum)) {
case MAP -> {
// MAP tasks
return createMapJobTasks(context, nodeInfoList, serverNodes);
@@ -105,7 +105,7 @@
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.MERGE_REDUCE.getStage());
jobTask.setTaskName("MERGE_REDUCE_TASK");
jobTask.setTaskName(MERGE_REDUCE_TASK);
Assert.isTrue(1 == jobTaskMapper.insert(jobTask),
() -> new SnailJobServerException("新增任务实例失败"));
@@ -115,8 +115,15 @@
private List<JobTask> createReduceJobTasks(JobTaskGenerateContext context, List<RegisterNodeInfo> nodeInfoList,
Set<RegisterNodeInfo> serverNodes) {
// TODO: parallelism of the reduce stage
int reduceParallel = 2;
int reduceParallel = 1;
String jobParams = null;
try {
MapReduceArgsStrDTO mapReduceArgsStrDTO = JsonUtil.parseObject(context.getArgsStr(), MapReduceArgsStrDTO.class);
reduceParallel = Optional.ofNullable(mapReduceArgsStrDTO.getShardNum()).orElse(1);
jobParams = mapReduceArgsStrDTO.getArgsStr();
} catch (Exception e) {
SnailJobLog.LOCAL.error("map reduce args parse error. argsStr:[{}]", context.getArgsStr());
}
List<JobTask> jobTasks = jobTaskMapper.selectList(new LambdaQueryWrapper<JobTask>()
.select(JobTask::getResultMessage)
@@ -132,6 +139,7 @@
jobTasks = new ArrayList<>(partition.size());
final List<JobTask> finalJobTasks = jobTasks;
String finalJobParams = jobParams;
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
@@ -142,13 +150,13 @@
jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType());
JobArgsHolder jobArgsHolder = new JobArgsHolder();
jobArgsHolder.setJobParams(StrUtil.isBlank(context.getArgsStr()) ? null : context.getArgsStr());
jobArgsHolder.setJobParams(finalJobParams);
jobArgsHolder.setMaps(JsonUtil.toJsonString(partition.get(index)));
jobTask.setArgsStr(JsonUtil.toJsonString(jobArgsHolder));
jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
jobTask.setMrStage(MapReduceStageEnum.REDUCE.getStage());
jobTask.setTaskName("REDUCE_TASK");
jobTask.setTaskName(REDUCE_TASK);
jobTask.setParentId(0L);
jobTask.setRetryCount(0);
jobTask.setLeaf(StatusEnum.YES.getStatus());

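The shard-count resolution added above, restated as a standalone hedged sketch: shardNum from the job args sets the reduce parallelism with a default of 1, and a parse failure is logged and swallowed so the batch still runs with a single reduce task. Jackson's ObjectMapper stands in for the project's JsonUtil.

// Standalone sketch of the logic above -- Jackson replaces JsonUtil here.
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Optional;

class ReduceParallelSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static int resolveReduceParallel(String argsStr) {
        try {
            MapReduceArgsStrDTO dto = MAPPER.readValue(argsStr, MapReduceArgsStrDTO.class);
            return Optional.ofNullable(dto.getShardNum()).orElse(1);
        } catch (Exception e) {
            // mirror the generator: degrade to one reduce shard instead of failing
            return 1;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveReduceParallel("{\"shardNum\":4}")); // 4
        System.out.println(resolveReduceParallel("not-json"));         // 1
        System.out.println(resolveReduceParallel("{}"));               // 1
    }
}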
View File

@@ -95,12 +95,10 @@ public class JobTaskBatchHandler {
} else if (stopCount > 0) {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else {
// todo: remove once debugging is done
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(),
JsonUtil.toJsonString(jobTasks));
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
if (needReduceTask(completeJobBatchDTO, jobTasks)) {
if (needReduceTask(completeJobBatchDTO, jobTasks)
&& JobTaskTypeEnum.MAP_REDUCE.getType() == completeJobBatchDTO.getTaskType()) {
// Stop the batch from completing here; the reduce task needs to be started
return false;
}
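
This guard is fix #1 from the commit title: needReduceTask alone could keep a plain MAP batch from completing, leaving it waiting on a reduce stage that never starts. Now the early return false (which defers completion to the reduce stage) fires only when the task type really is MAP_REDUCE. A hypothetical decision sketch with made-up type codes:

// Illustration only -- the enum codes are assumptions, not repo values.
class ReduceGateSketch {
    static boolean completeNow(boolean needReduce, int taskType) {
        final int MAP_REDUCE = 3; // assumed code for JobTaskTypeEnum.MAP_REDUCE
        return !(needReduce && taskType == MAP_REDUCE);
    }

    public static void main(String[] args) {
        System.out.println(completeNow(true, 3)); // false: the reduce stage will finish the batch
        System.out.println(completeNow(true, 2)); // true: a plain MAP batch now completes (the fix)
    }
}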