diff --git a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java index c5148a7f3..0e92639d6 100644 --- a/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java +++ b/snail-job-server/snail-job-server-common/src/main/java/com/aizuda/snailjob/server/common/dto/ScanTask.java @@ -14,11 +14,8 @@ import java.util.Set; @Data public class ScanTask { -// private String namespaceId; -// -// private String groupName; - private Set buckets; -// private Integer groupPartition; + private String bucketStr; + } diff --git a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java index 18d72217e..f5ad958f5 100644 --- a/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java +++ b/snail-job-server/snail-job-server-retry-task/src/main/java/com/aizuda/snailjob/server/retry/task/support/dispatch/ScanRetryActor.java @@ -38,6 +38,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.stream.Collectors; /** @@ -53,6 +54,7 @@ import java.util.stream.Collectors; @Slf4j @RequiredArgsConstructor public class ScanRetryActor extends AbstractActor { + public static final ConcurrentSkipListSet REPEATED_PULL= new ConcurrentSkipListSet<>(new ArrayList<>()); private final SystemProperties systemProperties; private final AccessTemplate accessTemplate; private final RetryMapper retryMapper; @@ -67,6 +69,8 @@ public class ScanRetryActor extends AbstractActor { doScan(config); } catch (Exception e) { SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", config, e); + } finally { + REPEATED_PULL.remove(config.getBucketStr()); } }).build(); diff --git a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java index b1d6ec90f..7ba353d8a 100644 --- a/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java +++ b/snail-job-server/snail-job-server-starter/src/main/java/com/aizuda/snailjob/server/starter/dispatch/ConsumerBucketActor.java @@ -1,5 +1,7 @@ package com.aizuda.snailjob.server.starter.dispatch; +import cn.hutool.core.util.StrUtil; +import com.aizuda.snailjob.server.retry.task.support.dispatch.ScanRetryActor; import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import cn.hutool.core.collection.CollUtil; @@ -71,10 +73,17 @@ public class ConsumerBucketActor extends AbstractActor { List> partitions = Lists.partition(new ArrayList<>(totalBuckets), (totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel); for (List buckets : partitions) { + String key = StrUtil.join(StrUtil.COMMA, new TreeSet<>(buckets)); + if (ScanRetryActor.REPEATED_PULL.contains(key)) { + SnailJobLog.LOCAL.warn("Discard the current scanning task because there are ongoing tasks in the current batch.[{}]", key); + continue; + } ScanTask scanTask = new ScanTask(); + scanTask.setBucketStr(key); scanTask.setBuckets(new HashSet<>(buckets)); ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor(); scanRetryActorRef.tell(scanTask, scanRetryActorRef); + ScanRetryActor.REPEATED_PULL.add(key); } }