feat:(1.2.0-beta2): 1. 修复服务端生成channel并发问题 2. 修复客户端分片参数为ShardingJobArgs时不生效问题

This commit is contained in:
opensnail 2024-10-04 21:47:28 +08:00
parent b308d2692b
commit 399ed8298d
3 changed files with 41 additions and 37 deletions

View File

@ -114,6 +114,8 @@ public class JobEndPoint {
private static JobContext buildJobContext(DispatchJobRequest dispatchJob) {
JobContext jobContext = new JobContext();
jobContext.setJobId(dispatchJob.getJobId());
jobContext.setShardingTotal(dispatchJob.getShardingTotal());
jobContext.setShardingIndex(dispatchJob.getShardingIndex());
jobContext.setNamespaceId(dispatchJob.getNamespaceId());
jobContext.setTaskId(dispatchJob.getTaskId());
jobContext.setTaskBatchId(dispatchJob.getTaskBatchId());

View File

@ -8,12 +8,9 @@ import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.cache.JobExecutorInfoCache;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.dto.JobExecutorInfo;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.*;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
@ -25,11 +22,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
/**
* @author: opensnail
@ -56,8 +49,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
Map<Method, JobExecutor> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
(MethodIntrospector.MetadataLookup<JobExecutor>) method -> AnnotatedElementUtils
.findMergedAnnotation(method, JobExecutor.class));
(MethodIntrospector.MetadataLookup<JobExecutor>) method -> AnnotatedElementUtils
.findMergedAnnotation(method, JobExecutor.class));
} catch (Throwable ex) {
SnailJobLog.LOCAL.error("{} JobExecutor加载异常{}", beanDefinitionName, ex);
}
@ -69,8 +62,8 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
if (IJobExecutor.class.isAssignableFrom(bean.getClass())) {
if (!JobExecutorInfoCache.isExisted(executorClassName)) {
jobExecutorInfoList.add(new JobExecutorInfo(executorClassName,
ReflectionUtils.findMethod(bean.getClass(), "jobExecute"),
null,null, null, bean));
ReflectionUtils.findMethod(bean.getClass(), "jobExecute"),
null, null, null, bean));
}
}
@ -79,7 +72,15 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
if (Objects.nonNull(jobExecutor)) {
String executorName = jobExecutor.name();
if (!JobExecutorInfoCache.isExisted(executorName)) {
Method method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), JobArgs.class);
List<Class<? extends JobArgs>> classes = Lists.newArrayList(ShardingJobArgs.class, JobArgs.class);
Method method = null;
for (Class<? extends JobArgs> clazz : classes) {
method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method(), clazz);
if (Objects.nonNull(method)) {
break;
}
}
if (method == null) {
method = ReflectionUtils.findMethod(bean.getClass(), jobExecutor.method());
}
@ -93,36 +94,36 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
Class<?>[] parameterTypes = method1.getParameterTypes();
MapExecutor mapExecutor = method1.getAnnotation(MapExecutor.class);
if (Objects.nonNull(mapExecutor)
&& parameterTypes.length >0
&& parameterTypes[0].isAssignableFrom(MapArgs.class)) {
&& parameterTypes.length > 0
&& parameterTypes[0].isAssignableFrom(MapArgs.class)) {
mapExecutorMethodMap.put(mapExecutor.taskName(), method1);
}
ReduceExecutor reduceExecutorAnno = method1.getAnnotation(ReduceExecutor.class);
if (Objects.nonNull(reduceExecutorAnno)
&& parameterTypes.length >0
&& parameterTypes[0].isAssignableFrom(ReduceArgs.class)) {
&& parameterTypes.length > 0
&& parameterTypes[0].isAssignableFrom(ReduceArgs.class)) {
reduceExecutor = method1;
continue;
}
MergeReduceExecutor mergeReduceExecutorAnno = method1.getAnnotation(MergeReduceExecutor.class);
if (Objects.nonNull(mergeReduceExecutorAnno)
&& parameterTypes.length >0
&& parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) {
&& parameterTypes.length > 0
&& parameterTypes[0].isAssignableFrom(MergeReduceArgs.class)) {
mergeReduceExecutor = method1;
}
}
JobExecutorInfo jobExecutorInfo =
new JobExecutorInfo(
executorName,
method,
mapExecutorMethodMap,
reduceExecutor,
mergeReduceExecutor,
bean
);
new JobExecutorInfo(
executorName,
method,
mapExecutorMethodMap,
reduceExecutor,
mergeReduceExecutor,
bean
);
jobExecutorInfoList.add(jobExecutorInfo);
}
@ -141,12 +142,12 @@ public class JobExecutorScanner implements Scanner, ApplicationContextAware {
}
JobExecutorInfo jobExecutorInfo =
new JobExecutorInfo(
jobExecutor.name(),
executeMethod,
null,null, null,
bean
);
new JobExecutorInfo(
jobExecutor.name(),
executeMethod,
null, null, null,
bean
);
jobExecutorInfoList.add(jobExecutorInfo);
}
}

View File

@ -52,7 +52,7 @@ public class NettyChannel {
* @param body 请求的消息体
* @throws InterruptedException
*/
public static void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException {
public static synchronized void send(String hostId, String hostIp, Integer port, HttpMethod method, String url, String body, HttpHeaders requestHeaders) throws InterruptedException {
Channel channel = CHANNEL_MAP.get(Pair.of(hostId, hostIp));
if (Objects.isNull(channel) || !channel.isActive()) {
@ -100,7 +100,8 @@ public class NettyChannel {
if (notTimeout) {
// 连接成功
if (channel != null && channel.isActive()) {
SnailJobLog.LOCAL.info("netty client started {} connect to server", channel.localAddress());
SnailJobLog.LOCAL.info("netty client started {} connect to server id:[{}] ip:[{}] channel:[{}]",
channel.localAddress(), hostId, ip, channel);
NettyChannel.setChannel(hostId, ip, channel);
return channel;
}