feat:2.4.0

1. 重试扫描器优化未完成
This commit is contained in:
byteblogs168 2023-10-25 23:13:02 +08:00
parent d5409843e3
commit b3e92792a5
7 changed files with 105 additions and 58 deletions

View File

@ -25,7 +25,7 @@ public class SystemProperties {
/**
* 重试每次拉取的条数
*/
private int retryPullPageSize = 100;
private int retryPullPageSize = 1000;
/**
* netty 端口

View File

@ -19,8 +19,8 @@ public class PartitionTaskUtils {
}
public static long process(
Function<Long, List<? extends PartitionTask>> dataSource, Consumer<List<? extends PartitionTask>> task,
long startId) {
Function<Long, List<? extends PartitionTask>> dataSource, Consumer<List<? extends PartitionTask>> task,
long startId) {
int total = 0;
do {
List<? extends PartitionTask> products = dataSource.apply(startId);
@ -39,8 +39,8 @@ public class PartitionTaskUtils {
}
private static long maxId(List<? extends PartitionTask> products) {
Optional<Long> max = products.stream().map(PartitionTask::getId).max(Long::compareTo);
return max.orElse(-1L) + 1;
// 使用的地方必须按照id正序排序
return products.get(products.size() - 1).getId() + 1;
}
}

View File

@ -12,6 +12,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMappe
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -36,10 +37,10 @@ public class JobTaskBatchHandler {
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
.eq(JobTask::getTaskBatchId, taskBatchId));
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false;
}
long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count();
long stopCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.STOP.getStatus()).count();
@ -56,9 +57,12 @@ public class JobTaskBatchHandler {
if (Objects.nonNull(jobOperationReasonEnum)) {
jobTaskBatch.setOperationReason(jobOperationReasonEnum.getReason());
}
jobTaskBatchMapper.updateById(jobTaskBatch);
return true;
return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, taskBatchId)
.in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE)
);
}
}

View File

@ -0,0 +1,15 @@
package com.aizuda.easy.retry.server.retry.task.dto;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author www.byteblogs.com
* @date 2023-10-25 22:23:24
* @since 2.4.0
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class RetryPartitionTask extends PartitionTask {
}

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.retry.task.support;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
@ -29,9 +30,9 @@ public interface RetryTaskConverter {
})
RetryTask toRetryTask(RetryDeadLetter retryDeadLetter);
// RetryTask toRetryTask(RetryTaskSaveRequestVO retryTaskSaveRequestVO);
List<RetryTask> toRetryTaskList(List<RetryTaskDTO> retryTaskDTOList);
RetryTask toRetryTask(TaskContext.TaskInfo taskInfo);
List<RetryPartitionTask> toRetryPartitionTasks(List<RetryTask> retryTasks);
}

View File

@ -5,15 +5,21 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter;
import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor;
import com.aizuda.easy.retry.server.retry.task.support.timer.TimerWheelHandler;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -22,6 +28,8 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* 数据扫描模板类
@ -61,42 +69,66 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected void doScan(final ScanTask scanTask) {
LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
int retryPullPageSize = systemProperties.getRetryPullPageSize();
// LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());
String groupName = scanTask.getGroupName();
Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L);
int retryPullPageSize = systemProperties.getRetryPullPageSize();
// 扫描当前Group 待处理的任务
List<RetryTask> list = listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, taskActuatorScene().getScene());
if (!CollectionUtils.isEmpty(list)) {
// 更新拉取的最大的id
putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId());
for (RetryTask retryTask : list) {
// 已经存在时间轮里面的任务由时间轮负责调度
boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId());
if (existed) {
continue;
}
for (TaskExecutor taskExecutor : taskExecutors) {
if (taskActuatorScene().getScene() == taskExecutor.getTaskType().getScene()) {
taskExecutor.actuator(retryTask);
AtomicInteger count = new AtomicInteger(0);
long total = PartitionTaskUtils.process(startId -> {
// 没10秒触发一次扫描任务每次扫描N次
int i = count.incrementAndGet();
// TODO 需要支持动态计算循环拉取多少次
if (i > 5) {
return Lists.newArrayList();
}
}
return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType());
},
partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId);
}
private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
if (!CollectionUtils.isEmpty(partitionTasks)) {
// TODO 更新拉取的最大的id
putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId());
for (PartitionTask partitionTask : partitionTasks) {
processRetryTask((RetryPartitionTask) partitionTask);
// 已经存在时间轮里面的任务由时间轮负责调度
// boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId());
// if (existed) {
// continue;
// }
//
// for (TaskExecutor taskExecutor : taskExecutors) {
// if (taskActuatorScene().getScene() == taskExecutor.getTaskType().getScene()) {
// taskExecutor.actuator(retryTask);
// }
// }
}
} else {
// 数据为空则休眠5s
try {
Thread.sleep((10 / 2) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
putLastId(groupName, 0L);
// // 数据为空则休眠5s
// try {
// Thread.sleep((10 / 2) * 1000);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// }
putLastId(scanTask.getGroupName(), 0L);
}
}
private void processRetryTask(RetryPartitionTask partitionTask) {
// 更新触发时间, 任务进入时间轮
// WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask)
// waitStrategy.computeRetryTime(retryContext);
}
@ -106,24 +138,19 @@ public abstract class AbstractScanGroup extends AbstractActor {
protected abstract void putLastId(String groupName, Long lastId);
public List<RetryTask> listAvailableTasks(String groupName,
LocalDateTime lastAt,
Long lastId,
Integer pageSize,
Integer taskType) {
return accessTemplate.getRetryTaskAccess().listPage(groupName, new PageDTO<>(0, pageSize),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName)
.eq(RetryTask::getTaskType, taskType)
// TODO 提前10秒把需要执行的任务拉取出来
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10))
.gt(RetryTask::getId, lastId)
// TODO 验证一下lastAt会不会改变
.gt(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getId)
.orderByAsc(RetryTask::getCreateDt))
public List<RetryPartitionTask> listAvailableTasks(String groupName, Long lastId, Integer taskType) {
List<RetryTask> retryTasks = accessTemplate.getRetryTaskAccess().listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus())
.eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType)
// TODO 提前10秒把需要执行的任务拉取出来
.le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId)
// TODO 验证一下lastAt会不会改变
// .gt(RetryTask::getCreateDt, lastAt)
.orderByAsc(RetryTask::getId))
.getRecords();
return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
}
}

View File

@ -68,7 +68,7 @@ public class RetryExecutor<V> {
}
// 计算下次触发时间
retryContext.getRetryTask().setNextTriggerAt(waitStrategy.computeRetryTime(retryContext));
// retryContext.getRetryTask().setNextTriggerAt();
boolean isStop = Boolean.TRUE;