From f1265f76597581edd675575a6b8ff16d7fcdb487 Mon Sep 17 00:00:00 2001 From: opensnail <598092184@qq.com> Date: Sat, 5 Apr 2025 19:49:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(1.5.0-beta1):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=BB=91=E5=8A=A8=E7=AA=97=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/common/window/SlidingWindow.java | 75 ++++++++++++------- .../generator/id/SegmentIdGeneratorTest.java | 61 --------------- 2 files changed, 48 insertions(+), 88 deletions(-) delete mode 100644 snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/support/generator/id/SegmentIdGeneratorTest.java diff --git a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java index 5850800ff..2e09a15c5 100644 --- a/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java +++ b/snail-job-client/snail-job-client-common/src/main/java/com/aizuda/snailjob/client/common/window/SlidingWindow.java @@ -5,6 +5,7 @@ import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.core.window.Listener; import com.aizuda.snailjob.common.log.SnailJobLog; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; @@ -108,9 +109,6 @@ public class SlidingWindow { // 扫描n-1个窗口,是否过期,过期则删除 removeInvalidWindow(); - // 超过窗口阈值预警 - alarmWindowTotal(); - } else { oldWindowAdd(data); } @@ -135,11 +133,11 @@ public class SlidingWindow { } /** - * 扫描n-1个窗口,是否过期,过期则删除 过期条件为窗口期内无数据 + * 扫描n-2个窗口,是否过期,过期则删除 过期条件为窗口期内无数据 */ private void removeInvalidWindow() { - for (int i = 0; i < saveData.size() - 1; i++) { + for (int i = 0; i < saveData.size() - 2; i++) { Map.Entry> firstEntry = saveData.firstEntry(); if (CollUtil.isEmpty(firstEntry.getValue())) { saveData.remove(firstEntry.getKey()); @@ -159,8 +157,24 @@ public class SlidingWindow { return; } + // 高并发下情况下出现刚刚获取的最后一个窗口被删除的情况 + int count = 10; ConcurrentLinkedQueue list = saveData.get(windowPeriod); - list.add(data); + while (Objects.isNull(list) && count > 0) { + count--; + windowPeriod = getNewWindowPeriod(); + if (Objects.isNull(windowPeriod)) { + continue; + } + list = saveData.get(windowPeriod); + } + + if (Objects.nonNull(list)) { + list.add(data); + } else { + // 这里一般走不到,作为兜底 + SnailJobLog.LOCAL.error("Data loss. [{}]", JsonUtil.toJsonString(data)); + } if (list.size() >= totalThreshold) { doHandlerListener(windowPeriod); @@ -176,7 +190,8 @@ public class SlidingWindow { private void doHandlerListener(LocalDateTime windowPeriod) { NOTICE_LOCK.lock(); - + // 深拷贝 + ConcurrentLinkedQueue deepCopy = null; try { ConcurrentLinkedQueue list = saveData.get(windowPeriod); @@ -185,38 +200,28 @@ public class SlidingWindow { } // 深拷贝 - ConcurrentLinkedQueue deepCopy = new ConcurrentLinkedQueue<>(list); + deepCopy = new ConcurrentLinkedQueue<>(list); clear(windowPeriod, deepCopy); if (CollUtil.isEmpty(deepCopy)) { return; } - for (Listener listener : listeners) { - listener.handler(new ArrayList<>(deepCopy)); - } - } catch (Exception e) { - SnailJobLog.LOCAL.error("到达总量窗口期通知异常", e); + SnailJobLog.LOCAL.error("deep copy task queue is error", e); } finally { NOTICE_LOCK.unlock(); } - } - - /** - * 删除2倍窗口期之前无效窗口 - * - * @param windowPeriod 当前最老窗口期 - */ - private void removeInvalidWindow(LocalDateTime windowPeriod) { - - LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit); - if (windowPeriod.isBefore(currentTime)) { - SnailJobLog.LOCAL.debug("删除过期窗口 windowPeriod:[{}] currentTime:[{}]", windowPeriod, currentTime); - saveData.remove(windowPeriod); + if (!CollectionUtils.isEmpty(deepCopy)) { + try { + for (Listener listener : listeners) { + listener.handler(new ArrayList<>(deepCopy)); + } + } catch (Exception e) { + SnailJobLog.LOCAL.error("notice is error", e); + } } - } /** @@ -248,6 +253,19 @@ public class SlidingWindow { } } + /** + * 删除2倍窗口期之前无效窗口 + * + * @param windowPeriod 当前最老窗口期 + */ + private void removeInvalidWindow(LocalDateTime windowPeriod) { + + LocalDateTime currentTime = LocalDateTime.now().minus(duration * 2, chronoUnit); + if (windowPeriod.isBefore(currentTime) && CollUtil.isEmpty(saveData.get(windowPeriod))) { + saveData.remove(windowPeriod); + } + } + /** * 是否开启新窗口期 * @@ -286,6 +304,9 @@ public class SlidingWindow { // 删除过期窗口期数据 removeInvalidWindow(windowPeriod); + // 超过窗口阈值预警 + alarmWindowTotal(); + if (windowPeriod.isBefore(condition)) { SnailJobLog.LOCAL.debug("到达时间窗口期 [{}] [{}]", windowPeriod, JsonUtil.toJsonString(saveData)); doHandlerListener(windowPeriod); diff --git a/snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/support/generator/id/SegmentIdGeneratorTest.java b/snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/support/generator/id/SegmentIdGeneratorTest.java deleted file mode 100644 index 3ed5840da..000000000 --- a/snail-job-server/snail-job-server-starter/src/test/java/com/aizuda/snailjob/server/support/generator/id/SegmentIdGeneratorTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.aizuda.snailjob.server.support.generator.id; - -import com.aizuda.snailjob.common.core.constant.SystemConstants; -import com.aizuda.snailjob.common.log.SnailJobLog; -import com.aizuda.snailjob.server.common.exception.SnailJobServerException; -import com.aizuda.snailjob.server.common.generator.id.IdGenerator; -import lombok.extern.slf4j.Slf4j; -import org.junit.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.test.context.SpringBootTest; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; - -/** - * 测试多线程情况下号段模式的运行情况 - * - * @author: opensnail - * @date : 2023-05-06 09:16 - * @since 1.2.0 - */ -@SpringBootTest -@Slf4j -public class SegmentIdGeneratorTest { - - @Autowired - @Qualifier("segmentIdGenerator") - private IdGenerator idGenerator; - - @Test - public void idGeneratorTest() throws InterruptedException { - - // step: 100, cpu: 4 memory: 16G, 线程1-150之间出现异常id:-3的情况较少。 - // 如果有特殊需求可以提高step, 可通过系统配置: SystemProperties#step属性进行配置 - Set idSet = new HashSet<>(); - int size = 500; - CountDownLatch count = new CountDownLatch(size); - for (int i = 0; i < size; i++) { - new Thread(new Runnable() { - @Override - public void run() { - count.countDown(); - - String id = idGenerator.idGenerator("example_group", SystemConstants.DEFAULT_NAMESPACE); - SnailJobLog.LOCAL.info("id:[{}]", id); - if (Long.parseLong(id) < 0) { - throw new SnailJobServerException("exception id"); - } else if (idSet.contains(id)) { - throw new SnailJobServerException("duplicate id [{}]", id); - } else { - idSet.add(id); - } - } - }).start(); - } - - count.await(); - } -}