fix:2.6.0

1:当更细重试次数执行成功之后在进行重试
This commit is contained in:
lizhongyuan3 2024-01-03 15:03:14 +08:00 committed by byteblogs168
parent cda958ef01
commit 85dd27840e
5 changed files with 47 additions and 74 deletions

View File

@ -1,8 +1,6 @@
package com.aizuda.easy.retry.template.datasource.persistence.po; package com.aizuda.easy.retry.template.datasource.persistence.po;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -64,6 +62,7 @@ public class JobTask implements Serializable {
/** /**
* 重试次数 * 重试次数
*/ */
@TableField(value = "retry_count", update= "%s+1")
private Integer retryCount; private Integer retryCount;
/** /**

View File

@ -1,43 +0,0 @@
package com.aizuda.easy.retry.template.datasource.utils;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
/**
* LambdaUpdateWrapper 的拓展 支持指定列的自增自减
*
* @author lizhongyuan
*/
public class LambdaUpdateExpandWrapper<T> extends LambdaUpdateWrapper<T> {
public LambdaUpdateExpandWrapper(Class<T> entityClass) {
super(entityClass);
}
/**
* 指定列自增
*
* @param columns 列引用
* @param value 增长值
*/
public LambdaUpdateExpandWrapper<T> incrField(SFunction<T, ?> columns, Object value) {
String columnsToString = super.columnToString(columns);
String format = String.format("%s = %s + %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value));
setSql(format);
return this;
}
/**
* 指定列自减
*
* @param columns 列引用
* @param value 减少值
*/
public LambdaUpdateExpandWrapper<T> descField(SFunction<T, ?> columns, Object value) {
String columnsToString = super.columnToString(columns);
String format = String.format("%s = %s - %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value));
setSql(format);
return this;
}
}

View File

@ -12,6 +12,8 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
@ -48,16 +50,19 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle
} }
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 更新重试次数 // 更新重试次数
jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) JobTask updateJobTask = new JobTask();
.incrField(JobTask::getRetryCount, 1) updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes()) .lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId()) .eq(JobTask::getId, context.getTaskId())
); ));
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); if (success) {
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
actorRef.tell(realJobExecutor, actorRef); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
// TODO 记录日志 actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
}
return; return;
} }
} }

View File

@ -14,6 +14,8 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
@ -63,18 +65,21 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler
} }
String newClient = ClientInfoUtils.generate(serverNode); String newClient = ClientInfoUtils.generate(serverNode);
// 更新重试次数 // 更新重试次数
jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) JobTask updateJobTask = new JobTask();
.incrField(JobTask::getRetryCount, 1) updateJobTask.setClientInfo(newClient);
.set(JobTask::getClientInfo, newClient) updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes()) .lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId()) .eq(JobTask::getId, context.getTaskId())
); ));
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); // 更新成功执行重试
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); if (success) {
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
// 执行重试操作 realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
actorRef.tell(realJobExecutor, actorRef); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
// TODO 记录日志 actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
}
return; return;
} }
} }

View File

@ -16,6 +16,8 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
@ -61,17 +63,22 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler
RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0])); RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0]));
String newClient = ClientInfoUtils.generate(serverNode); String newClient = ClientInfoUtils.generate(serverNode);
// 更新重试次数 // 更新重试次数
jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) JobTask updateJobTask = new JobTask();
.incrField(JobTask::getRetryCount, 1) updateJobTask.setClientInfo(newClient);
.set(JobTask::getClientInfo, newClient) updateJobTask.setRetryCount(1);
.lt(JobTask::getRetryCount, job.getMaxRetryTimes()) boolean success = SqlHelper.retBool(
.eq(JobTask::getId, context.getTaskId()) jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
)
); );
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); if (success) {
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
actorRef.tell(realJobExecutor, actorRef); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
// TODO 记录日志 actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
}
return; return;
} }
} }