feat(1.5.0-beta1): 优化滑动窗口

This commit is contained in:
opensnail 2025-04-05 19:49:34 +08:00
parent 15e341fcac
commit 34e826cd97
2 changed files with 48 additions and 88 deletions

View File

@ -5,6 +5,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.window.Listener;
import com.aizuda.snailjob.common.log.SnailJobLog;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
@ -108,9 +109,6 @@ public class SlidingWindow<T> {
// 扫描n-1个窗口是否过期过期则删除
removeInvalidWindow();
// 超过窗口阈值预警
alarmWindowTotal();
} else {
oldWindowAdd(data);
}
@ -135,11 +133,11 @@ public class SlidingWindow<T> {
}
/**
* 扫描n-1个窗口是否过期过期则删除 过期条件为窗口期内无数据
* 扫描n-2个窗口是否过期过期则删除 过期条件为窗口期内无数据
*/
private void removeInvalidWindow() {
for (int i = 0; i < saveData.size() - 1; i++) {
for (int i = 0; i < saveData.size() - 2; i++) {
Map.Entry<LocalDateTime, ConcurrentLinkedQueue<T>> firstEntry = saveData.firstEntry();
if (CollUtil.isEmpty(firstEntry.getValue())) {
saveData.remove(firstEntry.getKey());
@ -159,8 +157,24 @@ public class SlidingWindow<T> {
return;
}
// 高并发下情况下出现刚刚获取的最后一个窗口被删除的情况
int count = 10;
ConcurrentLinkedQueue<T> list = saveData.get(windowPeriod);
while (Objects.isNull(list) && count > 0) {
count--;
windowPeriod = getNewWindowPeriod();
if (Objects.isNull(windowPeriod)) {
continue;
}
list = saveData.get(windowPeriod);
}
if (Objects.nonNull(list)) {
list.add(data);
} else {
// 这里一般走不到作为兜底
SnailJobLog.LOCAL.error("Data loss. [{}]", JsonUtil.toJsonString(data));
}
if (list.size() >= totalThreshold) {
doHandlerListener(windowPeriod);
@ -176,7 +190,8 @@ public class SlidingWindow<T> {
private void doHandlerListener(LocalDateTime windowPeriod) {
NOTICE_LOCK.lock();
// 深拷贝
ConcurrentLinkedQueue<T> deepCopy = null;
try {
ConcurrentLinkedQueue<T> list = saveData.get(windowPeriod);
@ -185,38 +200,28 @@ public class SlidingWindow<T> {
}
// 深拷贝
ConcurrentLinkedQueue<T> deepCopy = new ConcurrentLinkedQueue<>(list);
deepCopy = new ConcurrentLinkedQueue<>(list);
clear(windowPeriod, deepCopy);
if (CollUtil.isEmpty(deepCopy)) {
return;
}
for (Listener<T> listener : listeners) {
listener.handler(new ArrayList<>(deepCopy));
}
} catch (Exception e) {
SnailJobLog.LOCAL.error("到达总量窗口期通知异常", e);
SnailJobLog.LOCAL.error("deep copy task queue is error", e);
} finally {
NOTICE_LOCK.unlock();
}
if (!CollectionUtils.isEmpty(deepCopy)) {
try {
for (Listener<T> listener : listeners) {
listener.handler(new ArrayList<>(deepCopy));
}
} catch (Exception e) {
SnailJobLog.LOCAL.error("notice is error", e);
}
/**
* 删除2倍窗口期之前无效窗口
*
* @param windowPeriod 当前最老窗口期
*/
private void removeInvalidWindow(LocalDateTime windowPeriod) {
LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit);
if (windowPeriod.isBefore(currentTime)) {
SnailJobLog.LOCAL.debug("删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime);
saveData.remove(windowPeriod);
}
}
/**
@ -248,6 +253,19 @@ public class SlidingWindow<T> {
}
}
/**
* 删除2倍窗口期之前无效窗口
*
* @param windowPeriod 当前最老窗口期
*/
private void removeInvalidWindow(LocalDateTime windowPeriod) {
LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit);
if (windowPeriod.isBefore(currentTime) && CollUtil.isEmpty(saveData.get(windowPeriod))) {
saveData.remove(windowPeriod);
}
}
/**
* 是否开启新窗口期
*
@ -286,6 +304,9 @@ public class SlidingWindow<T> {
// 删除过期窗口期数据
removeInvalidWindow(windowPeriod);
// 超过窗口阈值预警
alarmWindowTotal();
if (windowPeriod.isBefore(condition)) {
SnailJobLog.LOCAL.debug("到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData));
doHandlerListener(windowPeriod);

View File

@ -1,61 +0,0 @@
package com.aizuda.snailjob.server.support.generator.id;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.generator.id.IdGenerator;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
/**
* 测试多线程情况下号段模式的运行情况
*
* @author: opensnail
* @date : 2023-05-06 09:16
* @since 1.2.0
*/
@SpringBootTest
@Slf4j
public class SegmentIdGeneratorTest {
@Autowired
@Qualifier("segmentIdGenerator")
private IdGenerator idGenerator;
@Test
public void idGeneratorTest() throws InterruptedException {
// step: 100, cpu: 4 memory: 16G, 线程1-150之间出现异常id:-3的情况较少
// 如果有特殊需求可以提高step 可通过系统配置: SystemProperties#step属性进行配置
Set<String> idSet = new HashSet<>();
int size = 500;
CountDownLatch count = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
new Thread(new Runnable() {
@Override
public void run() {
count.countDown();
String id = idGenerator.idGenerator("example_group", SystemConstants.DEFAULT_NAMESPACE);
SnailJobLog.LOCAL.info("id:[{}]", id);
if (Long.parseLong(id) < 0) {
throw new SnailJobServerException("exception id");
} else if (idSet.contains(id)) {
throw new SnailJobServerException("duplicate id [{}]", id);
} else {
idSet.add(id);
}
}
}).start();
}
count.await();
}
}