From 4de55b5fd350a12f6e1e01098d0b6fd03b85d3fd Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sun, 6 Apr 2025 08:45:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.5.0-beta1):=20=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=94=AF=E6=8C=81=E9=98=B2=E6=AD=A2=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E9=87=8D=E5=A4=8D=E6=8B=89=E5=8F=96=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/aizuda/snailjob/server/common/dto/ScanTask.java | 7 ++----- .../retry/task/support/dispatch/ScanRetryActor.java | 4 ++++ .../server/starter/dispatch/ConsumerBucketActor.java | 9 +++++++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java index c5148a7f3..0e92639d6 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java @@ -14,11 +14,8 @@ import java.util.Set; @Data public class ScanTask { -// private String namespaceId; -// -// private String groupName; - private Set buckets; -// private Integer groupPartition; + private String bucketStr; + } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 18d72217e..f5ad958f5 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -38,6 +38,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; /** @@ -53,6 +54,7 @@ import java.util.stream.Collectors; @Slf4j @RequiredArgsConstructor public class ScanRetryActor extends AbstractActor { + public static final ConcurrentSkipListSet REPEATED_PULL= new ConcurrentSkipListSet<>(new ArrayList<>()); private final SystemProperties systemProperties; private final AccessTemplate accessTemplate; private final RetryMapper retryMapper; @@ -67,6 +69,8 @@ public class ScanRetryActor extends AbstractActor { doScan(config); } catch (Exception e) { SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", config, e); + } finally { + REPEATED_PULL.remove(config.getBucketStr()); } }).build(); diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java index b1d6ec90f..7ba353d8a 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java @@ -1,5 +1,7 @@ package com.aizuda.snailjob.server.starter.dispatch; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.server.retry.task.support.dispatch.ScanRetryActor; import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import cn.hutool.core.collection.CollUtil; @@ -71,10 +73,17 @@ public class ConsumerBucketActor extends AbstractActor { List> partitions = Lists.partition(new ArrayList<>(totalBuckets), (totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel); for (List buckets : partitions) { + String key = StrUtil.join(StrUtil.COMMA, new TreeSet<>(buckets)); + if (ScanRetryActor.REPEATED_PULL.contains(key)) { + SnailJobLog.LOCAL.warn("Discard the current scanning task because there are ongoing tasks in the current batch.[{}]", key); + continue; + } ScanTask scanTask = new ScanTask(); + scanTask.setBucketStr(key); scanTask.setBuckets(new HashSet<>(buckets)); ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor(); scanRetryActorRef.tell(scanTask, scanRetryActorRef); + ScanRetryActor.REPEATED_PULL.add(key); } }