feat(sj_1.1.0-beta2): support annotation-based map and mapreduce executors
This commit is contained in:
parent 51540bc4b4
commit e658529f8f
@@ -0,0 +1,15 @@
+package com.aizuda.snailjob.client.job.core;
+
+import com.aizuda.snailjob.client.model.ExecuteResult;
+
+import java.util.List;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-26
+ * @version : sj_1.1.0
+ */
+public interface MapHandler {
+
+    ExecuteResult doMap(List<Object> taskList, String nextTaskName);
+}
@@ -0,0 +1,30 @@
+package com.aizuda.snailjob.client.job.core.annotation;
+
+import com.aizuda.snailjob.common.core.constant.SystemConstants;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-26
+ * @version : sj_1.1.0
+ */
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Documented
+public @interface MapExecutor {
+
+    /**
+     * Task name
+     *
+     * @return
+     */
+    String taskName() default SystemConstants.MAP_ROOT;
+
+}
@@ -0,0 +1,21 @@
+package com.aizuda.snailjob.client.job.core.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-26
+ * @version : sj_1.1.0
+ */
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Documented
+public @interface MergeReduceExecutor {
+
+}
@@ -0,0 +1,21 @@
+package com.aizuda.snailjob.client.job.core.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-26
+ * @version : sj_1.1.0
+ */
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+@Documented
+public @interface ReduceExecutor {
+
+}
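For reference, this is roughly how the new annotations are meant to be consumed together with the MapHandler interface above: a Spring bean carrying the existing @JobExecutor annotation declares its map stages with @MapExecutor (the root stage keeps the default SystemConstants.MAP_ROOT task name) and, for a MAP_REDUCE job, adds @ReduceExecutor / @MergeReduceExecutor methods. The sketch below is illustrative only: the class name, job name and "shardList" task name are made up, and the exact DTO accessors and registration details may differ from the real module.

import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Component
@JobExecutor(name = "orderStatJobExecutor")   // hypothetical job name
public class OrderStatJobExecutor {

    // Root stage: registered under SystemConstants.MAP_ROOT (the taskName default)
    // and fans out sub-tasks named "shardList" through the MapHandler proxy.
    @MapExecutor
    public ExecuteResult rootMap(MapArgs mapArgs, MapHandler mapHandler) {
        List<Object> shards = Arrays.asList("0-10000", "10000-20000", "20000-30000");
        return mapHandler.doMap(shards, "shardList");
    }

    // Invoked once for every "shardList" sub-task created above.
    @MapExecutor(taskName = "shardList")
    public ExecuteResult shardMap(MapArgs mapArgs) {
        // process the shard described by mapArgs ...
        return ExecuteResult.success();
    }

    // Only used when the job is configured as MAP_REDUCE.
    @ReduceExecutor
    public ExecuteResult reduce(ReduceArgs reduceArgs) {
        // aggregate the results of one batch of map tasks ...
        return ExecuteResult.success();
    }

    @MergeReduceExecutor
    public ExecuteResult mergeReduce(MergeReduceArgs mergeReduceArgs) {
        // merge all reduce results into the final job result ...
        return ExecuteResult.success();
    }
}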
@@ -13,6 +13,8 @@ import com.aizuda.snailjob.client.job.core.executor.AbstractJobExecutor;
 import com.aizuda.snailjob.client.job.core.executor.AbstractMapExecutor;
 import com.aizuda.snailjob.client.job.core.executor.AbstractMapReduceExecutor;
 import com.aizuda.snailjob.client.job.core.executor.AnnotationJobExecutor;
+import com.aizuda.snailjob.client.job.core.executor.AnnotationMapJobExecutor;
+import com.aizuda.snailjob.client.job.core.executor.AnnotationMapReduceJobExecutor;
 import com.aizuda.snailjob.client.job.core.log.JobLogMeta;
 import com.aizuda.snailjob.client.model.StopJobDTO;
 import com.aizuda.snailjob.client.model.request.DispatchJobRequest;
@@ -54,7 +56,7 @@ public class JobEndPoint {

         if (Objects.nonNull(dispatchJob.getRetryCount()) && dispatchJob.getRetryCount() > 0) {
             SnailJobLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}]",
-                    dispatchJob.getRetryCount());
+                    dispatchJob.getRetryCount());
         }

         JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
@@ -75,7 +77,13 @@
                 jobExecutor = (AbstractJobExecutor) executor;
             }
         } else {
-            jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
+            if (JobTaskTypeEnum.MAP.getType() == jobContext.getTaskType()) {
+                jobExecutor = SpringContext.getBeanByType(AnnotationMapJobExecutor.class);
+            } else if (JobTaskTypeEnum.MAP_REDUCE.getType() == jobContext.getTaskType()) {
+                jobExecutor = SpringContext.getBeanByType(AnnotationMapReduceJobExecutor.class);
+            } else {
+                jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
+            }
         }

         SnailJobLog.REMOTE.info("批次:[{}] 任务调度成功. ", dispatchJob.getTaskBatchId());
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;

 import java.lang.reflect.Method;
+import java.util.Map;

 /**
  * @author opensnail
@@ -18,6 +19,12 @@ public class JobExecutorInfo {

     private final Method method;

+    private final Map<String, Method> mapExecutorMap;
+
+    Method reduceExecutor;
+
+    Method mergeReduceExecutor;
+
     private Object executor;

 }
@@ -1,20 +0,0 @@
-package com.aizuda.snailjob.client.job.core.dto;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import java.util.List;
-
-/**
- * @author: opensnail
- * @date : 2024-06-13
- * @since : sj_1.1.0
- */
-@EqualsAndHashCode(callSuper = true)
-@Data
-public class MapReduceArgs extends JobArgs {
-
-    private String taskName;
-
-    private List<MapArgs> mapArgsList;
-}
@@ -1,10 +1,11 @@
 package com.aizuda.snailjob.client.job.core.executor;

 import cn.hutool.core.util.StrUtil;
-import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
 import com.aizuda.snailjob.client.job.core.IJobExecutor;
-import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
+import com.aizuda.snailjob.client.job.core.MapHandler;
+import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
 import com.aizuda.snailjob.client.job.core.dto.JobArgs;
+import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
 import com.aizuda.snailjob.client.job.core.dto.MapArgs;
 import com.aizuda.snailjob.client.model.ExecuteResult;
 import com.aizuda.snailjob.client.model.request.MapTaskRequest;
@@ -12,13 +13,19 @@ import com.aizuda.snailjob.common.core.constant.SystemConstants;
 import com.aizuda.snailjob.common.core.enums.StatusEnum;
 import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
 import com.aizuda.snailjob.common.core.model.JobContext;
-import com.aizuda.snailjob.common.core.model.NettyResult;
 import com.aizuda.snailjob.common.core.model.Result;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.ReflectionUtils;

+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;

 /**
  * @author zhengweilin
@@ -28,55 +35,48 @@ import java.util.List;
 @Slf4j
 public abstract class AbstractMapExecutor extends AbstractJobExecutor implements IJobExecutor {

-    private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
-        .client(JobNettyClient.class)
-        .async(Boolean.FALSE)
-        .build();
-
     @Override
     protected ExecuteResult doJobExecute(final JobArgs jobArgs) {
         if (jobArgs instanceof MapArgs) {
-            return this.doJobMapExecute((MapArgs) jobArgs);
+            return this.doJobMapExecute((MapArgs) jobArgs, getMapHandler());
         }

         throw new SnailJobMapReduceException("For tasks that are not of type map or map reduce, please do not use the AbstractMapExecutor class.");
     }

-    public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs);
+    public abstract ExecuteResult doJobMapExecute(MapArgs mapArgs, final MapHandler mapHandler);

-    public ExecuteResult doMap(List<Object> taskList, String nextTaskName) {
+    private MapHandler getMapHandler() {
+        return (MapHandler) Proxy.newProxyInstance(MapHandler.class.getClassLoader(),
+                new Class[]{MapHandler.class}, new MapInvokeHandler());
+    }

-        if (StrUtil.isBlank(nextTaskName)) {
-            throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName);
+    protected ExecuteResult invokeMapExecute(MapArgs mapArgs, final MapHandler mapHandler) {
+        JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(mapArgs.getExecutorInfo());
+
+        if (Objects.isNull(jobExecutorInfo)) {
+            throw new SnailJobMapReduceException("[{}] not found", mapArgs.getExecutorInfo());
         }

-        if (CollectionUtils.isEmpty(taskList)) {
-            throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName);
+        Map<String, Method> mapExecutorMap = Optional.ofNullable(jobExecutorInfo.getMapExecutorMap())
+                .orElse(new HashMap<>());
+        Method method = mapExecutorMap.get(mapArgs.getTaskName());
+
+        if (Objects.isNull(method)) {
+            throw new SnailJobMapReduceException(
+                    "[{}] MapTask execution method not found. Please configure the @MapExecutor annotation",
+                    mapArgs.getExecutorInfo());
+
         }

-        // Reject a taskName equal to the root (or final) task name, which would endlessly spawn sub-tasks or fail outright
-        if (SystemConstants.MAP_ROOT.equals(nextTaskName)) {
-            throw new SnailJobMapReduceException("The Next taskName can not be {}", SystemConstants.MAP_ROOT);
+        Class<?>[] paramTypes = method.getParameterTypes();
+        if (paramTypes.length == 1) {
+            return (ExecuteResult) ReflectionUtils.invokeMethod(method, jobExecutorInfo.getExecutor(), mapArgs);
+        } else if (paramTypes.length == 2) {
+            return (ExecuteResult) ReflectionUtils.invokeMethod(method, jobExecutorInfo.getExecutor(), mapArgs,
+                    mapHandler);
         }

-        JobContext jobContext = JobContextManager.getJobContext();
-
-        // 1. Build the request
-        MapTaskRequest mapTaskRequest = new MapTaskRequest();
-        mapTaskRequest.setJobId(jobContext.getJobId());
-        mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
-        mapTaskRequest.setTaskName(nextTaskName);
-        mapTaskRequest.setSubTask(taskList);
-        mapTaskRequest.setParentId(jobContext.getTaskId());
-
-        // 2. Send the request synchronously
-        Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
-        if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) {
-            SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
-        } else {
-            throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);
-        }
-
-        return ExecuteResult.success();
+        throw new SnailJobMapReduceException("Executor for [{}] not found", mapArgs.getTaskName());
     }
 }
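The MapHandler handed to doJobMapExecute above is never implemented by user code: getMapHandler() builds a JDK dynamic proxy, so every mapHandler.doMap(...) call is routed to MapInvokeHandler (added further down in this commit), which reports the sub-task batch to the server. The self-contained snippet below only illustrates that proxy mechanism; the interface is redeclared locally with a String return type so the example runs on its own, and the class names are made up.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

public class MapHandlerProxyDemo {

    // Same shape as com.aizuda.snailjob.client.job.core.MapHandler, simplified for the demo
    interface MapHandler {
        String doMap(List<Object> taskList, String nextTaskName);
    }

    // Stands in for MapInvokeHandler: every call on the proxy lands in invoke()
    static class ReportingInvokeHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            List<?> taskList = (List<?>) args[0];
            String nextTaskName = (String) args[1];
            return "reported " + taskList.size() + " sub-tasks for " + nextTaskName;
        }
    }

    public static void main(String[] args) {
        MapHandler handler = (MapHandler) Proxy.newProxyInstance(
                MapHandler.class.getClassLoader(),
                new Class[]{MapHandler.class},
                new ReportingInvokeHandler());
        // Prints: reported 2 sub-tasks for shardList
        System.out.println(handler.doMap(Arrays.asList("a", "b"), "shardList"));
    }
}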
@@ -1,8 +1,6 @@
 package com.aizuda.snailjob.client.job.core.executor;

 import com.aizuda.snailjob.client.job.core.dto.JobArgs;
 import com.aizuda.snailjob.client.job.core.dto.MapArgs;
-import com.aizuda.snailjob.client.job.core.dto.MapReduceArgs;
 import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
 import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
 import com.aizuda.snailjob.client.model.ExecuteResult;
@@ -10,8 +8,6 @@ import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
 import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
 import com.aizuda.snailjob.common.core.model.JobContext;

 import java.util.List;

 /**
  * @author zhengweilin
  * @version 1.0.0
@@ -0,0 +1,32 @@
+package com.aizuda.snailjob.client.job.core.executor;
+
+import com.aizuda.snailjob.client.job.core.MapHandler;
+import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
+import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
+import com.aizuda.snailjob.client.job.core.dto.MapArgs;
+import com.aizuda.snailjob.client.model.ExecuteResult;
+import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ReflectionUtils;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Annotation-based map task executor
+ *
+ * @author opensnail
+ * @date 2024-06-26 22:20:36
+ * @since sj_1.1.0
+ */
+@Component
+public class AnnotationMapJobExecutor extends AbstractMapExecutor {
+
+    @Override
+    public ExecuteResult doJobMapExecute(final MapArgs mapArgs, final MapHandler mapHandler) {
+        return invokeMapExecute(mapArgs, mapHandler);
+    }
+}
@@ -0,0 +1,78 @@
+package com.aizuda.snailjob.client.job.core.executor;
+
+import com.aizuda.snailjob.client.job.core.MapHandler;
+import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
+import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
+import com.aizuda.snailjob.client.job.core.dto.MapArgs;
+import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
+import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
+import com.aizuda.snailjob.client.model.ExecuteResult;
+import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ReflectionUtils;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Annotation-based MapReduce executor
+ *
+ * @author opensnail
+ * @date 2024-06-26 22:20:36
+ * @since sj_1.1.0
+ */
+@Component
+public class AnnotationMapReduceJobExecutor extends AbstractMapReduceExecutor {
+
+    @Override
+    protected ExecuteResult doReduceExecute(final ReduceArgs reduceArgs) {
+        JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(reduceArgs.getExecutorInfo());
+        if (Objects.isNull(jobExecutorInfo)) {
+            throw new SnailJobMapReduceException("[{}]未发现", reduceArgs.getExecutorInfo());
+        }
+
+        if (Objects.isNull(jobExecutorInfo.getReduceExecutor())) {
+            throw new SnailJobMapReduceException("未发现ReduceTask执行方法, 请配置@ReduceExecutor注解",
+                    reduceArgs.getExecutorInfo());
+        }
+
+        Class<?>[] paramTypes = jobExecutorInfo.getReduceExecutor().getParameterTypes();
+        if (paramTypes.length > 0) {
+            return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getReduceExecutor(),
+                    jobExecutorInfo.getExecutor(), reduceArgs);
+        }
+
+        throw new SnailJobMapReduceException("[{}]未发现ReduceTask执行方法", reduceArgs.getExecutorInfo());
+    }
+
+    @Override
+    protected ExecuteResult doMergeReduceExecute(final MergeReduceArgs mergeReduceArgs) {
+        JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(mergeReduceArgs.getExecutorInfo());
+
+        if (Objects.isNull(jobExecutorInfo)) {
+            throw new SnailJobMapReduceException("[{}]未发现", mergeReduceArgs.getExecutorInfo());
+        }
+
+        if (Objects.isNull(jobExecutorInfo.getMergeReduceExecutor())) {
+            throw new SnailJobMapReduceException("[{}]未发现ReduceTask执行方法, 请配置@MergeReduceExecutor注解",
+                    mergeReduceArgs.getExecutorInfo());
+        }
+
+        Class<?>[] paramTypes = jobExecutorInfo.getMergeReduceExecutor().getParameterTypes();
+        if (paramTypes.length > 0) {
+            return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMergeReduceExecutor(),
+                    jobExecutorInfo.getExecutor(), mergeReduceArgs);
+        }
+
+        throw new SnailJobMapReduceException("[{}]未发现MergeReduceTask执行方法 [{}]",
+                mergeReduceArgs.getExecutorInfo());
+    }
+
+    @Override
+    public ExecuteResult doJobMapExecute(final MapArgs mapArgs, final MapHandler mapHandler) {
+        return invokeMapExecute(mapArgs, mapHandler);
+    }
+}
@@ -11,7 +11,6 @@ public final class JobContextManager {

     private static final ThreadLocal<JobContext> JOB_CONTEXT_LOCAL = new ThreadLocal<>();

-
     public static void setJobContext(JobContext jobContext) {
         JOB_CONTEXT_LOCAL.set(jobContext);
     }
@@ -0,0 +1,85 @@
+package com.aizuda.snailjob.client.job.core.executor;
+
+import cn.hutool.core.util.StrUtil;
+import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
+import com.aizuda.snailjob.client.job.core.client.JobNettyClient;
+import com.aizuda.snailjob.client.model.ExecuteResult;
+import com.aizuda.snailjob.client.model.request.MapTaskRequest;
+import com.aizuda.snailjob.common.core.constant.SystemConstants;
+import com.aizuda.snailjob.common.core.enums.StatusEnum;
+import com.aizuda.snailjob.common.core.exception.SnailJobMapReduceException;
+import com.aizuda.snailjob.common.core.model.JobContext;
+import com.aizuda.snailjob.common.core.model.NettyResult;
+import com.aizuda.snailjob.common.core.model.Result;
+import com.aizuda.snailjob.common.log.SnailJobLog;
+import org.springframework.util.CollectionUtils;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * @author: opensnail
+ * @date : 2024-06-26
+ * @version : sj_1.1.0
+ */
+public final class MapInvokeHandler implements InvocationHandler {
+
+    private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
+            .client(JobNettyClient.class)
+            .async(Boolean.FALSE)
+            .build();
+
+    @Override
+    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+        return doMap((List<Object>) args[0], (String) args[1]);
+    }
+
+    public ExecuteResult doMap(List<Object> taskList, String nextTaskName) {
+
+        if (StrUtil.isBlank(nextTaskName)) {
+            throw new SnailJobMapReduceException("The next task name can not blank or null {}", nextTaskName);
+        }
+
+        if (CollectionUtils.isEmpty(taskList)) {
+            throw new SnailJobMapReduceException("The task list can not empty {}", nextTaskName);
+        }
+
+        // More than 200 sub-tasks: warn the user that the number of shards is getting large
+        if (taskList.size() > 200) {
+            SnailJobLog.LOCAL.warn("[{}] map task size is too large, network maybe overload... please try to split the tasks.", nextTaskName);
+        }
+
+        // More than 500 sub-tasks: refuse to split
+        if (taskList.size() > 500) {
+            throw new SnailJobMapReduceException("[{}] map task size is too large, network maybe overload... please try to split the tasks.", nextTaskName);
+        }
+
+        // Reject a taskName equal to the root (or final) task name, which would endlessly spawn sub-tasks or fail outright
+        if (SystemConstants.MAP_ROOT.equals(nextTaskName)) {
+            throw new SnailJobMapReduceException("The Next taskName can not be {}", SystemConstants.MAP_ROOT);
+        }
+
+        JobContext jobContext = JobContextManager.getJobContext();
+
+        // 1. Build the request
+        MapTaskRequest mapTaskRequest = new MapTaskRequest();
+        mapTaskRequest.setJobId(jobContext.getJobId());
+        mapTaskRequest.setTaskBatchId(jobContext.getTaskBatchId());
+        mapTaskRequest.setTaskName(nextTaskName);
+        mapTaskRequest.setSubTask(taskList);
+        mapTaskRequest.setParentId(jobContext.getTaskId());
+
+        // 2. Send the request synchronously
+        Result<Boolean> result = CLIENT.batchReportMapTask(mapTaskRequest);
+        if (StatusEnum.NO.getStatus() == result.getStatus() || result.getData()) {
+            SnailJobLog.LOCAL.info("Map task create successfully!. taskName:[{}] TaskId:[{}] ", nextTaskName, jobContext.getTaskId());
+        } else {
+            throw new SnailJobMapReduceException("map failed for task: " + nextTaskName);
+        }
+
+        return ExecuteResult.success();
+    }
+
+
+}
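doMap above warns once a single call carries more than 200 sub-tasks and rejects anything above 500, so a map stage with a large workload should emit coarse-grained shards (id ranges, page numbers, bucket ids) rather than one sub-task per record. A hypothetical helper along those lines (the class name and chosen shard size are illustrative, not part of this commit):

import java.util.ArrayList;
import java.util.List;

public final class ShardPlanner {

    private ShardPlanner() {
    }

    /**
     * Turns an id range into "start-end" shard descriptors, e.g. ids 0..1_000_000 with
     * shardSize 10_000 become 100 sub-tasks, comfortably below the 500-task hard limit.
     */
    public static List<Object> planShards(long minId, long maxId, long shardSize) {
        List<Object> shards = new ArrayList<>();
        for (long start = minId; start < maxId; start += shardSize) {
            long end = Math.min(start + shardSize, maxId);
            shards.add(start + "-" + end);
        }
        return shards;
    }
}

Each descriptor would then be parsed by the @MapExecutor method that handles the corresponding sub-task.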
@@ -4,9 +4,15 @@ import cn.hutool.core.collection.CollUtil;
 import com.aizuda.snailjob.client.job.core.IJobExecutor;
 import com.aizuda.snailjob.client.job.core.Scanner;
 import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
+import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
+import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
+import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
 import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
 import com.aizuda.snailjob.client.job.core.dto.JobArgs;
 import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
+import com.aizuda.snailjob.client.job.core.dto.MapArgs;
+import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
+import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
 import com.aizuda.snailjob.common.log.SnailJobLog;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -20,6 +26,7 @@ import org.springframework.util.ReflectionUtils;

 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -49,8 +56,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
             Map<Method, JobExecutor> annotatedMethods = null;
             try {
                 annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
-                    (MethodIntrospector.MetadataLookup<JobExecutor>) method -> AnnotatedElementUtils
-                        .findMergedAnnotation(method, JobExecutor.class));
+                        (MethodIntrospector.MetadataLookup<JobExecutor>) method -> AnnotatedElementUtils
+                                .findMergedAnnotation(method, JobExecutor.class));
             } catch (Throwable ex) {
                 SnailJobLog.LOCAL.error("{} JobExecutor加载异常:{}", beanDefinitionName, ex);
             }
@@ -61,7 +68,9 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
             // Register executors that implement the IJobExecutor interface
             if (IJobExecutor.class.isAssignableFrom(bean.getClass())) {
                 if (!JobExecutorInfoCache.isExisted(executorClassName)) {
-                    jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), bean));
+                    jobExecutorInfoList.add(new JobExecutorInfo(executorClassName,
+                            ReflectionUtils.findMethod(bean.getClass(), "jobExecute"),
+                            null, null, null, bean));
                 }
             }

@@ -75,12 +84,45 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
                     method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method());
                 }

+                // Scan for @MapExecutor, @ReduceExecutor and @MergeReduceExecutor methods
+                Map<String, Method> mapExecutorMethodMap = new HashMap<>();
+                Method reduceExecutor = null;
+                Method mergeReduceExecutor = null;
+                Method[] methods = bean.getClass().getMethods();
+                for (final Method method1 : methods) {
+                    Class<?>[] parameterTypes = method1.getParameterTypes();
+                    MapExecutor mapExecutor = method1.getAnnotation(MapExecutor.class);
+                    if (Objects.nonNull(mapExecutor)
+                            && parameterTypes.length > 0
+                            && parameterTypes[0].isAssignableFrom(MapArgs.class)) {
+                        mapExecutorMethodMap.put(mapExecutor.taskName(), method1);
+                    }
+
+                    ReduceExecutor reduceExecutorAnno = method1.getAnnotation(ReduceExecutor.class);
+                    if (Objects.nonNull(reduceExecutorAnno)
+                            && parameterTypes.length > 0
+                            && parameterTypes[0].isAssignableFrom(ReduceArgs.class)) {
+                        reduceExecutor = method1;
+                        continue;
+                    }
+
+                    MergeReduceExecutor mergeReduceExecutorAnno = method1.getAnnotation(MergeReduceExecutor.class);
+                    if (Objects.nonNull(mergeReduceExecutorAnno)
+                            && parameterTypes.length > 0
+                            && parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) {
+                        mergeReduceExecutor = method1;
+                    }
+                }
+
                 JobExecutorInfo jobExecutorInfo =
-                    new JobExecutorInfo(
-                        executorName,
-                        method,
-                        bean
-                    );
+                        new JobExecutorInfo(
+                                executorName,
+                                method,
+                                mapExecutorMethodMap,
+                                reduceExecutor,
+                                mergeReduceExecutor,
+                                bean
+                        );
                 jobExecutorInfoList.add(jobExecutorInfo);
             }

@@ -99,11 +141,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
                 }

                 JobExecutorInfo jobExecutorInfo =
-                    new JobExecutorInfo(
-                        jobExecutor.name(),
-                        executeMethod,
-                        bean
-                    );
+                        new JobExecutorInfo(
+                                jobExecutor.name(),
+                                executeMethod,
+                                null, null, null,
+                                bean
+                        );
                 jobExecutorInfoList.add(jobExecutorInfo);
             }
         }