Pre Merge pull request !39 from 李中原/dev_2.6.0
This commit is contained in:
commit
e4ddc1df82
@ -1,8 +1,6 @@
|
||||
package com.aizuda.easy.retry.template.datasource.persistence.po;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.baomidou.mybatisplus.annotation.*;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalDateTime;
|
||||
@ -64,6 +62,7 @@ public class JobTask implements Serializable {
|
||||
/**
|
||||
* 重试次数
|
||||
*/
|
||||
@TableField(value = "retry_count", update= "%s+1")
|
||||
private Integer retryCount;
|
||||
|
||||
/**
|
||||
|
@ -1,11 +1,21 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.callback;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
|
||||
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
@ -17,6 +27,11 @@ import org.springframework.stereotype.Component;
|
||||
@Slf4j
|
||||
public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandler {
|
||||
|
||||
@Autowired
|
||||
private JobTaskMapper jobTaskMapper;
|
||||
@Autowired
|
||||
private JobMapper jobMapper;
|
||||
|
||||
@Override
|
||||
public TaskTypeEnum getTaskInstanceType() {
|
||||
return TaskTypeEnum.BROADCAST;
|
||||
@ -24,6 +39,30 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle
|
||||
|
||||
@Override
|
||||
protected void doCallback(final ClientCallbackContext context) {
|
||||
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
|
||||
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
|
||||
Job job = jobMapper.selectById(context.getJobId());
|
||||
if (jobTask == null || job == null) {
|
||||
return;
|
||||
}
|
||||
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
||||
// 更新重试次数
|
||||
JobTask updateJobTask = new JobTask();
|
||||
updateJobTask.setRetryCount(1);
|
||||
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
|
||||
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||
.eq(JobTask::getId, context.getTaskId())
|
||||
));
|
||||
if (success) {
|
||||
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
||||
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
|
||||
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
|
||||
actorRef.tell(realJobExecutor, actorRef);
|
||||
// TODO 记录日志
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
|
||||
jobExecutorResultDTO.setTaskId(context.getTaskId());
|
||||
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
|
||||
|
@ -1,13 +1,27 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.callback;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
|
||||
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
|
||||
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
|
||||
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author www.byteblogs.com
|
||||
* @date 2023-10-03 23:12:12
|
||||
@ -17,6 +31,13 @@ import org.springframework.stereotype.Component;
|
||||
@Slf4j
|
||||
public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler {
|
||||
|
||||
@Autowired
|
||||
private ClientNodeAllocateHandler clientNodeAllocateHandler;
|
||||
@Autowired
|
||||
private JobTaskMapper jobTaskMapper;
|
||||
@Autowired
|
||||
private JobMapper jobMapper;
|
||||
|
||||
@Override
|
||||
public TaskTypeEnum getTaskInstanceType() {
|
||||
return TaskTypeEnum.CLUSTER;
|
||||
@ -25,6 +46,40 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler
|
||||
@Override
|
||||
protected void doCallback(ClientCallbackContext context) {
|
||||
|
||||
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
|
||||
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
|
||||
Job job = jobMapper.selectById(context.getJobId());
|
||||
if (jobTask == null || job == null) {
|
||||
return;
|
||||
}
|
||||
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
||||
// 选择重试的节点
|
||||
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(),
|
||||
context.getGroupName(), context.getNamespaceId(), job.getRouteKey());
|
||||
if (Objects.isNull(serverNode)) {
|
||||
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
||||
return;
|
||||
}
|
||||
String newClient = ClientInfoUtils.generate(serverNode);
|
||||
// 更新重试次数
|
||||
JobTask updateJobTask = new JobTask();
|
||||
updateJobTask.setClientInfo(newClient);
|
||||
updateJobTask.setRetryCount(1);
|
||||
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
|
||||
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||
.eq(JobTask::getId, context.getTaskId())
|
||||
));
|
||||
// 更新成功执行重试
|
||||
if (success) {
|
||||
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
||||
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
|
||||
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
|
||||
actorRef.tell(realJobExecutor, actorRef);
|
||||
// TODO 记录日志
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
|
||||
jobExecutorResultDTO.setTaskId(context.getTaskId());
|
||||
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());
|
||||
|
@ -1,13 +1,29 @@
|
||||
package com.aizuda.easy.retry.server.job.task.support.callback;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
|
||||
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
|
||||
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
|
||||
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
|
||||
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
|
||||
import com.aizuda.easy.retry.common.core.enums.TaskTypeEnum;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author: www.byteblogs.com
|
||||
* @date : 2023-10-07 10:24
|
||||
@ -17,6 +33,11 @@ import org.springframework.stereotype.Component;
|
||||
@Slf4j
|
||||
public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler {
|
||||
|
||||
@Autowired
|
||||
private JobTaskMapper jobTaskMapper;
|
||||
@Autowired
|
||||
private JobMapper jobMapper;
|
||||
|
||||
@Override
|
||||
public TaskTypeEnum getTaskInstanceType() {
|
||||
return TaskTypeEnum.SHARDING;
|
||||
@ -24,6 +45,40 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler
|
||||
|
||||
@Override
|
||||
protected void doCallback(final ClientCallbackContext context) {
|
||||
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
|
||||
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
|
||||
Job job = jobMapper.selectById(context.getJobId());
|
||||
if (jobTask == null || job == null) {
|
||||
return;
|
||||
}
|
||||
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
|
||||
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
|
||||
if (CollUtil.isEmpty(nodes)) {
|
||||
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
|
||||
return;
|
||||
}
|
||||
RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0]));
|
||||
String newClient = ClientInfoUtils.generate(serverNode);
|
||||
// 更新重试次数
|
||||
JobTask updateJobTask = new JobTask();
|
||||
updateJobTask.setClientInfo(newClient);
|
||||
updateJobTask.setRetryCount(1);
|
||||
boolean success = SqlHelper.retBool(
|
||||
jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
|
||||
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
|
||||
.eq(JobTask::getId, context.getTaskId())
|
||||
)
|
||||
);
|
||||
if (success) {
|
||||
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
|
||||
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
|
||||
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
|
||||
actorRef.tell(realJobExecutor, actorRef);
|
||||
// TODO 记录日志
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
|
||||
jobExecutorResultDTO.setTaskId(context.getTaskId());
|
||||
|
@ -20,13 +20,10 @@ import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
|
||||
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
|
||||
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackFactory;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
|
||||
import com.github.rholder.retry.Attempt;
|
||||
import com.github.rholder.retry.RetryException;
|
||||
import com.github.rholder.retry.RetryListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -43,9 +40,6 @@ import java.util.Objects;
|
||||
@Slf4j
|
||||
public class RequestClientActor extends AbstractActor {
|
||||
|
||||
@Autowired
|
||||
private JobTaskMapper jobTaskMapper;
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().match(RealJobExecutorDTO.class, realJobExecutorDTO -> {
|
||||
@ -91,7 +85,7 @@ public class RequestClientActor extends AbstractActor {
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("调用客户端失败.", e);
|
||||
Throwable throwable = e;
|
||||
Throwable throwable;
|
||||
if (e.getClass().isAssignableFrom(RetryException.class)) {
|
||||
RetryException re = (RetryException) e;
|
||||
throwable = re.getLastFailedAttempt().getExceptionCause();
|
||||
@ -104,23 +98,22 @@ public class RequestClientActor extends AbstractActor {
|
||||
public static class JobExecutorRetryListener implements RetryListener {
|
||||
|
||||
private RealJobExecutorDTO realJobExecutorDTO;
|
||||
private JobTaskMapper jobTaskMapper;
|
||||
|
||||
public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO,
|
||||
final JobTaskMapper jobTaskMapper) {
|
||||
public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO) {
|
||||
this.realJobExecutorDTO = realJobExecutorDTO;
|
||||
this.jobTaskMapper = jobTaskMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> void onRetry(final Attempt<V> attempt) {
|
||||
// 负载节点
|
||||
if (attempt.hasException()) {
|
||||
LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]",
|
||||
realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause());
|
||||
JobTask jobTask = new JobTask();
|
||||
jobTask.setId(realJobExecutorDTO.getTaskId());
|
||||
jobTask.setRetryCount((int) attempt.getAttemptNumber());
|
||||
jobTaskMapper.updateById(jobTask);
|
||||
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(realJobExecutorDTO.getTaskType());
|
||||
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(realJobExecutorDTO);
|
||||
context.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
|
||||
context.setExecuteResult(ExecuteResult.failure(null, "网络请求失败"));
|
||||
clientCallback.callback(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -132,7 +125,7 @@ public class RequestClientActor extends AbstractActor {
|
||||
.failRetry(Boolean.TRUE)
|
||||
.retryTimes(realJobExecutorDTO.getMaxRetryTimes())
|
||||
.retryInterval(realJobExecutorDTO.getRetryInterval())
|
||||
.retryListener(new JobExecutorRetryListener(realJobExecutorDTO, jobTaskMapper))
|
||||
.retryListener(new JobExecutorRetryListener(realJobExecutorDTO))
|
||||
.client(JobRpcClient.class)
|
||||
.build();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user