From 5d8508480a39ec1dc4ead06cb74543ab4a16b828 Mon Sep 17 00:00:00 2001 From: lizhongyuan3 Date: Tue, 2 Jan 2024 16:58:16 +0800 Subject: [PATCH] =?UTF-8?q?feat:2.6.0=201=EF=BC=9A=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E7=BD=91=E7=BB=9C=E5=A4=B1=E8=B4=A5=EF=BC=8C=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E4=B8=8A=E6=8A=A5=E5=A4=B1=E8=B4=A5=EF=BC=8C=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E5=AE=A2=E6=88=B7=E7=AB=AF=E8=BF=94=E5=9B=9E=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E7=9A=84=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BroadcastClientCallbackHandler.java | 35 +++++++++++++ .../ClusterClientCallbackHandler.java | 51 +++++++++++++++++++ .../ShardingClientCallbackHandler.java | 46 +++++++++++++++++ .../executor/job/RequestClientActor.java | 25 ++++----- 4 files changed, 141 insertions(+), 16 deletions(-) diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java index 6fb351f26..d3e8c1944 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/BroadcastClientCallbackHandler.java @@ -1,11 +1,21 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -17,6 +27,11 @@ import org.springframework.stereotype.Component; @Slf4j public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandler { + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobMapper jobMapper; + @Override public JobTaskTypeEnum getTaskInstanceType() { return JobTaskTypeEnum.BROADCAST; @@ -24,6 +39,26 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle @Override protected void doCallback(final ClientCallbackContext context) { + if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) { + JobTask jobTask = jobTaskMapper.selectById(context.getTaskId()); + Job job = jobMapper.selectById(context.getJobId()); + if (jobTask == null || job == null) { + return; + } + if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { + // 更新重试次数 + jobTaskMapper.update(null, Wrappers.lambdaUpdate() + .setSql("retry_count = retry_count + 1") + .apply("retry_count < 3") + ); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo())); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + return; + } + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java index 48503b974..e44cca00e 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ClusterClientCallbackHandler.java @@ -1,13 +1,27 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Objects; + /** * @author www.byteblogs.com * @date 2023-10-03 23:12:12 @@ -17,6 +31,13 @@ import org.springframework.stereotype.Component; @Slf4j public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler { + @Autowired + private ClientNodeAllocateHandler clientNodeAllocateHandler; + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobMapper jobMapper; + @Override public JobTaskTypeEnum getTaskInstanceType() { return JobTaskTypeEnum.CLUSTER; @@ -25,6 +46,36 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler @Override protected void doCallback(ClientCallbackContext context) { + if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) { + JobTask jobTask = jobTaskMapper.selectById(context.getTaskId()); + Job job = jobMapper.selectById(context.getJobId()); + if (jobTask == null || job == null) { + return; + } + if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { + // 选择重试的节点 + RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(), + context.getGroupName(), context.getNamespaceId(), job.getRouteKey()); + if (Objects.isNull(serverNode)) { + log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return; + } + String newClient = ClientInfoUtils.generate(serverNode); + // 更新重试次数 + jobTaskMapper.update(null, Wrappers.lambdaUpdate() + .set(JobTask::getClientInfo, newClient) + .setSql("retry_count = retry_count + 1") + .apply("retry_count < 3") + ); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + // 执行重试操作 + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + return; + } + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java index a41c53bf1..cfb9295a8 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/callback/ShardingClientCallbackHandler.java @@ -1,13 +1,27 @@ package com.aizuda.easy.retry.server.job.task.support.callback; import akka.actor.ActorRef; +import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; +import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; +import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; +import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; +import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.Job; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Optional; + /** * @author: www.byteblogs.com * @date : 2023-10-07 10:24 @@ -17,6 +31,11 @@ import org.springframework.stereotype.Component; @Slf4j public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler { + @Autowired + private JobTaskMapper jobTaskMapper; + @Autowired + private JobMapper jobMapper; + @Override public JobTaskTypeEnum getTaskInstanceType() { return JobTaskTypeEnum.SHARDING; @@ -24,6 +43,33 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler @Override protected void doCallback(final ClientCallbackContext context) { + if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) { + JobTask jobTask = jobTaskMapper.selectById(context.getTaskId()); + Job job = jobMapper.selectById(context.getJobId()); + if (jobTask == null || job == null) { + return; + } + if (jobTask.getRetryCount() < job.getMaxRetryTimes()) { + Optional serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny(); + if (!serverNode.isPresent()) { + log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId()); + return; + } + String newClient = ClientInfoUtils.generate(serverNode.get()); + // 更新重试次数 + jobTaskMapper.update(null, Wrappers.lambdaUpdate() + .set(JobTask::getClientInfo, newClient) + .setSql("retry_count = retry_count + 1") + .apply("retry_count < 3") + ); + RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask); + realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient)); + ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); + actorRef.tell(realJobExecutor, actorRef); + // TODO 记录日志 + return; + } + } JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); jobExecutorResultDTO.setTaskId(context.getTaskId()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java index 86fae3e21..6b535f358 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/job/RequestClientActor.java @@ -20,13 +20,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory; -import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; -import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryListener; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @@ -43,9 +40,6 @@ import java.util.Objects; @Slf4j public class RequestClientActor extends AbstractActor { - @Autowired - private JobTaskMapper jobTaskMapper; - @Override public Receive createReceive() { return receiveBuilder().match(RealJobExecutorDTO.class, realJobExecutorDTO -> { @@ -103,24 +97,23 @@ public class RequestClientActor extends AbstractActor { public static class JobExecutorRetryListener implements RetryListener { - private final RealJobExecutorDTO realJobExecutorDTO; - private final JobTaskMapper jobTaskMapper; + private RealJobExecutorDTO realJobExecutorDTO; - public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO, - final JobTaskMapper jobTaskMapper) { + public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO) { this.realJobExecutorDTO = realJobExecutorDTO; - this.jobTaskMapper = jobTaskMapper; } @Override public void onRetry(final Attempt attempt) { + // 负载节点 if (attempt.hasException()) { LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]", realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); - JobTask jobTask = new JobTask(); - jobTask.setId(realJobExecutorDTO.getTaskId()); - jobTask.setRetryCount((int) attempt.getAttemptNumber()); - jobTaskMapper.updateById(jobTask); + ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType()); + ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO); + context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus()); + context.setExecuteResult(ExecuteResult.failure(null, "网络请求失败")); + clientCallback.callback(context); } } } @@ -132,7 +125,7 @@ public class RequestClientActor extends AbstractActor { .failRetry(Boolean.TRUE) .retryTimes(realJobExecutorDTO.getMaxRetryTimes()) .retryInterval(realJobExecutorDTO.getRetryInterval()) - .retryListener(new JobExecutorRetryListener(realJobExecutorDTO, jobTaskMapper)) + .retryListener(new JobExecutorRetryListener(realJobExecutorDTO)) .client(JobRpcClient.class) .build(); }