diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 4ca286ff6..629d3bfc9 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -114,6 +114,8 @@ public class JobEndPoint { private static JobContext buildJobContext(DispatchJobRequest dispatchJob) { JobContext jobContext = new JobContext(); jobContext.setJobId(dispatchJob.getJobId()); + jobContext.setShardingTotal(dispatchJob.getShardingTotal()); + jobContext.setShardingIndex(dispatchJob.getShardingIndex()); jobContext.setNamespaceId(dispatchJob.getNamespaceId()); jobContext.setTaskId(dispatchJob.getTaskId()); jobContext.setTaskBatchId(dispatchJob.getTaskBatchId()); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java index 532745106..a50288914 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/register/scan/JobExecutorScanner.java @@ -8,12 +8,9 @@ import com.aizuda.snailjob.client.job.core.annotation.MapExecutor; import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor; import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor; import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache; -import com.aizuda.snailjob.client.job.core.dto.JobArgs; -import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo; -import com.aizuda.snailjob.client.job.core.dto.MapArgs; -import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs; -import com.aizuda.snailjob.client.job.core.dto.ReduceArgs; +import com.aizuda.snailjob.client.job.core.dto.*; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.framework.AopProxyUtils; import org.springframework.beans.BeansException; @@ -25,11 +22,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; /** * @author: opensnail @@ -56,8 +49,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { Map annotatedMethods = null; try { annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), - (MethodIntrospector.MetadataLookup) method -> AnnotatedElementUtils - .findMergedAnnotation(method, JobExecutor.class)); + (MethodIntrospector.MetadataLookup) method -> AnnotatedElementUtils + .findMergedAnnotation(method, JobExecutor.class)); } catch (Throwable ex) { SnailJobLog.LOCAL.error("{} JobExecutor加载异常:{}", beanDefinitionName, ex); } @@ -69,8 +62,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { if (IJobExecutor.class.isAssignableFrom(bean.getClass())) { if (!JobExecutorInfoCache.isExisted(executorClassName)) { jobExecutorInfoList.add(new JobExecutorInfo(executorClassName, - ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), - null,null, null, bean)); + ReflectionUtils.findMethod(bean.getClass(), "jobExecute"), + null, null, null, bean)); } } @@ -79,7 +72,15 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { if (Objects.nonNull(jobExecutor)) { String executorName = jobExecutor.name(); if (!JobExecutorInfoCache.isExisted(executorName)) { - Method method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), JobArgs.class); + List> classes = Lists.newArrayList(ShardingJobArgs.class, JobArgs.class); + Method method = null; + for (Class clazz : classes) { + method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), clazz); + if (Objects.nonNull(method)) { + break; + } + } + if (method == null) { method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method()); } @@ -93,36 +94,36 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { Class[] parameterTypes = method1.getParameterTypes(); MapExecutor mapExecutor = method1.getAnnotation(MapExecutor.class); if (Objects.nonNull(mapExecutor) - && parameterTypes.length >0 - && parameterTypes[0].isAssignableFrom(MapArgs.class)) { + && parameterTypes.length > 0 + && parameterTypes[0].isAssignableFrom(MapArgs.class)) { mapExecutorMethodMap.put(mapExecutor.taskName(), method1); } ReduceExecutor reduceExecutorAnno = method1.getAnnotation(ReduceExecutor.class); if (Objects.nonNull(reduceExecutorAnno) - && parameterTypes.length >0 - && parameterTypes[0].isAssignableFrom(ReduceArgs.class)) { + && parameterTypes.length > 0 + && parameterTypes[0].isAssignableFrom(ReduceArgs.class)) { reduceExecutor = method1; continue; } MergeReduceExecutor mergeReduceExecutorAnno = method1.getAnnotation(MergeReduceExecutor.class); if (Objects.nonNull(mergeReduceExecutorAnno) - && parameterTypes.length >0 - && parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) { + && parameterTypes.length > 0 + && parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) { mergeReduceExecutor = method1; } } JobExecutorInfo jobExecutorInfo = - new JobExecutorInfo( - executorName, - method, - mapExecutorMethodMap, - reduceExecutor, - mergeReduceExecutor, - bean - ); + new JobExecutorInfo( + executorName, + method, + mapExecutorMethodMap, + reduceExecutor, + mergeReduceExecutor, + bean + ); jobExecutorInfoList.add(jobExecutorInfo); } @@ -141,12 +142,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware { } JobExecutorInfo jobExecutorInfo = - new JobExecutorInfo( - jobExecutor.name(), - executeMethod, - null,null, null, - bean - ); + new JobExecutorInfo( + jobExecutor.name(), + executeMethod, + null, null, null, + bean + ); jobExecutorInfoList.add(jobExecutorInfo); } } diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java index b30773da0..63618704c 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/rpc/client/NettyChannel.java @@ -52,7 +52,7 @@ public class NettyChannel { * @param body 请求的消息体 * @throws InterruptedException */ - public static void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException { + public static synchronized void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException { Channel channel = CHANNEL_MAP.get(Pair.of(hostId, hostIp)); if (Objects.isNull(channel) || !channel.isActive()) { @@ -100,7 +100,8 @@ public class NettyChannel { if (notTimeout) { // 连接成功 if (channel != null && channel.isActive()) { - SnailJobLog.LOCAL.info("netty client started {} connect to server", channel.localAddress()); + SnailJobLog.LOCAL.info("netty client started {} connect to server id:[{}] ip:[{}] channel:[{}]", + channel.localAddress(), hostId, ip, channel); NettyChannel.setChannel(hostId, ip, channel); return channel; }