feat(1.5.0-beta1): 重试任务支持防止任务重复拉取配置

This commit is contained in:
opensnail 2025-04-06 08:45:52 +08:00
parent 34e826cd97
commit 4de55b5fd3
3 changed files with 15 additions and 5 deletions

View File

@ -14,11 +14,8 @@ import java.util.Set;
@Data @Data
public class ScanTask { public class ScanTask {
// private String namespaceId;
//
// private String groupName;
private Set<Integer> buckets; private Set<Integer> buckets;
// private Integer groupPartition; private String bucketStr;
} }

View File

@ -38,6 +38,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -53,6 +54,7 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class ScanRetryActor extends AbstractActor { public class ScanRetryActor extends AbstractActor {
public static final ConcurrentSkipListSet<String> REPEATED_PULL= new ConcurrentSkipListSet<>(new ArrayList<>());
private final SystemProperties systemProperties; private final SystemProperties systemProperties;
private final AccessTemplate accessTemplate; private final AccessTemplate accessTemplate;
private final RetryMapper retryMapper; private final RetryMapper retryMapper;
@ -67,6 +69,8 @@ public class ScanRetryActor extends AbstractActor {
doScan(config); doScan(config);
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", config, e); SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", config, e);
} finally {
REPEATED_PULL.remove(config.getBucketStr());
} }
}).build(); }).build();

View File

@ -1,5 +1,7 @@
package com.aizuda.snailjob.server.starter.dispatch; package com.aizuda.snailjob.server.starter.dispatch;
import cn.hutool.core.util.StrUtil;
import com.aizuda.snailjob.server.retry.task.support.dispatch.ScanRetryActor;
import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorRef;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
@ -71,10 +73,17 @@ public class ConsumerBucketActor extends AbstractActor {
List<List<Integer>> partitions = Lists.partition(new ArrayList<>(totalBuckets), List<List<Integer>> partitions = Lists.partition(new ArrayList<>(totalBuckets),
(totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel); (totalBuckets.size() + retryMaxPullParallel - 1) / retryMaxPullParallel);
for (List<Integer> buckets : partitions) { for (List<Integer> buckets : partitions) {
String key = StrUtil.join(StrUtil.COMMA, new TreeSet<>(buckets));
if (ScanRetryActor.REPEATED_PULL.contains(key)) {
SnailJobLog.LOCAL.warn("Discard the current scanning task because there are ongoing tasks in the current batch.[{}]", key);
continue;
}
ScanTask scanTask = new ScanTask(); ScanTask scanTask = new ScanTask();
scanTask.setBucketStr(key);
scanTask.setBuckets(new HashSet<>(buckets)); scanTask.setBuckets(new HashSet<>(buckets));
ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor(); ActorRef scanRetryActorRef = ActorGenerator.scanRetryActor();
scanRetryActorRef.tell(scanTask, scanRetryActorRef); scanRetryActorRef.tell(scanTask, scanRetryActorRef);
ScanRetryActor.REPEATED_PULL.add(key);
} }
} }