fix(sj_1.1.1): 1. 修复MailProperties的bean冲突问题 2. 客户端采用10秒一次主动上报 3. 修复argStr空字符序列化问题

This commit is contained in:
opensnail 2024-07-17 10:34:52 +08:00
parent aca65eb1f5
commit fabc7f444a
14 changed files with 118 additions and 42 deletions

View File

@ -21,7 +21,7 @@
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<revision>1.1.0</revision>
<revision>1.1.1</revision>
<netty-all.version>4.1.94.Final</netty-all.version>
<hutool-all.version>5.8.25</hutool-all.version>
<mybatis-plus.version>3.5.7</mybatis-plus.version>

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.client.common.config;
import com.aizuda.snailjob.common.core.alarm.email.MailProperties;
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
@ -75,7 +75,7 @@ public class SnailJobProperties {
* 邮件配置
*/
@NestedConfigurationProperty
private MailProperties mail = new MailProperties();
private SnailJobMailProperties mail = new SnailJobMailProperties();
@Data
public static class ServerConfig {

View File

@ -0,0 +1,55 @@
package com.aizuda.snailjob.client.common.handler;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.rpc.client.RequestBuilder;
import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.log.SnailJobLog;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author: oepnsnail
* @date : 2024-07-17
* @since : 1.1.1
*/
@Component
public class ClientRegister implements Lifecycle {
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "sj-client-register"));
public static final NettyClient CLIENT;
public static final int REGISTER_TIME = 10;
static {
CLIENT = RequestBuilder.<NettyClient, NettyResult>newBuilder()
.client(NettyClient.class)
.callback(
nettyResult -> {
if (StatusEnum.NO.getStatus().equals(nettyResult.getStatus())) {
SnailJobLog.LOCAL.error("heartbeat check requestId:[{}] message:[{}]", nettyResult.getReqId(), nettyResult.getMessage());
}
})
.build();
}
@Override
public void start() {
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
CLIENT.beat(BEAT.PING);
} catch (Exception e) {
SnailJobLog.LOCAL.error("发送心跳失败", e);
}
}, 0, REGISTER_TIME, TimeUnit.SECONDS);
}
@Override
public void close() {
}
}

View File

@ -2,8 +2,10 @@ package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.NettyClient;
import com.aizuda.snailjob.client.common.event.SnailChannelReconnectEvent;
import com.aizuda.snailjob.client.common.handler.ClientRegister;
import com.aizuda.snailjob.common.core.constant.SystemConstants.BEAT;
import com.aizuda.snailjob.common.core.context.SpringContext;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.model.NettyResult;
import com.aizuda.snailjob.common.core.rpc.RpcContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
@ -28,17 +30,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
private NettyClient client;
private NettyHttpConnectClient nettyHttpConnectClient;
public NettyHttpClientHandler(NettyHttpConnectClient nettyHttpConnectClient) {
client = RequestBuilder.<NettyClient, NettyResult>newBuilder()
.client(NettyClient.class)
.callback(
nettyResult -> SnailJobLog.LOCAL.debug("heartbeat check requestId:[{}]", nettyResult.getReqId()))
.build();
this.nettyHttpConnectClient = nettyHttpConnectClient;
}
@ -112,7 +106,7 @@ public class NettyHttpClientHandler extends SimpleChannelInboundHandler<FullHttp
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
SnailJobLog.LOCAL.debug("userEventTriggered");
if (evt instanceof IdleStateEvent) {
client.beat(BEAT.PING);
ClientRegister.CLIENT.beat(BEAT.PING);
} else {
super.userEventTriggered(ctx, evt);
}

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.client.common.rpc.client;
import com.aizuda.snailjob.client.common.Lifecycle;
import com.aizuda.snailjob.client.common.handler.ClientRegister;
import com.aizuda.snailjob.common.log.SnailJobLog;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
@ -48,7 +49,7 @@ public class NettyHttpConnectClient implements Lifecycle {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS))
.addLast(new IdleStateHandler(0, 0, 3 * ClientRegister.REGISTER_TIME, TimeUnit.SECONDS))
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024))
.addLast(new NettyHttpClientHandler(thisClient));

View File

@ -1,6 +1,5 @@
package com.aizuda.snailjob.client.job.core.executor;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.job.core.IJobExecutor;
import com.aizuda.snailjob.client.job.core.cache.FutureCache;
@ -102,7 +101,12 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildJobArgs(JobContext jobContext) {
JobArgs jobArgs = new JobArgs();
// 下一个版本即将删除本期兼容此问题
jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams()));
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setTaskBatchId(jobContext.getTaskBatchId());
@ -113,7 +117,13 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
ShardingJobArgs jobArgs = new ShardingJobArgs();
jobArgs.setJobParams(jobContext.getJobArgsHolder().getJobParams());
// 下一个版本即将删除本期兼容此问题
jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams()));
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
jobArgs.setShardingIndex(jobContext.getShardingIndex());
jobArgs.setShardingTotal(jobContext.getShardingTotal());
@ -124,7 +134,12 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
MapArgs jobArgs = new MapArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
// 下一个版本即将删除本期兼容此问题
jobArgs.setArgsStr(JsonUtil.toJsonString(jobContext.getJobArgsHolder().getJobParams()));
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setJobParams(jobArgsHolder.getJobParams());
jobArgs.setMapResult(jobArgsHolder.getMaps());
jobArgs.setExecutorInfo(jobContext.getExecutorInfo());
@ -136,6 +151,13 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
private static JobArgs buildReduceJobArgs(JobContext jobContext) {
ReduceArgs jobArgs = new ReduceArgs();
JobArgsHolder jobArgsHolder = jobContext.getJobArgsHolder();
// 下一个版本即将删除本期兼容此问题
Object jobParams = jobContext.getJobArgsHolder().getJobParams();
if (jobParams instanceof String) {
jobArgs.setArgsStr((String) jobParams);
} else {
jobArgs.setArgsStr(JsonUtil.toJsonString(jobParams));
}
jobArgs.setJobParams(jobArgsHolder.getJobParams());
Object maps = jobArgsHolder.getMaps();
if (Objects.nonNull(maps)) {

View File

@ -14,7 +14,7 @@ import java.io.Serializable;
@Data
@Configuration
@ConfigurationProperties(value = "snail-job.mail")
public class MailProperties implements Serializable {
public class SnailJobMailProperties implements Serializable {
/**
* 过滤开关

View File

@ -3,7 +3,7 @@ package com.aizuda.snailjob.common.core.alarm.strategy;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.attribute.EmailAttribute;
import com.aizuda.snailjob.common.core.alarm.email.MailAccount;
import com.aizuda.snailjob.common.core.alarm.email.MailProperties;
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
import com.aizuda.snailjob.common.core.enums.AlarmTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.MailUtils;
@ -22,7 +22,7 @@ import java.util.Optional;
@Component
@RequiredArgsConstructor
public class EmailAlarm extends AbstractAlarm<AlarmContext> {
private final MailProperties mailProperties;
private final SnailJobMailProperties snailJobMailProperties;
private MailAccount mailAccount;
@Override
@ -63,28 +63,28 @@ public class EmailAlarm extends AbstractAlarm<AlarmContext> {
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Boolean enabled = mailProperties.getEnabled();
Boolean enabled = snailJobMailProperties.getEnabled();
if (Objects.isNull(enabled) || Boolean.FALSE.equals(enabled)) {
return;
}
mailAccount = initMailAccount(mailProperties);
mailAccount = initMailAccount(snailJobMailProperties);
MailUtils.setMailAccount(mailAccount);
}
private MailAccount initMailAccount(MailProperties mailProperties) {
private MailAccount initMailAccount(SnailJobMailProperties snailJobMailProperties) {
MailAccount account = new MailAccount();
account.setHost(mailProperties.getHost());
account.setPort(mailProperties.getPort());
account.setAuth(Optional.ofNullable(mailProperties.getAuth()).orElse(Boolean.FALSE));
account.setFrom(mailProperties.getFrom());
account.setUser(mailProperties.getUser());
account.setPass(mailProperties.getPass());
account.setSocketFactoryPort(Optional.ofNullable(mailProperties.getPort()).orElse(465));
account.setStarttlsEnable(Optional.ofNullable(mailProperties.getStarttlsEnable()).orElse(Boolean.FALSE));
account.setSslEnable(Optional.ofNullable(mailProperties.getSslEnable()).orElse(Boolean.FALSE));
account.setTimeout(Optional.ofNullable(mailProperties.getTimeout()).orElse(0L));
account.setConnectionTimeout(Optional.ofNullable(mailProperties.getConnectionTimeout()).orElse(0L));
account.setHost(snailJobMailProperties.getHost());
account.setPort(snailJobMailProperties.getPort());
account.setAuth(Optional.ofNullable(snailJobMailProperties.getAuth()).orElse(Boolean.FALSE));
account.setFrom(snailJobMailProperties.getFrom());
account.setUser(snailJobMailProperties.getUser());
account.setPass(snailJobMailProperties.getPass());
account.setSocketFactoryPort(Optional.ofNullable(snailJobMailProperties.getPort()).orElse(465));
account.setStarttlsEnable(Optional.ofNullable(snailJobMailProperties.getStarttlsEnable()).orElse(Boolean.FALSE));
account.setSslEnable(Optional.ofNullable(snailJobMailProperties.getSslEnable()).orElse(Boolean.FALSE));
account.setTimeout(Optional.ofNullable(snailJobMailProperties.getTimeout()).orElse(0L));
account.setConnectionTimeout(Optional.ofNullable(snailJobMailProperties.getConnectionTimeout()).orElse(0L));
return account;
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.snailjob.server.common.config;
import com.aizuda.snailjob.common.core.alarm.email.MailProperties;
import com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
@ -109,6 +109,6 @@ public class SystemProperties {
* 邮件配置
*/
@NestedConfigurationProperty
private MailProperties mail = new MailProperties();
private SnailJobMailProperties mail = new SnailJobMailProperties();
}

View File

@ -93,7 +93,7 @@ public abstract class AbstractRegister implements Register, Lifecycle {
try {
// 批量更新
if (updateDBs.size() != serverNodeMapper.updateBatchExpireAt(updateDBs)) {
if (CollUtil.isNotEmpty(updateDBs) && updateDBs.size() != serverNodeMapper.updateBatchExpireAt(updateDBs)) {
SnailJobLog.LOCAL.warn("续租失败 [{}]", JsonUtil.toJsonString(updateDBs));
}
} catch (Exception e) {
@ -101,7 +101,7 @@ public abstract class AbstractRegister implements Register, Lifecycle {
}
try {
if (insertDBs.size() != serverNodeMapper.insertBatch(insertDBs)) {
if (CollUtil.isNotEmpty(insertDBs) && insertDBs.size() != serverNodeMapper.insertBatch(insertDBs)) {
SnailJobLog.LOCAL.warn("注册节点失败 [{}]", JsonUtil.toJsonString(insertDBs));
}
} catch (DuplicateKeyException ignored) {

View File

@ -102,7 +102,7 @@ public class ClientRegister extends AbstractRegister implements Runnable {
} finally {
// 防止刷的过快
try {
TimeUnit.MILLISECONDS.sleep(5000);
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}

View File

@ -88,10 +88,10 @@ public class RequestHandlerActor extends AbstractActor {
String token = headers.get(HeadersEnum.TOKEN.getKey());
if (StrUtil.isBlank(token) || !CacheToken.get(groupName, namespace).equals(token)) {
throw new SnailJobServerException("Token authentication failed. [{}]", token);
throw new SnailJobServerException("Token authentication failed. [namespace:{} groupName:{} token:{}]", namespace, groupName, token);
}
// 注册版本
// 注册版本 此后后续版本将迁移至BeatHttpRequestHandler 只处理beat的心态注册
RegisterContext registerContext = new RegisterContext();
registerContext.setGroupName(groupName);
registerContext.setHostPort(hostPort);

View File

@ -132,6 +132,10 @@ public abstract class AbstractScanGroup extends AbstractActor {
waitUpdateRetryTasks.add(retryTask);
}
if (CollUtil.isEmpty(waitUpdateRetryTasks)) {
return;
}
// 批量更新
retryTaskMapper.updateBatchNextTriggerAtById(scanTask.getGroupPartition(), waitUpdateRetryTasks);

View File

@ -16,7 +16,7 @@
},
{
"name": "snail-job.mail",
"type": "com.aizuda.snailjob.common.core.alarm.email.MailProperties",
"type": "com.aizuda.snailjob.common.core.alarm.email.SnailJobMailProperties",
"description": "邮箱配置",
"sourceType": "com.aizuda.snailjob.server.common.config.SystemProperties"
}