diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 3939e4e87..93e84de29 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -9,11 +9,14 @@ import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache; import com.aizuda.snailjob.client.job.core.cache.ThreadPoolCache; import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo; import com.aizuda.snailjob.client.job.core.executor.AbstractJobExecutor; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; +import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; import com.aizuda.snailjob.client.job.core.executor.AnnotationJobExecutor; import com.aizuda.snailjob.client.job.core.log.JobLogMeta; import com.aizuda.snailjob.client.model.StopJobDTO; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; import com.aizuda.snailjob.common.core.context.SpringContext; +import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.model.JobContext; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.log.SnailJobLog; @@ -60,7 +63,13 @@ public class JobEndPoint { Object executor = jobExecutorInfo.getExecutor(); IJobExecutor jobExecutor; if (IJobExecutor.class.isAssignableFrom(executor.getClass())) { - jobExecutor = (AbstractJobExecutor) executor; + if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) { + jobExecutor = (AbstractMapExecutor) executor; + } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskId()) { + jobExecutor = (AbstractMapReduceExecutor) executor; + } else { + jobExecutor = (AbstractJobExecutor) executor; + } } else { jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobNettyClient.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobNettyClient.java index f76cb82f6..c86f24c68 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobNettyClient.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobNettyClient.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.client.job.core.client; import com.aizuda.snailjob.client.common.annotation.Mapping; import com.aizuda.snailjob.client.common.rpc.client.RequestMethod; import com.aizuda.snailjob.client.model.request.DispatchJobResultRequest; +import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH; import com.aizuda.snailjob.common.core.model.Result; @@ -18,4 +19,6 @@ public interface JobNettyClient { @Mapping(method = RequestMethod.POST, path = HTTP_PATH.REPORT_JOB_DISPATCH_RESULT) Result dispatchResult(DispatchJobResultRequest request); + @Mapping(method = RequestMethod.POST, path = HTTP_PATH.BATCH_REPORT_JOB_MAP_TASK) + Result batchReportMapTask(MapTaskRequest mapTaskRequest); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index 56bb425e1..ef8821023 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -94,7 +93,4 @@ public abstract class AbstractJobExecutor implements IJobExecutor { } protected abstract ExecuteResult doJobExecute(JobArgs jobArgs); - - protected abstract void doMapExecute(List taskList, String mapName); - } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index af42f4903..dbe748cab 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -1,5 +1,13 @@ package com.aizuda.snailjob.client.job.core.executor; +import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.client.job.core.IJobExecutor; +import com.aizuda.snailjob.client.job.core.client.JobNettyClient; +import com.aizuda.snailjob.client.model.request.MapTaskRequest; +import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; +import com.aizuda.snailjob.common.core.model.JobContext; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.Result; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; @@ -11,14 +19,40 @@ import java.util.List; * @date 2024/06/12 */ @Slf4j -public abstract class AbstractMapExecutor extends AbstractJobExecutor { +public abstract class AbstractMapExecutor extends AbstractJobExecutor implements IJobExecutor { - @Override - public void doMapExecute(List taskList, String mapName) { + private static final JobNettyClient CLIENT = RequestBuilder.newBuilder() + .client(JobNettyClient.class) + .build(); + public void doMapExecute(JobContext jobContext) { + + List taskList = jobContext.getTaskList(); + String mapName = jobContext.getMapName(); if (CollectionUtils.isEmpty(taskList)) { return; } - System.out.println("TODO"); + + // mapName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) + if ("ROOT_TASK".equals(mapName) || "ROOT_TASK".equals(mapName)) { + log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", jobContext.getTaskId(), mapName, mapName); + mapName = "X-" + mapName; + } + + // 1. 构造请求 + MapTaskRequest mapTaskRequest = new MapTaskRequest(); + mapTaskRequest.setJobId(jobContext.getJobId()); + mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId()); + mapTaskRequest.setTaskName(mapName); + mapTaskRequest.setSubTask(taskList); + + // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) + Result booleanResult = CLIENT.batchReportMapTask(mapTaskRequest); + + if (booleanResult.getData()) { + log.info("[Map-{}] map task[name={},num={}] successfully!", jobContext.getTaskId(), mapName, taskList.size()); + } else { + throw new SnailJobMapReduceException("map failed for task: " + mapName); + } } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java index 5357d7c15..c80893915 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java @@ -1,11 +1,9 @@ package com.aizuda.snailjob.client.job.core.executor; -import com.aizuda.snailjob.client.job.core.dto.MrTaskResult; +import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.model.JobContext; -import java.util.List; - /** * @author zhengweilin * @version 1.0.0 @@ -13,5 +11,5 @@ import java.util.List; */ public abstract class AbstractMapReduceExecutor extends AbstractMapExecutor { - protected abstract ExecuteResult doReduceExecute(JobContext jobContext, List mrTaskResultList); + protected abstract ExecuteResult doReduceExecute(JobContext jobContext, JobArgs jobArgs); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationJobExecutor.java index 7e17690a1..2d5ae3107 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationJobExecutor.java @@ -7,8 +7,6 @@ import com.aizuda.snailjob.client.model.ExecuteResult; import org.springframework.stereotype.Component; import org.springframework.util.ReflectionUtils; -import java.util.List; - /** * 基于注解的执行器 * @@ -30,9 +28,4 @@ public class AnnotationJobExecutor extends AbstractJobExecutor { return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMethod(), jobExecutorInfo.getExecutor()); } } - - @Override - protected void doMapExecute(List taskList, String mapName) { - - } } diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java index a0c6029cf..a6dbc098a 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/constant/SystemConstants.java @@ -83,7 +83,7 @@ public interface SystemConstants { /** * 生成同步MAP任务 */ - String JOB_MAP_TASK = "/job/map/task/v1"; + String BATCH_REPORT_JOB_MAP_TASK = "/batch/report/job/map/task/v1"; /** * 执行REDUCE任务 diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java index 6446283fe..31c004e20 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/enums/JobTaskTypeEnum.java @@ -16,7 +16,8 @@ public enum JobTaskTypeEnum { CLUSTER(1), BROADCAST(2), SHARDING(3), - MAP_REDUCE(4), + MAP(4), + MAP_REDUCE(5), ; private final int type; diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobMapReduceException.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobMapReduceException.java new file mode 100644 index 000000000..ea85fc855 --- /dev/null +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/exception/SnailJobMapReduceException.java @@ -0,0 +1,32 @@ + +package com.aizuda.snailjob.common.core.exception; + + +/** + * 异常信息 + * + * @author: opensnail + * @date : 2021-11-19 15:01 + */ +public class SnailJobMapReduceException extends BaseSnailJobException { + + public SnailJobMapReduceException(String message) { + super(message); + } + + public SnailJobMapReduceException(String message, Object... arguments) { + super(message, arguments); + } + + public SnailJobMapReduceException(String message, Object[] arguments, Throwable cause) { + super(message, arguments, cause); + } + + public SnailJobMapReduceException(String message, Object argument, Throwable cause) { + super(message, argument, cause); + } + + public SnailJobMapReduceException(String message, Object argument) { + super(message, argument); + } +} diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java index 32b0531fe..ddcf27c7b 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java @@ -2,6 +2,8 @@ package com.aizuda.snailjob.common.core.model; import lombok.Data; +import java.util.List; + /** * @author: opensnail * @date : 2023-09-27 09:40 @@ -46,4 +48,14 @@ public class JobContext { * 是否是重试流量 */ private boolean isRetry; + + /** + * Map集合列表 + */ + private List taskList; + + /** + * Map名称 + */ + private String mapName; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java index bec7f42fa..c56efc283 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/client/JobRpcClient.java @@ -2,14 +2,12 @@ package com.aizuda.snailjob.server.job.task.client; import com.aizuda.snailjob.client.model.StopJobDTO; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; -import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.server.common.rpc.client.RequestMethod; import com.aizuda.snailjob.server.common.rpc.client.annotation.Body; import com.aizuda.snailjob.server.common.rpc.client.annotation.Mapping; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_DISPATCH; -import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK; import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_STOP; /** @@ -27,7 +25,4 @@ public interface JobRpcClient { @Mapping(path = JOB_DISPATCH, method = RequestMethod.POST) Result dispatch(@Body DispatchJobRequest dispatchJobRequest); - @Mapping(path = JOB_MAP_TASK, method = RequestMethod.POST) - Result mapTask(@Body MapTaskRequest mapTaskRequest); - } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index 9765114b4..b2dae57c1 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -3,6 +3,7 @@ package com.aizuda.snailjob.server.job.task.support.request; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.net.url.UrlQuery; import com.aizuda.snailjob.client.model.request.MapTaskRequest; +import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.model.NettyResult; @@ -31,7 +32,6 @@ import org.springframework.stereotype.Component; import java.util.List; import java.util.Objects; -import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.JOB_MAP_TASK; /** * 动态分片客户端生成map任务 @@ -48,7 +48,7 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { @Override public boolean supports(final String path) { - return JOB_MAP_TASK.equals(path); + return SystemConstants.HTTP_PATH.BATCH_REPORT_JOB_MAP_TASK.equals(path); } @Override