diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java new file mode 100644 index 00000000..7254d432 --- /dev/null +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java @@ -0,0 +1,43 @@ +package com.aizuda.easy.retry.template.datasource.utils; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.support.SFunction; + +/** + * LambdaUpdateWrapper 的拓展 支持指定列的自增、自减 + * + * @author lizhongyuan + */ + +public class LambdaUpdateExpandWrapper extends LambdaUpdateWrapper { + + public LambdaUpdateExpandWrapper(Class entityClass) { + super(entityClass); + } + + /** + * 指定列自增 + * + * @param columns 列引用 + * @param value 增长值 + */ + public LambdaUpdateExpandWrapper incrField(SFunction columns, Object value) { + String columnsToString = super.columnToString(columns); + String format = String.format("%s = %s + %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); + setSql(format); + return this; + } + + /** + * 指定列自减 + * + * @param columns 列引用 + * @param value 减少值 + */ + public LambdaUpdateExpandWrapper descField(SFunction columns, Object value) { + String columnsToString = super.columnToString(columns); + String format = String.format("%s = %s - %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); + setSql(format); + return this; + } +} \ No newline at end of file diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index d3e8c194..49bad874 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; @@ -47,9 +48,10 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java index e44cca00..33e0764f 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java @@ -14,6 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; @@ -62,10 +63,11 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler } String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) .set(JobTask::getClientInfo, newClient) - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index cfb9295a..df3db518 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -1,6 +1,8 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.RandomUtil; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; @@ -14,13 +16,14 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Optional; +import java.util.Set; /** * @author: www.byteblogs.com @@ -50,17 +53,19 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler return; } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { - Optional serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny(); - if (!serverNode.isPresent()) { + Set nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); + if (CollUtil.isEmpty(nodes)) { log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return; } - String newClient = ClientInfoUtils.generate(serverNode.get()); + RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0])); + String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) .set(JobTask::getClientInfo, newClient) - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));