feat(dev_1.1.0):

1、客户端map阶段主流程完成
This commit is contained in:
wodeyangzipingpingwuqi 2024-06-13 18:08:59 +08:00 committed by opensnail
parent 7894cfb284
commit 3f33255792
12 changed files with 102 additions and 29 deletions

View File

@ -9,11 +9,14 @@ import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache;
import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
import com.aizuda.snailjob.client.job.core.executor.AbstractJobExecutor;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor;
import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
import com.aizuda.snailjob.client.job.core.executor.AnnotationJobExecutor;
import com.aizuda.snailjob.client.job.core.log.JobLogMeta;
import com.aizuda.snailjob.client.model.StopJobDTO;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.common.log.SnailJobLog;
@ -60,7 +63,13 @@ public class JobEndPoint {
Object executor = jobExecutorInfo.getExecutor();
IJobExecutor jobExecutor;
if (IJobExecutor.class.isAssignableFrom(executor.getClass())) {
jobExecutor = (AbstractJobExecutor) executor;
if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) {
jobExecutor = (AbstractMapExecutor) executor;
} else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskId()) {
jobExecutor = (AbstractMapReduceExecutor) executor;
} else {
jobExecutor = (AbstractJobExecutor) executor;
}
} else {
jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.job.core.client;
import com.aizuda.snailjob.client.common.annotation.Mapping;
import com.aizuda.snailjob.client.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH;
import com.aizuda.snailjob.common.core.model.Result;
@ -18,4 +19,6 @@ public interface JobNettyClient {
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_JOB_DISPATCH_RESULT)
Result dispatchResult(DispatchJobResultRequest request);
@Mapping(method = RequestMethod.POST, path = HTTP_PATH.BATCH_REPORT_JOB_MAP_TASK)
Result<Boolean> batchReportMapTask(MapTaskRequest mapTaskRequest);
}

View File

@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -94,7 +93,4 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
}
protected abstract ExecuteResult doJobExecute(JobArgs jobArgs);
protected abstract void doMapExecute(List<?> taskList, String mapName);
}

View File

@ -1,5 +1,13 @@
package com.aizuda.snailjob.client.job.core.executor;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.model.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
@ -11,14 +19,40 @@ import java.util.List;
* @date 2024/06/12
*/
@Slf4j
public abstract class AbstractMapExecutor extends AbstractJobExecutor {
public abstract class AbstractMapExecutor extends AbstractJobExecutor implements IJobExecutor {
@Override
public void doMapExecute(List<?> taskList, String mapName) {
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
.client(JobNettyClient.class)
.build();
public void doMapExecute(JobContext jobContext) {
List<Object> taskList = jobContext.getTaskList();
String mapName = jobContext.getMapName();
if (CollectionUtils.isEmpty(taskList)) {
return;
}
System.out.println("TODO");
// mapName 任务命名和根任务名或者最终任务名称一致导致的问题无限生成子任务或者直接失败
if ("ROOT_TASK".equals(mapName) || "ROOT_TASK".equals(mapName)) {
log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", jobContext.getTaskId(), mapName, mapName);
mapName = "X-" + mapName;
}
// 1. 构造请求
MapTaskRequest mapTaskRequest = new MapTaskRequest();
mapTaskRequest.setJobId(jobContext.getJobId());
mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
mapTaskRequest.setTaskName(mapName);
mapTaskRequest.setSubTask(taskList);
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
Result<Boolean> booleanResult = CLIENT.batchReportMapTask(mapTaskRequest);
if (booleanResult.getData()) {
log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), mapName, taskList.size());
} else {
throw new SnailJobMapReduceException("map failed for task: " + mapName);
}
}
}

View File

@ -1,11 +1,9 @@
package com.aizuda.snailjob.client.job.core.executor;
import com.aizuda.snailjob.client.job.core.dto.MrTaskResult;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.model.JobContext;
import java.util.List;
/**
* @author zhengweilin
* @version 1.0.0
@ -13,5 +11,5 @@ import java.util.List;
*/
public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor {
protected abstract ExecuteResult doReduceExecute(JobContext jobContext, List<MrTaskResult> mrTaskResultList);
protected abstract ExecuteResult doReduceExecute(JobContext jobContext, JobArgs jobArgs);
}

View File

@ -7,8 +7,6 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.util.List;
/**
* 基于注解的执行器
*
@ -30,9 +28,4 @@ public class AnnotationJobExecutor extends AbstractJobExecutor {
return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMethod(), jobExecutorInfo.getExecutor());
}
}
@Override
protected void doMapExecute(List<?> taskList, String mapName) {
}
}

View File

@ -83,7 +83,7 @@ public interface SystemConstants {
/**
* 生成同步MAP任务
*/
String JOB_MAP_TASK = "/job/map/task/v1";
String BATCH_REPORT_JOB_MAP_TASK = "/batch/report/job/map/task/v1";
/**
* 执行REDUCE任务

View File

@ -16,7 +16,8 @@ public enum JobTaskTypeEnum {
CLUSTER(1),
BROADCAST(2),
SHARDING(3),
MAP_REDUCE(4),
MAP(4),
MAP_REDUCE(5),
;
private final int type;

View File

@ -0,0 +1,32 @@
package com.aizuda.snailjob.common.core.exception;
/**
* 异常信息
*
* @author: opensnail
* @date : 2021-11-19 15:01
*/
public class SnailJobMapReduceException extends BaseSnailJobException {
public SnailJobMapReduceException(String message) {
super(message);
}
public SnailJobMapReduceException(String message, Object... arguments) {
super(message, arguments);
}
public SnailJobMapReduceException(String message, Object[] arguments, Throwable cause) {
super(message, arguments, cause);
}
public SnailJobMapReduceException(String message, Object argument, Throwable cause) {
super(message, argument, cause);
}
public SnailJobMapReduceException(String message, Object argument) {
super(message, argument);
}
}

View File

@ -2,6 +2,8 @@ package com.aizuda.snailjob.common.core.model;
import lombok.Data;
import java.util.List;
/**
* @author: opensnail
* @date : 2023-09-27 09:40
@ -46,4 +48,14 @@ public class JobContext {
* 是否是重试流量
*/
private boolean isRetry;
/**
* Map集合列表
*/
private List<Object> taskList;
/**
* Map名称
*/
private String mapName;
}

View File

@ -2,14 +2,12 @@ package com.aizuda.snailjob.server.job.task.client;
import com.aizuda.snailjob.client.model.StopJobDTO;
import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.model.Result;
import com.aizuda.snailjob.server.common.rpc.client.RequestMethod;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Body;
import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP;
/**
@ -27,7 +25,4 @@ public interface JobRpcClient {
@Mapping(path = JOB_DISPATCH, method = RequestMethod.POST)
Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
@Mapping(path = JOB_MAP_TASK, method = RequestMethod.POST)
Result<Boolean> mapTask(@Body MapTaskRequest mapTaskRequest);
}

View File

@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.request;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
@ -31,7 +32,6 @@ import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK;
/**
* 动态分片客户端生成map任务
@ -48,7 +48,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
@Override
public boolean supports(final String path) {
return JOB_MAP_TASK.equals(path);
return SystemConstants.HTTP_PATH.BATCH_REPORT_JOB_MAP_TASK.equals(path);
}
@Override