From 47bd736d2f789f4386efc397b7416ace62451023 Mon Sep 17 00:00:00 2001 From: lizhongyuan3 Date: Tue, 2 Jan 2024 16:58:16 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:2.6.0=201=EF=BC=9A=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E7=BD=91=E7=BB=9C=E5=A4=B1=E8=B4=A5=EF=BC=8C=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E4=B8=8A=E6=8A=A5=E5=A4=B1=E8=B4=A5=EF=BC=8C=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E5=AE=A2=E6=88=B7=E7=AB=AF=E8=BF=94=E5=9B=9E=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E7=9A=84=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BroadcastClientCallbackHandler.java | 34 +++++++++++++ .../ClusterClientCallbackHandler.java | 50 +++++++++++++++++++ .../ShardingClientCallbackHandler.java | 45 +++++++++++++++++ .../support/executor/RequestClientActor.java | 25 ++++------ 4 files changed, 138 insertions(+), 16 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index ef922280..0cf6d625 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -1,11 +1,20 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -17,6 +26,11 @@ import org.springframework.stereotype.Component; @Slf4j public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandler { + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobMapper jobMapper; + @Override public TaskTypeEnum getTaskInstanceType() { return TaskTypeEnum.BROADCAST; @@ -24,6 +38,26 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle @Override protected void doCallback(final ClientCallbackContext context) { + if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) { + JobTask jobTask = jobTaskMapper.selectById(context.getTaskId()); + Job job = jobMapper.selectById(context.getJobId()); + if (jobTask == null || job == null) { + return; + } + if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { + // 更新重试次数 + jobTaskMapper.update(null, Wrappers.lambdaUpdate() + .setSql("retry_count = retry_count + 1") + .apply("retry_count < 3") + ); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + return; + } + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java index d1411cc9..e7f27eb9 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java @@ -1,13 +1,26 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Objects; + /** * @author www.byteblogs.com * @date 2023-10-03 23:12:12 @@ -17,6 +30,13 @@ import org.springframework.stereotype.Component; @Slf4j public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler { + @Autowired + private ClientNodeAllocateHandler clientNodeAllocateHandler; + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobMapper jobMapper; + @Override public TaskTypeEnum getTaskInstanceType() { return TaskTypeEnum.CLUSTER; @@ -25,6 +45,36 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler @Override protected void doCallback(ClientCallbackContext context) { + if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) { + JobTask jobTask = jobTaskMapper.selectById(context.getTaskId()); + Job job = jobMapper.selectById(context.getJobId()); + if (jobTask == null || job == null) { + return; + } + if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { + // 选择重试的节点 + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(), + context.getGroupName(), context.getNamespaceId(), job.getRouteKey()); + if (Objects.isNull(serverNode)) { + log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return; + } + String newClient = ClientInfoUtils.generate(serverNode); + // 更新重试次数 + jobTaskMapper.update(null, Wrappers.lambdaUpdate() + .set(JobTask::getClientInfo, newClient) + .setSql("retry_count = retry_count + 1") + .apply("retry_count < 3") + ); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + // 执行重试操作 + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + return; + } + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index e2f7af63..bdb6efed 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -1,13 +1,26 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Optional; + /** * @author: www.byteblogs.com * @date : 2023-10-07 10:24 @@ -17,6 +30,11 @@ import org.springframework.stereotype.Component; @Slf4j public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler { + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobMapper jobMapper; + @Override public TaskTypeEnum getTaskInstanceType() { return TaskTypeEnum.SHARDING; @@ -24,6 +42,33 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler @Override protected void doCallback(final ClientCallbackContext context) { + if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) { + JobTask jobTask = jobTaskMapper.selectById(context.getTaskId()); + Job job = jobMapper.selectById(context.getJobId()); + if (jobTask == null || job == null) { + return; + } + if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { + Optional serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny(); + if (!serverNode.isPresent()) { + log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return; + } + String newClient = ClientInfoUtils.generate(serverNode.get()); + // 更新重试次数 + jobTaskMapper.update(null, Wrappers.lambdaUpdate() + .set(JobTask::getClientInfo, newClient) + .setSql("retry_count = retry_count + 1") + .apply("retry_count < 3") + ); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + return; + } + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java index 13ce8152..599cd8a7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/RequestClientActor.java @@ -20,13 +20,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; -import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryListener; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -43,9 +40,6 @@ import java.util.Objects; @Slf4j public class RequestClientActor extends AbstractActor { - @Autowired - private JobTaskMapper jobTaskMapper; - @Override public Receive createReceive() { return receiveBuilder().match(RealJobExecutorDTO.class, realJobExecutorDTO -> { @@ -91,7 +85,7 @@ public class RequestClientActor extends AbstractActor { } catch (Exception e) { log.error("调用客户端失败.", e); - Throwable throwable = e; + Throwable throwable; if (e.getClass().isAssignableFrom(RetryException.class)) { RetryException re = (RetryException) e; throwable = re.getLastFailedAttempt().getExceptionCause(); @@ -104,23 +98,22 @@ public class RequestClientActor extends AbstractActor { public static class JobExecutorRetryListener implements RetryListener { private RealJobExecutorDTO realJobExecutorDTO; - private JobTaskMapper jobTaskMapper; - public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO, - final JobTaskMapper jobTaskMapper) { + public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO) { this.realJobExecutorDTO = realJobExecutorDTO; - this.jobTaskMapper = jobTaskMapper; } @Override public void onRetry(final Attempt attempt) { + // 负载节点 if (attempt.hasException()) { LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]", realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); - JobTask jobTask = new JobTask(); - jobTask.setId(realJobExecutorDTO.getTaskId()); - jobTask.setRetryCount((int) attempt.getAttemptNumber()); - jobTaskMapper.updateById(jobTask); + ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType()); + ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO); + context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); + context.setExecuteResult(ExecuteResult.failure(null, "网络请求失败")); + clientCallback.callback(context); } } } @@ -132,7 +125,7 @@ public class RequestClientActor extends AbstractActor { .failRetry(Boolean.TRUE) .retryTimes(realJobExecutorDTO.getMaxRetryTimes()) .retryInterval(realJobExecutorDTO.getRetryInterval()) - .retryListener(new JobExecutorRetryListener(realJobExecutorDTO, jobTaskMapper)) + .retryListener(new JobExecutorRetryListener(realJobExecutorDTO)) .client(JobRpcClient.class) .build(); } From 68fa60794e617527709072a9743704afce86e6e9 Mon Sep 17 00:00:00 2001 From: lizhongyuan3 Date: Wed, 3 Jan 2024 13:30:54 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:2.6.0=201=EF=BC=9A=E7=BD=91=E7=BB=9C?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E7=9A=84=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=202=EF=BC=9A=E5=88=86=E7=89=87=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E4=B8=8B=E7=9A=84=E9=9A=8F=E6=9C=BA=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../utils/LambdaUpdateExpandWrapper.java | 43 +++++++++++++++++++ .../BroadcastClientCallbackHandler.java | 9 ++-- .../ClusterClientCallbackHandler.java | 9 ++-- .../ShardingClientCallbackHandler.java | 20 +++++---- 4 files changed, 65 insertions(+), 16 deletions(-) create mode 100644 easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java new file mode 100644 index 00000000..7254d432 --- /dev/null +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java @@ -0,0 +1,43 @@ +package com.aizuda.easy.retry.template.datasource.utils; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.support.SFunction; + +/** + * LambdaUpdateWrapper 的拓展 支持指定列的自增、自减 + * + * @author lizhongyuan + */ + +public class LambdaUpdateExpandWrapper extends LambdaUpdateWrapper { + + public LambdaUpdateExpandWrapper(Class entityClass) { + super(entityClass); + } + + /** + * 指定列自增 + * + * @param columns 列引用 + * @param value 增长值 + */ + public LambdaUpdateExpandWrapper incrField(SFunction columns, Object value) { + String columnsToString = super.columnToString(columns); + String format = String.format("%s = %s + %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); + setSql(format); + return this; + } + + /** + * 指定列自减 + * + * @param columns 列引用 + * @param value 减少值 + */ + public LambdaUpdateExpandWrapper descField(SFunction columns, Object value) { + String columnsToString = super.columnToString(columns); + String format = String.format("%s = %s - %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); + setSql(format); + return this; + } +} \ No newline at end of file diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index 0cf6d625..385dffe0 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -12,7 +12,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -46,9 +46,10 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java index e7f27eb9..4e3f2f2c 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java @@ -14,7 +14,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -61,10 +61,11 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler } String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) .set(JobTask::getClientInfo, newClient) - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index bdb6efed..0bb604b7 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -1,6 +1,8 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.RandomUtil; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; @@ -14,12 +16,12 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Optional; +import java.util.Set; /** * @author: www.byteblogs.com @@ -49,17 +51,19 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler return; } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { - Optional serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny(); - if (!serverNode.isPresent()) { + Set nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()); + if (CollUtil.isEmpty(nodes)) { log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); return; } - String newClient = ClientInfoUtils.generate(serverNode.get()); + RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0])); + String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, Wrappers.lambdaUpdate() + jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) + .incrField(JobTask::getRetryCount, 1) .set(JobTask::getClientInfo, newClient) - .setSql("retry_count = retry_count + 1") - .apply("retry_count < 3") + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) ); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); From fb8e746a33332bb6b569bb9f771cd400f911361f Mon Sep 17 00:00:00 2001 From: lizhongyuan3 Date: Wed, 3 Jan 2024 15:03:14 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:2.6.0=201=EF=BC=9A=E5=BD=93=E6=9B=B4?= =?UTF-8?q?=E7=BB=86=E9=87=8D=E8=AF=95=E6=AC=A1=E6=95=B0=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E6=88=90=E5=8A=9F=E4=B9=8B=E5=90=8E=E5=9C=A8=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datasource/persistence/po/JobTask.java | 5 +-- .../utils/LambdaUpdateExpandWrapper.java | 43 ------------------- .../BroadcastClientCallbackHandler.java | 22 ++++++---- .../ClusterClientCallbackHandler.java | 26 ++++++----- .../ShardingClientCallbackHandler.java | 28 +++++++----- 5 files changed, 47 insertions(+), 77 deletions(-) delete mode 100644 easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTask.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTask.java index 12cbd487..e9facf8e 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTask.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/po/JobTask.java @@ -1,8 +1,6 @@ package com.aizuda.easy.retry.template.datasource.persistence.po; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.annotation.*; import java.io.Serializable; import java.time.LocalDateTime; @@ -64,6 +62,7 @@ public class JobTask implements Serializable { /** * 重试次数 */ + @TableField(value = "retry_count", update= "%s+1") private Integer retryCount; /** diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java deleted file mode 100644 index 7254d432..00000000 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/utils/LambdaUpdateExpandWrapper.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.aizuda.easy.retry.template.datasource.utils; - -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.baomidou.mybatisplus.core.toolkit.support.SFunction; - -/** - * LambdaUpdateWrapper 的拓展 支持指定列的自增、自减 - * - * @author lizhongyuan - */ - -public class LambdaUpdateExpandWrapper extends LambdaUpdateWrapper { - - public LambdaUpdateExpandWrapper(Class entityClass) { - super(entityClass); - } - - /** - * 指定列自增 - * - * @param columns 列引用 - * @param value 增长值 - */ - public LambdaUpdateExpandWrapper incrField(SFunction columns, Object value) { - String columnsToString = super.columnToString(columns); - String format = String.format("%s = %s + %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); - setSql(format); - return this; - } - - /** - * 指定列自减 - * - * @param columns 列引用 - * @param value 减少值 - */ - public LambdaUpdateExpandWrapper descField(SFunction columns, Object value) { - String columnsToString = super.columnToString(columns); - String format = String.format("%s = %s - %s", columnsToString, columnsToString, formatSqlMaybeWithParam("{0}", value)); - setSql(format); - return this; - } -} \ No newline at end of file diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index 385dffe0..56a23af3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -12,7 +12,8 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.toolkit.SqlHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -46,16 +47,19 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle } if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { // 更新重试次数 - jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) - .incrField(JobTask::getRetryCount, 1) + JobTask updateJobTask = new JobTask(); + updateJobTask.setRetryCount(1); + boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.lambdaUpdate() .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) .eq(JobTask::getId, context.getTaskId()) - ); - RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); - realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); - ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); - actorRef.tell(realJobExecutor, actorRef); - // TODO 记录日志 + )); + if (success) { + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + } return; } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java index 4e3f2f2c..3adc7488 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java @@ -14,7 +14,8 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.toolkit.SqlHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -61,18 +62,21 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler } String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) - .incrField(JobTask::getRetryCount, 1) - .set(JobTask::getClientInfo, newClient) + JobTask updateJobTask = new JobTask(); + updateJobTask.setClientInfo(newClient); + updateJobTask.setRetryCount(1); + boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.lambdaUpdate() .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) .eq(JobTask::getId, context.getTaskId()) - ); - RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); - realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); - ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); - // 执行重试操作 - actorRef.tell(realJobExecutor, actorRef); - // TODO 记录日志 + )); + // 更新成功执行重试 + if (success) { + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + } return; } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index 0bb604b7..7e376943 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -16,7 +16,8 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; -import com.aizuda.easy.retry.template.datasource.utils.LambdaUpdateExpandWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.toolkit.SqlHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -59,17 +60,22 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0])); String newClient = ClientInfoUtils.generate(serverNode); // 更新重试次数 - jobTaskMapper.update(null, new LambdaUpdateExpandWrapper<>(JobTask.class) - .incrField(JobTask::getRetryCount, 1) - .set(JobTask::getClientInfo, newClient) - .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) - .eq(JobTask::getId, context.getTaskId()) + JobTask updateJobTask = new JobTask(); + updateJobTask.setClientInfo(newClient); + updateJobTask.setRetryCount(1); + boolean success = SqlHelper.retBool( + jobTaskMapper.update(updateJobTask, Wrappers.lambdaUpdate() + .lt(JobTask::getRetryCount, job.getMaxRetryTimes()) + .eq(JobTask::getId, context.getTaskId()) + ) ); - RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); - realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); - ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); - actorRef.tell(realJobExecutor, actorRef); - // TODO 记录日志 + if (success) { + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + } return; } }