diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/MapHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/MapHandler.java new file mode 100644 index 000000000..a264c8a6d --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/MapHandler.java @@ -0,0 +1,15 @@ +package com.aizuda.snailjob.client.job.core; + +import com.aizuda.snailjob.client.model.ExecuteResult; + +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @version : sj_1.1.0 + */ +public interface MapHandler { + + ExecuteResult doMap(List taskList, String nextTaskName); +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/MapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/MapExecutor.java new file mode 100644 index 000000000..544eb3aaa --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/MapExecutor.java @@ -0,0 +1,30 @@ +package com.aizuda.snailjob.client.job.core.annotation; + +import com.aizuda.snailjob.common.core.constant.SystemConstants; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @version : sj_1.1.0 + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface MapExecutor { + + /** + * 任务名称 + * + * @return + */ + String taskName() default SystemConstants.MAP_ROOT; + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/MergeReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/MergeReduceExecutor.java new file mode 100644 index 000000000..dad557900 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/MergeReduceExecutor.java @@ -0,0 +1,21 @@ +package com.aizuda.snailjob.client.job.core.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @version : sj_1.1.0 + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface MergeReduceExecutor { + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/ReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/ReduceExecutor.java new file mode 100644 index 000000000..da153f7eb --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/annotation/ReduceExecutor.java @@ -0,0 +1,21 @@ +package com.aizuda.snailjob.client.job.core.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @version : sj_1.1.0 + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface ReduceExecutor { + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index bbb6231b2..bde615469 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -13,6 +13,8 @@ import com.aizuda.snailjob.client.job.core.executor.AbstractJobExecutor; import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor; import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor; import com.aizuda.snailjob.client.job.core.executor.AnnotationJobExecutor; +import com.aizuda.snailjob.client.job.core.executor.AnnotationMapJobExecutor; +import com.aizuda.snailjob.client.job.core.executor.AnnotationMapReduceJobExecutor; import com.aizuda.snailjob.client.job.core.log.JobLogMeta; import com.aizuda.snailjob.client.model.StopJobDTO; import com.aizuda.snailjob.client.model.request.DispatchJobRequest; @@ -54,7 +56,7 @@ public class JobEndPoint { if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) { SnailJobLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]", - dispatchJob.getRetryCount()); + dispatchJob.getRetryCount()); } JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); @@ -75,7 +77,13 @@ public class JobEndPoint { jobExecutor = (AbstractJobExecutor) executor; } } else { - jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); + if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) { + jobExecutor = SpringContext.getBeanByType(AnnotationMapJobExecutor.class); + } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskType()) { + jobExecutor = SpringContext.getBeanByType(AnnotationMapReduceJobExecutor.class); + } else { + jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); + } } SnailJobLog.REMOTE.info("批次:[{}] 任务调度成功. ", dispatchJob.getTaskBatchId()); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobExecutorInfo.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobExecutorInfo.java index 49f9b9035..339e74c44 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobExecutorInfo.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobExecutorInfo.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import java.lang.reflect.Method; +import java.util.Map; /** * @author opensnail @@ -18,6 +19,12 @@ public class JobExecutorInfo { private final Method method; + private final Map mapExecutorMap; + + Method reduceExecutor; + + Method mergeReduceExecutor; + private Object executor; } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java deleted file mode 100644 index d4e0a3e3c..000000000 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/MapReduceArgs.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.aizuda.snailjob.client.job.core.dto; - -import lombok.Data; -import lombok.EqualsAndHashCode; - -import java.util.List; - -/** - * @author: opensnail - * @date : 2024-06-13 - * @since : sj_1.1.0 - */ -@EqualsAndHashCode(callSuper = true) -@Data -public class MapReduceArgs extends JobArgs { - - private String taskName; - - private List mapArgsList; -} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java index 22f523d07..65d2a97a4 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapExecutor.java @@ -1,10 +1,11 @@ package com.aizuda.snailjob.client.job.core.executor; import cn.hutool.core.util.StrUtil; -import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; import com.aizuda.snailjob.client.job.core.IJobExecutor; -import com.aizuda.snailjob.client.job.core.client.JobNettyClient; +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache; import com.aizuda.snailjob.client.job.core.dto.JobArgs; +import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo; import com.aizuda.snailjob.client.job.core.dto.MapArgs; import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.client.model.request.MapTaskRequest; @@ -12,13 +13,19 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants; import com.aizuda.snailjob.common.core.enums.StatusEnum; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; -import com.aizuda.snailjob.common.core.model.NettyResult; import com.aizuda.snailjob.common.core.model.Result; import com.aizuda.snailjob.common.log.SnailJobLog; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; +import org.springframework.util.ReflectionUtils; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; /** * @author zhengweilin @@ -28,55 +35,48 @@ import java.util.List; @Slf4j public abstract class AbstractMapExecutor extends AbstractJobExecutor implements IJobExecutor { - private static final JobNettyClient CLIENT = RequestBuilder.newBuilder() - .client(JobNettyClient.class) - .async(Boolean.FALSE) - .build(); - @Override protected ExecuteResult doJobExecute(final JobArgs jobArgs) { if (jobArgs instanceof MapArgs) { - return this.doJobMapExecute((MapArgs) jobArgs); + return this.doJobMapExecute((MapArgs) jobArgs, getMapHandler()); } throw new SnailJobMapReduceException("For tasks that are not of type map or map reduce, please do not use the AbstractMapExecutor class."); } - public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs); + public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs, final MapHandler mapHandler); - public ExecuteResult doMap(List taskList, String nextTaskName) { + private MapHandler getMapHandler() { + return (MapHandler) Proxy.newProxyInstance(MapHandler.class.getClassLoader(), + new Class[]{MapHandler.class}, new MapInvokeHandler()); + } - if (StrUtil.isBlank(nextTaskName)) { - throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName); + protected ExecuteResult invokeMapExecute (MapArgs mapArgs, final MapHandler mapHandler) { + JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(mapArgs.getExecutorInfo()); + + if (Objects.isNull(jobExecutorInfo)) { + throw new SnailJobMapReduceException("[{}] not found", mapArgs.getExecutorInfo()); } - if (CollectionUtils.isEmpty(taskList)) { - throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName); + Map mapExecutorMap = Optional.ofNullable(jobExecutorInfo.getMapExecutorMap()) + .orElse(new HashMap<>()); + Method method = mapExecutorMap.get(mapArgs.getTaskName()); + + if (Objects.isNull(method)) { + throw new SnailJobMapReduceException( + "[{}] MapTask execution method not found. Please configure the @MapExecutor annotation", + mapArgs.getExecutorInfo()); + } - // taskName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) - if (SystemConstants.MAP_ROOT.equals(nextTaskName)) { - throw new SnailJobMapReduceException("The Next taskName can not be {}", SystemConstants.MAP_ROOT); + Class[] paramTypes = method.getParameterTypes(); + if (paramTypes.length == 1) { + return (ExecuteResult) ReflectionUtils.invokeMethod(method, jobExecutorInfo.getExecutor(), mapArgs); + } else if (paramTypes.length == 2) { + return (ExecuteResult) ReflectionUtils.invokeMethod(method, jobExecutorInfo.getExecutor(), mapArgs, + mapHandler); } - JobContext jobContext = JobContextManager.getJobContext(); - - // 1. 构造请求 - MapTaskRequest mapTaskRequest = new MapTaskRequest(); - mapTaskRequest.setJobId(jobContext.getJobId()); - mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId()); - mapTaskRequest.setTaskName(nextTaskName); - mapTaskRequest.setSubTask(taskList); - mapTaskRequest.setParentId(jobContext.getTaskId()); - - // 2. 同步发送请求 - Result result = CLIENT.batchReportMapTask(mapTaskRequest); - if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) { - SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId()); - } else { - throw new SnailJobMapReduceException("map failed for task: " + nextTaskName); - } - - return ExecuteResult.success(); + throw new SnailJobMapReduceException("Executor for [{}] not found", mapArgs.getTaskName()); } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java index ce7085d87..546ecd18e 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractMapReduceExecutor.java @@ -1,8 +1,6 @@ package com.aizuda.snailjob.client.job.core.executor; import com.aizuda.snailjob.client.job.core.dto.JobArgs; -import com.aizuda.snailjob.client.job.core.dto.MapArgs; -import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs; import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.client.model.ExecuteResult; @@ -10,8 +8,6 @@ import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; import com.aizuda.snailjob.common.core.model.JobContext; -import java.util.List; - /** * @author zhengweilin * @version 1.0.0 diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationMapJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationMapJobExecutor.java new file mode 100644 index 000000000..fe88bfd5b --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationMapJobExecutor.java @@ -0,0 +1,32 @@ +package com.aizuda.snailjob.client.job.core.executor; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache; +import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; +import org.springframework.stereotype.Component; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * 基于注解的Map任务执行器 + * + * @author opensnail + * @date 2024-06-26 22:20:36 + * @since sj_1.1.0 + */ +@Component +public class AnnotationMapJobExecutor extends AbstractMapExecutor { + + @Override + public ExecuteResult doJobMapExecute(final MapArgs mapArgs, final MapHandler mapHandler) { + return invokeMapExecute(mapArgs, mapHandler); + } +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationMapReduceJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationMapReduceJobExecutor.java new file mode 100644 index 000000000..bf32eb406 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AnnotationMapReduceJobExecutor.java @@ -0,0 +1,78 @@ +package com.aizuda.snailjob.client.job.core.executor; + +import com.aizuda.snailjob.client.job.core.MapHandler; +import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache; +import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; +import org.springframework.stereotype.Component; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * 基于注解的MapReduce执行器 + * + * @author opensnail + * @date 2024-06-26 22:20:36 + * @since sj_1.1.0 + */ +@Component +public class AnnotationMapReduceJobExecutor extends AbstractMapReduceExecutor { + + @Override + protected ExecuteResult doReduceExecute(final ReduceArgs reduceArgs) { + JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(reduceArgs.getExecutorInfo()); + if (Objects.isNull(jobExecutorInfo)) { + throw new SnailJobMapReduceException("[{}]未发现", reduceArgs.getExecutorInfo()); + } + + if (Objects.isNull(jobExecutorInfo.getReduceExecutor())) { + throw new SnailJobMapReduceException("未发现ReduceTask执行方法, 请配置@ReduceExecutor注解", + reduceArgs.getExecutorInfo()); + } + + Class[] paramTypes = jobExecutorInfo.getReduceExecutor().getParameterTypes(); + if (paramTypes.length > 0) { + return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getReduceExecutor(), + jobExecutorInfo.getExecutor(), reduceArgs); + } + + throw new SnailJobMapReduceException("[{}]未发现ReduceTask执行方法", reduceArgs.getExecutorInfo()); + } + + @Override + protected ExecuteResult doMergeReduceExecute(final MergeReduceArgs mergeReduceArgs) { + JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(mergeReduceArgs.getExecutorInfo()); + + if (Objects.isNull(jobExecutorInfo)) { + throw new SnailJobMapReduceException("[{}]未发现", mergeReduceArgs.getExecutorInfo()); + } + + if (Objects.isNull(jobExecutorInfo.getReduceExecutor())) { + throw new SnailJobMapReduceException("[{}]未发现ReduceTask执行方法, 请配置@MergeReduceExecutor注解", + mergeReduceArgs.getExecutorInfo()); + } + + Class[] paramTypes = jobExecutorInfo.getMergeReduceExecutor().getParameterTypes(); + if (paramTypes.length > 0) { + return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getReduceExecutor(), + jobExecutorInfo.getExecutor(), mergeReduceArgs); + } + + throw new SnailJobMapReduceException("[{}]未发现MergeReduceTask执行方法 [{}]", + mergeReduceArgs.getExecutorInfo()); + } + + @Override + public ExecuteResult doJobMapExecute(final MapArgs mapArgs, final MapHandler mapHandler) { + return invokeMapExecute(mapArgs, mapHandler); + } +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java index eaa2b6e37..0749f027b 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobContextManager.java @@ -11,7 +11,6 @@ public final class JobContextManager { private static final ThreadLocal JOB_CONTEXT_LOCAL = new ThreadLocal<>(); - public static void setJobContext(JobContext jobContext) { JOB_CONTEXT_LOCAL.set(jobContext); } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java new file mode 100644 index 000000000..4787a8271 --- /dev/null +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/MapInvokeHandler.java @@ -0,0 +1,85 @@ +package com.aizuda.snailjob.client.job.core.executor; + +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder; +import com.aizuda.snailjob.client.job.core.client.JobNettyClient; +import com.aizuda.snailjob.client.model.ExecuteResult; +import com.aizuda.snailjob.client.model.request.MapTaskRequest; +import com.aizuda.snailjob.common.core.constant.SystemConstants; +import com.aizuda.snailjob.common.core.enums.StatusEnum; +import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException; +import com.aizuda.snailjob.common.core.model.JobContext; +import com.aizuda.snailjob.common.core.model.NettyResult; +import com.aizuda.snailjob.common.core.model.Result; +import com.aizuda.snailjob.common.log.SnailJobLog; +import org.springframework.util.CollectionUtils; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.List; + +/** + * @author: opensnail + * @date : 2024-06-26 + * @version : sj_1.1.0 + */ +public final class MapInvokeHandler implements InvocationHandler { + + private static final JobNettyClient CLIENT = RequestBuilder.newBuilder() + .client(JobNettyClient.class) + .async(Boolean.FALSE) + .build(); + + @Override + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + return doMap((List) args[0], (String) args[1]); + } + + public ExecuteResult doMap(List taskList, String nextTaskName) { + + if (StrUtil.isBlank(nextTaskName)) { + throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName); + } + + if (CollectionUtils.isEmpty(taskList)) { + throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName); + } + + // 超过200提醒用户注意分片数量过多 + if (taskList.size() > 200) { + SnailJobLog.LOCAL.warn("[{}] map task size is too large, network maybe overload... please try to split the tasks.", nextTaskName); + } + + // 超过500强制禁止分片 + if (taskList.size() > 500) { + throw new SnailJobMapReduceException("[{}] map task size is too large, network maybe overload... please try to split the tasks.", nextTaskName); + } + + // taskName 任务命名和根任务名或者最终任务名称一致导致的问题(无限生成子任务或者直接失败) + if (SystemConstants.MAP_ROOT.equals(nextTaskName)) { + throw new SnailJobMapReduceException("The Next taskName can not be {}", SystemConstants.MAP_ROOT); + } + + JobContext jobContext = JobContextManager.getJobContext(); + + // 1. 构造请求 + MapTaskRequest mapTaskRequest = new MapTaskRequest(); + mapTaskRequest.setJobId(jobContext.getJobId()); + mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId()); + mapTaskRequest.setTaskName(nextTaskName); + mapTaskRequest.setSubTask(taskList); + mapTaskRequest.setParentId(jobContext.getTaskId()); + + // 2. 同步发送请求 + Result result = CLIENT.batchReportMapTask(mapTaskRequest); + if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) { + SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId()); + } else { + throw new SnailJobMapReduceException("map failed for task: " + nextTaskName); + } + + return ExecuteResult.success(); + } + + +} diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java index 80055be8b..532745106 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java @@ -4,9 +4,15 @@ import cn.hutool.core.collection.CollUtil; import com.aizuda.snailjob.client.job.core.IJobExecutor; import com.aizuda.snailjob.client.job.core.Scanner; import com.aizuda.snailjob.client.job.core.annotation.JobExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; +import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; +import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache; import com.aizuda.snailjob.client.job.core.dto.JobArgs; import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo; +import com.aizuda.snailjob.client.job.core.dto.MapArgs; +import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; import com.aizuda.snailjob.common.log.SnailJobLog; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.framework.AopProxyUtils; @@ -20,6 +26,7 @@ import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -49,8 +56,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { Map annotatedMethods = null; try { annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), - (MethodIntrospector.MetadataLookup) method -> AnnotatedElementUtils - .findMergedAnnotation(method, JobExecutor.class)); + (MethodIntrospector.MetadataLookup) method -> AnnotatedElementUtils + .findMergedAnnotation(method, JobExecutor.class)); } catch (Throwable ex) { SnailJobLog.LOCAL.error("{} JobExecutor加载异常:{}", beanDefinitionName, ex); } @@ -61,7 +68,9 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { // 通过实现接口进行注册 if (IJobExecutor.class.isAssignableFrom(bean.getClass())) { if (!JobExecutorInfoCache.isExisted(executorClassName)) { - jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean)); + jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, + ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), + null,null, null, bean)); } } @@ -75,12 +84,45 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method()); } + // 扫描MapExecutor、ReduceExecutor、MergeReduceExecutor注解 + Map mapExecutorMethodMap = new HashMap<>(); + Method reduceExecutor = null; + Method mergeReduceExecutor = null; + Method[] methods = bean.getClass().getMethods(); + for (final Method method1 : methods) { + Class[] parameterTypes = method1.getParameterTypes(); + MapExecutor mapExecutor = method1.getAnnotation(MapExecutor.class); + if (Objects.nonNull(mapExecutor) + && parameterTypes.length >0 + && parameterTypes[0].isAssignableFrom(MapArgs.class)) { + mapExecutorMethodMap.put(mapExecutor.taskName(), method1); + } + + ReduceExecutor reduceExecutorAnno = method1.getAnnotation(ReduceExecutor.class); + if (Objects.nonNull(reduceExecutorAnno) + && parameterTypes.length >0 + && parameterTypes[0].isAssignableFrom(ReduceArgs.class)) { + reduceExecutor = method1; + continue; + } + + MergeReduceExecutor mergeReduceExecutorAnno = method1.getAnnotation(MergeReduceExecutor.class); + if (Objects.nonNull(mergeReduceExecutorAnno) + && parameterTypes.length >0 + && parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) { + mergeReduceExecutor = method1; + } + } + JobExecutorInfo jobExecutorInfo = - new JobExecutorInfo( - executorName, - method, - bean - ); + new JobExecutorInfo( + executorName, + method, + mapExecutorMethodMap, + reduceExecutor, + mergeReduceExecutor, + bean + ); jobExecutorInfoList.add(jobExecutorInfo); } @@ -99,11 +141,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { } JobExecutorInfo jobExecutorInfo = - new JobExecutorInfo( - jobExecutor.name(), - executeMethod, - bean - ); + new JobExecutorInfo( + jobExecutor.name(), + executeMethod, + null,null, null, + bean + ); jobExecutorInfoList.add(jobExecutorInfo); } }