fix(sj_1.1.0): map任务生成和调度成功
This commit is contained in:
parent
5d0761b766
commit
89d118e554
@ -126,13 +126,6 @@ public class JobEndPoint {
|
|||||||
@Mapping(path = JOB_STOP, method = RequestMethod.POST)
|
@Mapping(path = JOB_STOP, method = RequestMethod.POST)
|
||||||
public Result<Boolean> stopJob(@Valid StopJobDTO interruptJob) {
|
public Result<Boolean> stopJob(@Valid StopJobDTO interruptJob) {
|
||||||
|
|
||||||
ValidatorFactory vf = Validation.buildDefaultValidatorFactory();
|
|
||||||
Validator validator = vf.getValidator();
|
|
||||||
Set<ConstraintViolation<StopJobDTO>> set = validator.validate(interruptJob);
|
|
||||||
for (final ConstraintViolation<StopJobDTO> violation : set) {
|
|
||||||
return new Result<>(violation.getMessage(), Boolean.FALSE);
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId());
|
ThreadPoolExecutor threadPool = ThreadPoolCache.getThreadPool(interruptJob.getTaskBatchId());
|
||||||
if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {
|
if (Objects.isNull(threadPool) || threadPool.isShutdown() || threadPool.isTerminated()) {
|
||||||
return new Result<>(Boolean.TRUE);
|
return new Result<>(Boolean.TRUE);
|
||||||
|
@ -8,6 +8,7 @@ import com.aizuda.snailjob.client.job.core.dto.JobArgs;
|
|||||||
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
|
import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
|
||||||
import com.aizuda.snailjob.client.model.ExecuteResult;
|
import com.aizuda.snailjob.client.model.ExecuteResult;
|
||||||
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
|
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
|
||||||
|
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||||
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
|
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
|
||||||
import com.aizuda.snailjob.common.core.model.JobContext;
|
import com.aizuda.snailjob.common.core.model.JobContext;
|
||||||
import com.aizuda.snailjob.common.core.model.MapContext;
|
import com.aizuda.snailjob.common.core.model.MapContext;
|
||||||
@ -46,8 +47,8 @@ public abstract class AbstractMapExecutor extends AbstractJobExecutor implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
// mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
|
// mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败)
|
||||||
if ("ROOT_TASK".equals(nextMapName)) {
|
if (SystemConstants.MAP_ROOT.equals(nextMapName)) {
|
||||||
throw new SnailJobMapReduceException("NextMapName can not be ROOT_TASK");
|
throw new SnailJobMapReduceException("The Next mapName can not be {}", SystemConstants.MAP_ROOT);
|
||||||
}
|
}
|
||||||
|
|
||||||
JobContext jobContext = JobContextManager.getJobContext();
|
JobContext jobContext = JobContextManager.getJobContext();
|
||||||
|
@ -192,4 +192,9 @@ public interface SystemConstants {
|
|||||||
* 组名、场景名、空间ID通用正则
|
* 组名、场景名、空间ID通用正则
|
||||||
*/
|
*/
|
||||||
String REGEXP = "^[A-Za-z0-9_-]{1,64}$";
|
String REGEXP = "^[A-Za-z0-9_-]{1,64}$";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 动态分片的root节点
|
||||||
|
*/
|
||||||
|
String MAP_ROOT = "MAP_ROOT";
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import akka.actor.AbstractActor;
|
|||||||
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.aizuda.snailjob.common.core.constant.SystemConstants;
|
||||||
import com.aizuda.snailjob.common.core.context.SpringContext;
|
import com.aizuda.snailjob.common.core.context.SpringContext;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
|
||||||
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
|
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
|
||||||
@ -134,12 +135,13 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
|
JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(job.getTaskType());
|
||||||
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
|
JobTaskGenerateContext instanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
|
||||||
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
|
instanceGenerateContext.setTaskBatchId(taskExecute.getTaskBatchId());
|
||||||
instanceGenerateContext.setMapName("ROOT_TASK");
|
instanceGenerateContext.setMapName(SystemConstants.MAP_ROOT);
|
||||||
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
|
instanceGenerateContext.setMapSubTask(Lists.newArrayList(StrUtil.EMPTY));
|
||||||
// TODO 此处需要判断任务类型
|
// TODO 此处需要判断任务类型
|
||||||
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
|
instanceGenerateContext.setMrStage(MapReduceStageEnum.MAP);
|
||||||
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
|
List<JobTask> taskList = taskInstance.generate(instanceGenerateContext);
|
||||||
if (CollUtil.isEmpty(taskList)) {
|
if (CollUtil.isEmpty(taskList)) {
|
||||||
|
SnailJobLog.LOCAL.warn("Generate job task is empty, taskBatchId:[{}]", taskExecute.getTaskBatchId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,7 +181,7 @@ public class JobExecutorActor extends AbstractActor {
|
|||||||
context.setJobId(job.getId());
|
context.setJobId(job.getId());
|
||||||
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
|
||||||
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
context.setWorkflowNodeId(taskExecute.getWorkflowNodeId());
|
||||||
context.setMapName("ROOT_TASK");
|
context.setMapName(SystemConstants.MAP_ROOT);
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,6 +114,7 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
|
final List<RegisterNodeInfo> nodeInfoList, final Set<RegisterNodeInfo> serverNodes) {
|
||||||
List<?> mapSubTask = context.getMapSubTask();
|
List<?> mapSubTask = context.getMapSubTask();
|
||||||
if (CollUtil.isEmpty(mapSubTask)) {
|
if (CollUtil.isEmpty(mapSubTask)) {
|
||||||
|
SnailJobLog.LOCAL.warn("Map sub task is empty. TaskBatchId:[{}]", context.getTaskBatchId());
|
||||||
return Lists.newArrayList();
|
return Lists.newArrayList();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -138,6 +139,6 @@ public class MapReduceTaskGenerator extends AbstractJobTaskGenerator {
|
|||||||
jobTasks.add(jobTask);
|
jobTasks.add(jobTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Lists.newArrayList();
|
return jobTasks;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user