feat:2.6.0

1:增加网络失败,客户端上报失败,请求客户端返回失败的重试机制
This commit is contained in:
lizhongyuan3 2024-01-02 16:58:16 +08:00 committed by byteblogs168
parent 539830a268
commit 5d8508480a
4 changed files with 141 additions and 16 deletions

View File

@ -1,11 +1,21 @@
package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
@ -17,6 +27,11 @@ import org.springframework.stereotype.Component;
@Slf4j
public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandler {
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.BROADCAST;
@ -24,6 +39,26 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle
@Override
protected void doCallback(final ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 更新重试次数
jobTaskMapper.update(null, Wrappers.<JobTask>lambdaUpdate()
.setSql("retry_count = retry_count + 1")
.apply("retry_count < 3")
);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
return;
}
}
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId());
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());

View File

@ -1,13 +1,27 @@
package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @author www.byteblogs.com
* @date 2023-10-03 23:12:12
@ -17,6 +31,13 @@ import org.springframework.stereotype.Component;
@Slf4j
public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler {
@Autowired
private ClientNodeAllocateHandler clientNodeAllocateHandler;
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.CLUSTER;
@ -25,6 +46,36 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler
@Override
protected void doCallback(ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 选择重试的节点
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(),
context.getGroupName(), context.getNamespaceId(), job.getRouteKey());
if (Objects.isNull(serverNode)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return;
}
String newClient = ClientInfoUtils.generate(serverNode);
// 更新重试次数
jobTaskMapper.update(null, Wrappers.<JobTask>lambdaUpdate()
.set(JobTask::getClientInfo, newClient)
.setSql("retry_count = retry_count + 1")
.apply("retry_count < 3")
);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
// 执行重试操作
actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
return;
}
}
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId());
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());

View File

@ -1,13 +1,27 @@
package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @author: www.byteblogs.com
* @date : 2023-10-07 10:24
@ -17,6 +31,11 @@ import org.springframework.stereotype.Component;
@Slf4j
public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler {
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override
public JobTaskTypeEnum getTaskInstanceType() {
return JobTaskTypeEnum.SHARDING;
@ -24,6 +43,33 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler
@Override
protected void doCallback(final ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
Optional<RegisterNodeInfo> serverNode = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId()).stream().findAny();
if (!serverNode.isPresent()) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return;
}
String newClient = ClientInfoUtils.generate(serverNode.get());
// 更新重试次数
jobTaskMapper.update(null, Wrappers.<JobTask>lambdaUpdate()
.set(JobTask::getClientInfo, newClient)
.setSql("retry_count = retry_count + 1")
.apply("retry_count < 3")
);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
return;
}
}
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId());

View File

@ -20,13 +20,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@ -43,9 +40,6 @@ import java.util.Objects;
@Slf4j
public class RequestClientActor extends AbstractActor {
@Autowired
private JobTaskMapper jobTaskMapper;
@Override
public Receive createReceive() {
return receiveBuilder().match(RealJobExecutorDTO.class, realJobExecutorDTO -> {
@ -103,24 +97,23 @@ public class RequestClientActor extends AbstractActor {
public static class JobExecutorRetryListener implements RetryListener {
private final RealJobExecutorDTO realJobExecutorDTO;
private final JobTaskMapper jobTaskMapper;
private RealJobExecutorDTO realJobExecutorDTO;
public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO,
final JobTaskMapper jobTaskMapper) {
public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO) {
this.realJobExecutorDTO = realJobExecutorDTO;
this.jobTaskMapper = jobTaskMapper;
}
@Override
public <V> void onRetry(final Attempt<V> attempt) {
// 负载节点
if (attempt.hasException()) {
LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]",
realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause());
JobTask jobTask = new JobTask();
jobTask.setId(realJobExecutorDTO.getTaskId());
jobTask.setRetryCount((int) attempt.getAttemptNumber());
jobTaskMapper.updateById(jobTask);
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType());
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO);
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
context.setExecuteResult(ExecuteResult.failure(null, "网络请求失败"));
clientCallback.callback(context);
}
}
}
@ -132,7 +125,7 @@ public class RequestClientActor extends AbstractActor {
.failRetry(Boolean.TRUE)
.retryTimes(realJobExecutorDTO.getMaxRetryTimes())
.retryInterval(realJobExecutorDTO.getRetryInterval())
.retryListener(new JobExecutorRetryListener(realJobExecutorDTO, jobTaskMapper))
.retryListener(new JobExecutorRetryListener(realJobExecutorDTO))
.client(JobRpcClient.class)
.build();
}