From cda958ef01ffd609b889df72d15e1118ee374045 Mon Sep 17 00:00:00 2001 From: lizhongyuan3 Date: Wed, 3 Jan 2024 13:30:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:2.6.0=201=EF=BC=9A=E7=BD=91=E7=BB=9C?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E7=9A=84=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=202=EF=BC=9A=E5=88=86=E7=89=87=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E4=B8=8B=E7=9A=84=E9=9A=8F=E6=9C=BA=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../utils/LambdaUpdateExpandWrapper.java | 43 +++++++++++++++++++ .../BroadcastClientCallbackHandler.java | 8 ++-- .../ClusterClientCallbackHandler.java | 8 ++-- .../ShardingClientCallbackHandler.java | 19 +++++--- 4 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java new file mode 100644 index 000000000..7254d432c --- /dev/null +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java @@ -0,0 +1,43 @@ +package com.aizuda.easy.retry.template.datasource.utils; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.support.SFunction; + +/** + * LambdaUpdateWrapper 的拓展 支持指定列的自增、自减 + * + * @author lizhongyuan + */ + +public class LambdaUpdateExpandWrapper extends LambdaUpdateWrapper { + + public LambdaUpdateExpandWrapper(Class entityClass) { + super(entityClass); + } + + /** + * 指定列自增 + * + * @param columns 列引用 + * @param value 增长值 + */ + public LambdaUpdateExpandWrapper incrField(SFunction columns, Object value) { + String columnsToString = super.columnToString(columns); + String format = String.format("%s = %s + %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); + setSql(format); + return this; + } + + /** + * 指定列自减 + * + * @param columns 列引用 + * @param value 减少值 + */ + public LambdaUpdateExpandWrapper descField(SFunction columns, Object value) { + String columnsToString = super.columnToString(columns); + String format = String.format("%s = %s - %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); + setSql(format); + return this; + } +} \ No newline at end of file diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index d3e8c1944..49bad8741 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; @@ -47,9 +48,10 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java index e44cca00e..33e0764f3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java @@ -14,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; @@ -62,10 +63,11 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler } String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) .set(JobTask::getClientInfo, newClient) - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index cfb9295a8..df3db5183 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -1,6 +1,8 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.RandomUtil; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; @@ -14,13 +16,14 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Optional; +import java.util.Set; /** * @author: www.byteblogs.com @@ -50,17 +53,19 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler return; } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { - Optional serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny(); - if (!serverNode.isPresent()) { + Set nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); + if (CollUtil.isEmpty(nodes)) { log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return; } - String newClient = ClientInfoUtils.generate(serverNode.get()); + RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0])); + String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) .set(JobTask::getClientInfo, newClient) - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));