fix(sj_1.1.0): reduce任务创建成功

This commit is contained in:
opensnail 2024-06-14 15:00:57 +08:00
parent ffdc3d9dc2
commit a64ac93b06
9 changed files with 32 additions and 19 deletions

View File

@ -46,6 +46,8 @@ public class DispatchJobRequest {
*/ */
private String mapName; private String mapName;
private String mrStage;
private String argsStr; private String argsStr;
private Integer shardingTotal; private Integer shardingTotal;

View File

@ -11,6 +11,7 @@ import lombok.Data;
@Data @Data
public class CompleteJobBatchDTO { public class CompleteJobBatchDTO {
private Long jobId;
private Long workflowNodeId; private Long workflowNodeId;
private Long workflowTaskBatchId; private Long workflowTaskBatchId;
private Long taskBatchId; private Long taskBatchId;

View File

@ -31,10 +31,15 @@ public class RealJobExecutorDTO extends BaseDTO {
private String argsType; private String argsType;
/** /**
* 任务名称 * MAP名称
*/ */
private String mapName; private String mapName;
/**
* MAP名称
*/
private String mrState;
/** /**
* 扩展字段 * 扩展字段
*/ */

View File

@ -182,6 +182,7 @@ public class JobExecutorActor extends AbstractActor {
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()); context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId()); context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
context.setMapName(SystemConstants.MAP_ROOT); context.setMapName(SystemConstants.MAP_ROOT);
context.setMrStage(MapReduceStageEnum.MAP.name());
return context; return context;
} }

View File

@ -18,6 +18,7 @@ import com.aizuda.snailjob.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -35,20 +36,17 @@ import java.util.Objects;
*/ */
@Component(ActorGenerator.JOB_EXECUTOR_RESULT_ACTOR) @Component(ActorGenerator.JOB_EXECUTOR_RESULT_ACTOR)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j @RequiredArgsConstructor
public class JobExecutorResultActor extends AbstractActor { public class JobExecutorResultActor extends AbstractActor {
private static final String KEY = "job_complete_{0}_{1}"; private static final String KEY = "job_complete_{0}_{1}";
@Autowired private final JobTaskMapper jobTaskMapper;
private JobTaskMapper jobTaskMapper; private final JobTaskBatchHandler jobTaskBatchHandler;
@Autowired private final DistributedLockHandler distributedLockHandler;
private JobTaskBatchHandler jobTaskBatchHandler;
@Autowired
private DistributedLockHandler distributedLockHandler;
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> { return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
log.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result)); SnailJobLog.LOCAL.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
try { try {
JobTask jobTask = new JobTask(); JobTask jobTask = new JobTask();
jobTask.setTaskStatus(result.getTaskStatus()); jobTask.setTaskStatus(result.getTaskStatus());

View File

@ -57,9 +57,11 @@ public class ReduceActor extends AbstractActor {
// 创建reduce任务 // 创建reduce任务
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType()); JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job); JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setMrStage(MapReduceStageEnum.REDUCE); context.setMrStage(MapReduceStageEnum.REDUCE);
List<JobTask> taskList = taskInstance.generate(context); List<JobTask> taskList = taskInstance.generate(context);
if (CollUtil.isEmpty(taskList)) { if (CollUtil.isEmpty(taskList)) {
SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", reduceTask.getTaskBatchId());
return; return;
} }
@ -76,6 +78,7 @@ public class ReduceActor extends AbstractActor {
context.setTaskBatchId(reduceTask.getTaskBatchId()); context.setTaskBatchId(reduceTask.getTaskBatchId());
context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId()); context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
context.setWorkflowNodeId(reduceTask.getWorkflowNodeId()); context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
context.setMrStage(MapReduceStageEnum.REDUCE.name());
return context; return context;
} }
} }

View File

@ -92,4 +92,6 @@ public class JobExecutorContext {
private String mapName; private String mapName;
private String mrStage;
} }

View File

@ -70,9 +70,10 @@ public class JobTaskBatchHandler {
List<JobTask> jobTasks = jobTaskMapper.selectList( List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>() new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getResultMessage) .select(JobTask::getTaskStatus, JobTask::getResultMessage, JobTask::getExtAttrs)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
SnailJobLog.LOCAL.info("尝试完成任务. taskBatchId:[{}] [{}]", completeJobBatchDTO.getTaskBatchId(), JsonUtil.toJsonString(jobTasks));
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId()); jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
@ -106,19 +107,18 @@ public class JobTaskBatchHandler {
JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(), JobTaskExtAttrsDTO jobTaskExtAttrsDTO = JsonUtil.parseObject(firstJobTask.getExtAttrs(),
JobTaskExtAttrsDTO.class); JobTaskExtAttrsDTO.class);
Integer taskType = jobTaskExtAttrsDTO.getTaskType(); Integer taskType = jobTaskExtAttrsDTO.getTaskType();
if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType) { if (Objects.nonNull(taskType) && JobTaskTypeEnum.MAP_REDUCE.getType() == taskType && isAllMapTask(jobTasks)) {
if (isAllMapTask(jobTasks)) {
// 开启reduce阶段 // 开启reduce阶段
try { try {
ActorRef actorRef = ActorGenerator.jobReduceActor(); ActorRef actorRef = ActorGenerator.jobReduceActor();
actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef); actorRef.tell(JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO), actorRef);
return false;
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("tell reduce actor error", e); SnailJobLog.LOCAL.error("tell reduce actor error", e);
} }
} }
} }
} }
}
if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) { if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) {
jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason()); jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason());

View File

@ -108,6 +108,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId()); context.setWorkflowTaskBatchId(mapTaskRequest.getWorkflowTaskBatchId());
context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId()); context.setWorkflowNodeId(mapTaskRequest.getWorkflowNodeId());
context.setMapName(mapTaskRequest.getMapName()); context.setMapName(mapTaskRequest.getMapName());
context.setMrStage(MapReduceStageEnum.MAP.name());
return context; return context;
} }