diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java index 0928a9f5..ac3a7845 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/config/SystemProperties.java @@ -25,7 +25,7 @@ public class SystemProperties { /** * 重试每次拉取的条数 */ - private int retryPullPageSize = 100; + private int retryPullPageSize = 1000; /** * netty 端口 diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java index 3b5d3ab1..6d58d814 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/util/PartitionTaskUtils.java @@ -19,8 +19,8 @@ public class PartitionTaskUtils { } public static long process( - Function> dataSource, Consumer> task, - long startId) { + Function> dataSource, Consumer> task, + long startId) { int total = 0; do { List products = dataSource.apply(startId); @@ -39,8 +39,8 @@ public class PartitionTaskUtils { } private static long maxId(List products) { - Optional max = products.stream().map(PartitionTask::getId).max(Long::compareTo); - return max.orElse(-1L) + 1; + // 使用的地方必须按照id正序排序 + return products.get(products.size() - 1).getId() + 1; } } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java index 7471bf9c..335f8f7b 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/handler/JobTaskBatchHandler.java @@ -12,6 +12,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMappe import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -36,10 +37,10 @@ public class JobTaskBatchHandler { new LambdaQueryWrapper().select(JobTask::getTaskStatus) .eq(JobTask::getTaskBatchId, taskBatchId)); - if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { - return false; - } + if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) { + return false; + } long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count(); long stopCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.STOP.getStatus()).count(); @@ -56,9 +57,12 @@ public class JobTaskBatchHandler { if (Objects.nonNull(jobOperationReasonEnum)) { jobTaskBatch.setOperationReason(jobOperationReasonEnum.getReason()); } - jobTaskBatchMapper.updateById(jobTaskBatch); - return true; + return 1 == jobTaskBatchMapper.update(jobTaskBatch, + new LambdaUpdateWrapper() + .eq(JobTaskBatch::getId, taskBatchId) + .in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE) + ); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java new file mode 100644 index 00000000..387e57b4 --- /dev/null +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/dto/RetryPartitionTask.java @@ -0,0 +1,15 @@ +package com.aizuda.easy.retry.server.retry.task.dto; + +import com.aizuda.easy.retry.server.common.dto.PartitionTask; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author www.byteblogs.com + * @date 2023-10-25 22:23:24 + * @since 2.4.0 + */ +@EqualsAndHashCode(callSuper = true) +@Data +public class RetryPartitionTask extends PartitionTask { +} diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java index 7cb278be..7a078061 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/RetryTaskConverter.java @@ -1,6 +1,7 @@ package com.aizuda.easy.retry.server.retry.task.support; import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO; +import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; import com.aizuda.easy.retry.server.retry.task.generator.task.TaskContext; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryDeadLetter; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; @@ -29,9 +30,9 @@ public interface RetryTaskConverter { }) RetryTask toRetryTask(RetryDeadLetter retryDeadLetter); -// RetryTask toRetryTask(RetryTaskSaveRequestVO retryTaskSaveRequestVO); - List toRetryTaskList(List retryTaskDTOList); RetryTask toRetryTask(TaskContext.TaskInfo taskInfo); + + List toRetryPartitionTasks(List retryTasks); } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java index 53a6623c..9d07a853 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/dispatch/actor/scan/AbstractScanGroup.java @@ -5,15 +5,21 @@ import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.IdempotentStrategy; +import com.aizuda.easy.retry.server.common.dto.PartitionTask; import com.aizuda.easy.retry.server.common.dto.ScanTask; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; +import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; +import com.aizuda.easy.retry.server.retry.task.dto.RetryPartitionTask; +import com.aizuda.easy.retry.server.retry.task.support.RetryTaskConverter; +import com.aizuda.easy.retry.server.retry.task.support.WaitStrategy; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutor; -import com.aizuda.easy.retry.server.retry.task.support.timer.TimerWheelHandler; import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum; +import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -22,6 +28,8 @@ import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * 数据扫描模板类 @@ -61,42 +69,66 @@ public abstract class AbstractScanGroup extends AbstractActor { protected void doScan(final ScanTask scanTask) { - LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); - int retryPullPageSize = systemProperties.getRetryPullPageSize(); +// LocalDateTime lastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays()); String groupName = scanTask.getGroupName(); Long lastId = Optional.ofNullable(getLastId(groupName)).orElse(0L); + int retryPullPageSize = systemProperties.getRetryPullPageSize(); - // 扫描当前Group 待处理的任务 - List list = listAvailableTasks(groupName, lastAt, lastId, retryPullPageSize, taskActuatorScene().getScene()); - if (!CollectionUtils.isEmpty(list)) { - - // 更新拉取的最大的id - putLastId(scanTask.getGroupName(), list.get(list.size() - 1).getId()); - - for (RetryTask retryTask : list) { - // 已经存在时间轮里面的任务由时间轮负责调度 - boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId()); - if (existed) { - continue; - } - - for (TaskExecutor taskExecutor : taskExecutors) { - if (taskActuatorScene().getScene() == taskExecutor.getTaskType().getScene()) { - taskExecutor.actuator(retryTask); + AtomicInteger count = new AtomicInteger(0); + long total = PartitionTaskUtils.process(startId -> { + // 没10秒触发一次扫描任务,每次扫描N次 + int i = count.incrementAndGet(); + // TODO 需要支持动态计算循环拉取多少次 + if (i > 5) { + return Lists.newArrayList(); } - } + return listAvailableTasks(groupName, startId, taskActuatorScene().getTaskType().getType()); + }, + partitionTasks -> processRetryPartitionTasks(partitionTasks, scanTask), lastId); + + } + + private void processRetryPartitionTasks(List partitionTasks, ScanTask scanTask) { + if (!CollectionUtils.isEmpty(partitionTasks)) { + + // TODO 更新拉取的最大的id + putLastId(scanTask.getGroupName(), partitionTasks.get(partitionTasks.size() - 1).getId()); + + for (PartitionTask partitionTask : partitionTasks) { + processRetryTask((RetryPartitionTask) partitionTask); + + // 已经存在时间轮里面的任务由时间轮负责调度 +// boolean existed = TimerWheelHandler.isExisted(retryTask.getGroupName(), retryTask.getUniqueId()); +// if (existed) { +// continue; +// } +// +// for (TaskExecutor taskExecutor : taskExecutors) { +// if (taskActuatorScene().getScene() == taskExecutor.getTaskType().getScene()) { +// taskExecutor.actuator(retryTask); +// } +// } } } else { - // 数据为空则休眠5s - try { - Thread.sleep((10 / 2) * 1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - putLastId(groupName, 0L); +// // 数据为空则休眠5s +// try { +// Thread.sleep((10 / 2) * 1000); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } + + putLastId(scanTask.getGroupName(), 0L); } + } + + private void processRetryTask(RetryPartitionTask partitionTask) { + + + // 更新触发时间, 任务进入时间轮 +// WaitStrategies.WaitStrategyEnum.getWaitStrategy(partitionTask) +// waitStrategy.computeRetryTime(retryContext); } @@ -106,24 +138,19 @@ public abstract class AbstractScanGroup extends AbstractActor { protected abstract void putLastId(String groupName, Long lastId); - public List listAvailableTasks(String groupName, - LocalDateTime lastAt, - Long lastId, - Integer pageSize, - Integer taskType) { - return accessTemplate.getRetryTaskAccess().listPage(groupName, new PageDTO<>(0, pageSize), - new LambdaQueryWrapper() - .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) - .eq(RetryTask::getGroupName, groupName) - .eq(RetryTask::getTaskType, taskType) - // TODO 提前10秒把需要执行的任务拉取出来 - .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)) - .gt(RetryTask::getId, lastId) - // TODO 验证一下lastAt会不会改变 - .gt(RetryTask::getCreateDt, lastAt) - .orderByAsc(RetryTask::getId) - .orderByAsc(RetryTask::getCreateDt)) + public List listAvailableTasks(String groupName, Long lastId, Integer taskType) { + List retryTasks = accessTemplate.getRetryTaskAccess().listPage(groupName, new PageDTO<>(0, systemProperties.getRetryPullPageSize()), + new LambdaQueryWrapper() + .eq(RetryTask::getRetryStatus, RetryStatusEnum.RUNNING.getStatus()) + .eq(RetryTask::getGroupName, groupName).eq(RetryTask::getTaskType, taskType) + // TODO 提前10秒把需要执行的任务拉取出来 + .le(RetryTask::getNextTriggerAt, LocalDateTime.now().plusSeconds(10)).gt(RetryTask::getId, lastId) + // TODO 验证一下lastAt会不会改变 +// .gt(RetryTask::getCreateDt, lastAt) + .orderByAsc(RetryTask::getId)) .getRecords(); + + return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks); } } diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java index f3124fa0..d888866d 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/retry/RetryExecutor.java @@ -68,7 +68,7 @@ public class RetryExecutor { } // 计算下次触发时间 - retryContext.getRetryTask().setNextTriggerAt(waitStrategy.computeRetryTime(retryContext)); +// retryContext.getRetryTask().setNextTriggerAt(); boolean isStop = Boolean.TRUE;