fix(sj_1.1.0): map任务生成和调度成功
This commit is contained in:
		
							parent
							
								
									4e8bd301d2
								
							
						
					
					
						commit
						6f37691be1
					
				@ -122,13 +122,6 @@ public class JobEndPoint {
 | 
			
		||||
    @Mapping(path = JOB_STOP, method = RequestMethod.POST)
 | 
			
		||||
    public Result<Boolean> stopJob(@Valid StopJobDTO interruptJob) {
 | 
			
		||||
 | 
			
		||||
        ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
 | 
			
		||||
        Validator validator = vf.getValidator();
 | 
			
		||||
        Set<ConstraintViolation<StopJobDTO>> set = validator.validate(interruptJob);
 | 
			
		||||
        for (final ConstraintViolation<StopJobDTO> violation : set) {
 | 
			
		||||
            return new Result<>(violation.getMessage(), Boolean.FALSE);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId());
 | 
			
		||||
        if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {
 | 
			
		||||
            return new Result<>(Boolean.TRUE);
 | 
			
		||||
 | 
			
		||||
@ -8,6 +8,7 @@ import com.aizuda.snailjob.client.job.core.dto.JobArgs;
 | 
			
		||||
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
 | 
			
		||||
import com.aizuda.snailjob.client.model.ExecuteResult;
 | 
			
		||||
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
 | 
			
		||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
 | 
			
		||||
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
 | 
			
		||||
import com.aizuda.snailjob.common.core.model.JobContext;
 | 
			
		||||
import com.aizuda.snailjob.common.core.model.MapContext;
 | 
			
		||||
@ -46,8 +47,8 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
 | 
			
		||||
        if ("ROOT_TASK".equals(nextMapName)) {
 | 
			
		||||
            throw new SnailJobMapReduceException("NextMapName can not be ROOT_TASK");
 | 
			
		||||
        if (SystemConstants.MAP_ROOT.equals(nextMapName)) {
 | 
			
		||||
            throw new SnailJobMapReduceException("The Next mapName can not be {}", SystemConstants.MAP_ROOT);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        JobContext jobContext = JobContextManager.getJobContext();
 | 
			
		||||
 | 
			
		||||
@ -208,4 +208,9 @@ public interface SystemConstants {
 | 
			
		||||
     */
 | 
			
		||||
    String YYYY_MM_DD = "yyyy-MM-dd";
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * 动态分片的root节点
 | 
			
		||||
     */
 | 
			
		||||
    String MAP_ROOT = "MAP_ROOT";
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -4,6 +4,7 @@ import akka.actor.AbstractActor;
 | 
			
		||||
import cn.hutool.core.collection.CollUtil;
 | 
			
		||||
import cn.hutool.core.lang.Assert;
 | 
			
		||||
import cn.hutool.core.util.StrUtil;
 | 
			
		||||
import com.aizuda.snailjob.common.core.constant.SystemConstants;
 | 
			
		||||
import com.aizuda.snailjob.common.core.context.SpringContext;
 | 
			
		||||
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
 | 
			
		||||
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
 | 
			
		||||
@ -134,12 +135,13 @@ public class JobExecutorActor extends AbstractActor {
 | 
			
		||||
            JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
 | 
			
		||||
            JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
 | 
			
		||||
            instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
 | 
			
		||||
            instanceGenerateContext.setMapName("ROOT_TASK");
 | 
			
		||||
            instanceGenerateContext.setMapName(SystemConstants.MAP_ROOT);
 | 
			
		||||
            instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
 | 
			
		||||
            // TODO 此处需要判断任务类型
 | 
			
		||||
            instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
 | 
			
		||||
            List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
 | 
			
		||||
            if (CollUtil.isEmpty(taskList)) {
 | 
			
		||||
                SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId());
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -179,7 +181,7 @@ public class JobExecutorActor extends AbstractActor {
 | 
			
		||||
        context.setJobId(job.getId());
 | 
			
		||||
        context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
 | 
			
		||||
        context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
 | 
			
		||||
        context.setMapName("ROOT_TASK");
 | 
			
		||||
        context.setMapName(SystemConstants.MAP_ROOT);
 | 
			
		||||
        return context;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -114,6 +114,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
 | 
			
		||||
        final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
 | 
			
		||||
        List<?> mapSubTask = context.getMapSubTask();
 | 
			
		||||
        if (CollUtil.isEmpty(mapSubTask)) {
 | 
			
		||||
            SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId());
 | 
			
		||||
            return Lists.newArrayList();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -138,6 +139,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
 | 
			
		||||
            jobTasks.add(jobTask);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return Lists.newArrayList();
 | 
			
		||||
        return jobTasks;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user