fix:2.6.0
1:网络重试的更新任务修改 2:分片模式下的随机重试节点修改
This commit is contained in:
parent
921f4a98bb
commit
cda958ef01
@ -0,0 +1,43 @@
|
|||||||
|
package com.aizuda.easy.retry.template.datasource.utils;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* LambdaUpdateWrapper 的拓展 支持指定列的自增、自减
|
||||||
|
*
|
||||||
|
* @author lizhongyuan
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class LambdaUpdateExpandWrapper<T> extends LambdaUpdateWrapper<T> {
|
||||||
|
|
||||||
|
public LambdaUpdateExpandWrapper(Class<T> entityClass) {
|
||||||
|
super(entityClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定列自增
|
||||||
|
*
|
||||||
|
* @param columns 列引用
|
||||||
|
* @param value 增长值
|
||||||
|
*/
|
||||||
|
public LambdaUpdateExpandWrapper<T> incrField(SFunction<T, ?> columns, Object value) {
|
||||||
|
String columnsToString = super.columnToString(columns);
|
||||||
|
String format = String.format("%s = %s + %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value));
|
||||||
|
setSql(format);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定列自减
|
||||||
|
*
|
||||||
|
* @param columns 列引用
|
||||||
|
* @param value 减少值
|
||||||
|
*/
|
||||||
|
public LambdaUpdateExpandWrapper<T> descField(SFunction<T, ?> columns, Object value) {
|
||||||
|
String columnsToString = super.columnToString(columns);
|
||||||
|
String format = String.format("%s = %s - %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value));
|
||||||
|
setSql(format);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
@ -12,6 +12,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -47,9 +48,10 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle
|
|||||||
}
|
}
|
||||||
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
||||||
// 更新重试次数
|
// 更新重试次数
|
||||||
jobTaskMapper.update(null, Wrappers.<JobTask>lambdaUpdate()
|
jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class)
|
||||||
.setSql("retry_count = retry_count + 1")
|
.incrField(JobTask::getRetryCount, 1)
|
||||||
.apply("retry_count < 3")
|
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||||
|
.eq(JobTask::getId, context.getTaskId())
|
||||||
);
|
);
|
||||||
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
||||||
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
|
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
|
||||||
|
@ -14,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -62,10 +63,11 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler
|
|||||||
}
|
}
|
||||||
String newClient = ClientInfoUtils.generate(serverNode);
|
String newClient = ClientInfoUtils.generate(serverNode);
|
||||||
// 更新重试次数
|
// 更新重试次数
|
||||||
jobTaskMapper.update(null, Wrappers.<JobTask>lambdaUpdate()
|
jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class)
|
||||||
|
.incrField(JobTask::getRetryCount, 1)
|
||||||
.set(JobTask::getClientInfo, newClient)
|
.set(JobTask::getClientInfo, newClient)
|
||||||
.setSql("retry_count = retry_count + 1")
|
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||||
.apply("retry_count < 3")
|
.eq(JobTask::getId, context.getTaskId())
|
||||||
);
|
);
|
||||||
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
||||||
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
|
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package com.aizuda.easy.retry.server.job.task.support.callback;
|
package com.aizuda.easy.retry.server.job.task.support.callback;
|
||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import cn.hutool.core.util.RandomUtil;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||||
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
|
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
|
||||||
@ -14,13 +16,14 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
|||||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||||
|
import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper;
|
||||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
|
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author: www.byteblogs.com
|
* @author: www.byteblogs.com
|
||||||
@ -50,17 +53,19 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
||||||
Optional<RegisterNodeInfo> serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny();
|
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
|
||||||
if (!serverNode.isPresent()) {
|
if (CollUtil.isEmpty(nodes)) {
|
||||||
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String newClient = ClientInfoUtils.generate(serverNode.get());
|
RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0]));
|
||||||
|
String newClient = ClientInfoUtils.generate(serverNode);
|
||||||
// 更新重试次数
|
// 更新重试次数
|
||||||
jobTaskMapper.update(null, Wrappers.<JobTask>lambdaUpdate()
|
jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class)
|
||||||
|
.incrField(JobTask::getRetryCount, 1)
|
||||||
.set(JobTask::getClientInfo, newClient)
|
.set(JobTask::getClientInfo, newClient)
|
||||||
.setSql("retry_count = retry_count + 1")
|
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||||
.apply("retry_count < 3")
|
.eq(JobTask::getId, context.getTaskId())
|
||||||
);
|
);
|
||||||
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
||||||
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
|
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
|
||||||
|
Loading…
Reference in New Issue
Block a user