滑动窗口 替换为 环形滑动窗口
This commit is contained in:
byteblogs168 2023-01-14 20:48:22 +08:00
parent 3b04af96e4
commit 054bef4a8b
8 changed files with 625 additions and 27 deletions

View File

@ -1,16 +1,21 @@
package com.example;
//import com.example.client.DemoClient;
import com.example.client.DemoClient;
import com.example.mapper.SchoolMapper;
import com.example.po.School;
import com.x.retry.client.core.report.ReportHandler;
import com.x.retry.client.core.window.RetryLeapArray;
import com.x.retry.common.core.model.Result;
import com.x.retry.server.model.dto.RetryTaskDTO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.ConcurrentLinkedQueue;
@SpringBootTest
@Slf4j
public class ExampleApplicationTests {
@ -21,6 +26,8 @@ public class ExampleApplicationTests {
@Autowired
private RestTemplate restTemplate;
private RetryLeapArray retryLeapArray = ReportHandler.slidingWindow;
@Autowired(required = false)
private DemoClient demoClient;
@ -44,6 +51,21 @@ public class ExampleApplicationTests {
System.out.println(template);
}
@SneakyThrows
@Test
public void windows() {
for (int i = 0; i < 100000; i++) {
int finalI = i;
new Thread(() -> {
RetryTaskDTO retryTaskDTO = new RetryTaskDTO();
retryTaskDTO.setBizId(finalI + "");
ConcurrentLinkedQueue<RetryTaskDTO> value = retryLeapArray.currentWindow().value();
value.add(retryTaskDTO);
}).start();
}
Thread.sleep(5 * 1000);
}
}

View File

@ -89,7 +89,7 @@ public class ExistsTransactionalRetryServiceTest {
.thenReturn(new Result(0, "5"))
;
try {
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 1000; i++) {
threadPoolExecutor.execute(() -> testExistsTransactionalRetryService.testSimpleInsert(UUID.randomUUID().toString()));
}
} catch (Exception e) {

View File

@ -9,18 +9,21 @@ import com.x.retry.client.core.exception.XRetryClientException;
import com.x.retry.client.core.intercepter.RetrySiteSnapshot;
import com.x.retry.client.core.retryer.RetryerInfo;
import com.x.retry.client.core.spel.SPELParamFunction;
import com.x.retry.client.core.window.RetryLeapArray;
import com.x.retry.common.core.log.LogUtils;
import com.x.retry.common.core.window.Listener;
import com.x.retry.common.core.window.SlidingWindow;
import com.x.retry.common.core.window.WindowWrap;
import com.x.retry.server.model.dto.RetryTaskDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
@ -28,13 +31,20 @@ import java.util.function.Function;
* @date : 2022-03-08 09:24
*/
@Component
@Slf4j
public class ReportHandler implements Lifecycle {
@Autowired
@Qualifier("XRetryJacksonSerializer")
private RetryArgSerializer retryArgSerializer;
private static SlidingWindow<RetryTaskDTO> slidingWindow;
public static final int SAMPLE_COUNT = 10;
public static final int INTERVAL_IN_MS = 1000;
private static ScheduledExecutorService dispatchService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "DispatchService"));
public static RetryLeapArray slidingWindow = new RetryLeapArray(SAMPLE_COUNT, INTERVAL_IN_MS, new ReportListener());
/**
* 异步上报到服务端
@ -50,7 +60,7 @@ public class ReportHandler implements Lifecycle {
Method executorMethod = retryerInfo.getExecutorMethod();
RetryTaskDTO retryTaskDTO = new RetryTaskDTO();
String bizId = null;
String bizId;
try {
Class<? extends BizIdGenerate> bizIdGenerate = retryerInfo.getBizIdGenerate();
BizIdGenerate generate = bizIdGenerate.newInstance();
@ -73,31 +83,21 @@ public class ReportHandler implements Lifecycle {
Function<Object[], String> spelParamFunction = new SPELParamFunction(bizNoSpel, executorMethod);
retryTaskDTO.setBizNo(spelParamFunction.apply(args));
slidingWindow.add(retryTaskDTO);
slidingWindow.currentWindow().value().add(retryTaskDTO);
return true;
}
@Override
public void start() {
dispatchService.scheduleAtFixedRate(() -> {
slidingWindow.currentWindow();
}, INTERVAL_IN_MS, INTERVAL_IN_MS / SAMPLE_COUNT, TimeUnit.MILLISECONDS);
Listener<RetryTaskDTO> reportListener = new ReportListener();
slidingWindow = SlidingWindow
.Builder
.<RetryTaskDTO>newBuilder()
.withTotalThreshold(50)
.withDuration(5, ChronoUnit.SECONDS)
.withListener(reportListener)
.build();
slidingWindow.start();
}
@Override
public void close() {
if (Objects.nonNull(slidingWindow)) {
slidingWindow.end();
}
}
}

View File

@ -97,7 +97,7 @@ public class LocalRetryStrategies extends AbstractRetryStrategies {
log.info("内存重试完成且异常未被解决 scene:[{}]", retryerInfo.getScene());
// 上报服务端异常
if (RetryType.LOCAL_REMOTE.name().equals(retryerInfo.getRetryType().name())){
// TODO上报
// 上报
log.debug("上报 scene:[{}]", retryerInfo.getScene());
reportHandler.report(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), params);
}

View File

@ -0,0 +1,69 @@
package com.x.retry.client.core.window;
import com.x.retry.common.core.util.JsonUtil;
import com.x.retry.common.core.window.LeapArray;
import com.x.retry.common.core.window.Listener;
import com.x.retry.common.core.window.WindowWrap;
import com.x.retry.server.model.dto.RetryTaskDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author: www.byteblogs.com
* @date : 2022-05-28 15:07
*/
@Slf4j
public class RetryLeapArray extends LeapArray<ConcurrentLinkedQueue<RetryTaskDTO>> {
protected List<Listener<RetryTaskDTO>> listenerList;
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public RetryLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
public RetryLeapArray(int sampleCount, int intervalInMs, List<Listener<RetryTaskDTO>> listenerList) {
super(sampleCount, intervalInMs);
this.listenerList = listenerList;
}
public RetryLeapArray(int sampleCount, int intervalInMs, Listener<RetryTaskDTO> listener) {
this(sampleCount, intervalInMs, Collections.singletonList(listener));
}
@Override
public ConcurrentLinkedQueue<RetryTaskDTO> newEmptyBucket(long timeMillis) {
return new ConcurrentLinkedQueue<>();
}
@Override
protected WindowWrap<ConcurrentLinkedQueue<RetryTaskDTO>> resetWindowTo(WindowWrap<ConcurrentLinkedQueue<RetryTaskDTO>> windowWrap, long startTime) {
ConcurrentLinkedQueue<RetryTaskDTO> deepCopy = new ConcurrentLinkedQueue<>(windowWrap.value());
try {
if (!CollectionUtils.isEmpty(deepCopy)) {
for (Listener<RetryTaskDTO> listener : listenerList) {
listener.handler(new ArrayList<>(deepCopy));
}
}
} catch (Exception e) {
log.error("滑动窗口监听器处理失败 data:[{}]", JsonUtil.toJsonString(windowWrap.value()), e);
}
windowWrap.value().removeAll(deepCopy);
windowWrap.resetTo(startTime);
return windowWrap;
}
}

View File

@ -1,13 +1,12 @@
package com.x.retry.common.client.core.xretryclientcore;
import com.x.retry.client.core.window.RetryLeapArray;
import com.x.retry.server.model.dto.RetryTaskDTO;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class XRetryClientCoreApplicationTests {
@Test
void contextLoads() {
}
}

View File

@ -0,0 +1,406 @@
package com.x.retry.common.core.window;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
/**
* <p>
* Basic data structure for statistic metrics in Sentinel.
* </p>
* <p>
* Leap array use sliding window algorithm to count data. Each bucket cover {@code windowLengthInMs} time span,
* and the total time span is {@link #intervalInMs}, so the total bucket amount is:
* {@code sampleCount = intervalInMs / windowLengthInMs}.
* </p>
*
* @param <T> type of statistic data
* @author jialiang.linjl
* @author Eric Zhao
* @author Carpenter Lee
*/
@Slf4j
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 窗口长度
protected int sampleCount; // 窗口数
protected int intervalInMs; // 总时间间隔
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* Get the bucket at current timestamp.
*
* @return the bucket at current timestamp
*/
public WindowWrap<T> currentWindow() {
return currentWindow(System.currentTimeMillis());
}
/**
* Create a new statistic value for bucket.
*
* @param timeMillis current time in milliseconds
* @return the new empty bucket
*/
public abstract T newEmptyBucket(long timeMillis);
/**
* Reset given bucket to provided start time and reset the value.
*
* @param startTime the start time of the bucket in milliseconds
* @param windowWrap current bucket
* @return new clean bucket at given start time
*/
protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);
public int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int) (timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
/**
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算出窗口在哪个 元素位置处
int idx = calculateTimeIdx(timeMillis);
// 计算窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
/**
* Get the previous bucket item before provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return the previous bucket item before provided timestamp
*/
public WindowWrap<T> getPreviousWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis - windowLengthInMs);
timeMillis = timeMillis - windowLengthInMs;
WindowWrap<T> wrap = array.get(idx);
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {
return null;
}
return wrap;
}
/**
* Get the previous bucket item for current timestamp.
*
* @return the previous bucket item for current timestamp
*/
public WindowWrap<T> getPreviousWindow() {
return getPreviousWindow(System.currentTimeMillis());
}
/**
* Get statistic value from bucket for provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return the statistic value if bucket for provided timestamp is up-to-date; otherwise null
*/
public T getWindowValue(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
WindowWrap<T> bucket = array.get(idx);
if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
return null;
}
return bucket.value();
}
/**
* Check if a bucket is deprecated, which means that the bucket
* has been behind for at least an entire window time span.
*
* @param windowWrap a non-null bucket
* @return true if the bucket is deprecated; otherwise false
*/
public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
return isWindowDeprecated(System.currentTimeMillis(), windowWrap);
}
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
/**
* Get valid bucket list for entire sliding window.
* The list will only contain "valid" buckets.
*
* @return valid bucket list for entire sliding window.
*/
public List<WindowWrap<T>> list() {
return list(System.currentTimeMillis());
}
public List<WindowWrap<T>> list(long validTime) {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
continue;
}
result.add(windowWrap);
}
return result;
}
/**
* Get all buckets for entire sliding window including deprecated buckets.
*
* @return all buckets for entire sliding window
*/
public List<WindowWrap<T>> listAll() {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null) {
continue;
}
result.add(windowWrap);
}
return result;
}
/**
* Get aggregated value list for entire sliding window.
* The list will only contain value from "valid" buckets.
*
* @return aggregated value list for entire sliding window
*/
public List<T> values() {
return values(System.currentTimeMillis());
}
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
/**
* Get the valid "head" bucket of the sliding window for provided timestamp.
* Package-private for test.
*
* @param timeMillis a valid timestamp in milliseconds
* @return the "head" bucket if it exists and is valid; otherwise null
*/
WindowWrap<T> getValidHead(long timeMillis) {
// Calculate index for expected head time.
int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
WindowWrap<T> wrap = array.get(idx);
if (wrap == null || isWindowDeprecated(wrap)) {
return null;
}
return wrap;
}
/**
* Get the valid "head" bucket of the sliding window at current timestamp.
*
* @return the "head" bucket if it exists and is valid; otherwise null
*/
public WindowWrap<T> getValidHead() {
return getValidHead(System.currentTimeMillis());
}
/**
* Get sample count (total amount of buckets).
*
* @return sample count
*/
public int getSampleCount() {
return sampleCount;
}
/**
* Get total interval length of the sliding window in milliseconds.
*
* @return interval in second
*/
public int getIntervalInMs() {
return intervalInMs;
}
/**
* Get total interval length of the sliding window.
*
* @return interval in second
*/
public double getIntervalInSecond() {
return intervalInSecond;
}
public void debug(long time) {
StringBuilder sb = new StringBuilder();
List<WindowWrap<T>> lists = list(time);
sb.append("Thread_").append(Thread.currentThread().getId()).append("_");
for (WindowWrap<T> window : lists) {
sb.append(window.windowStart()).append(":").append(window.value().toString());
}
System.out.println(sb.toString());
}
public long currentWaiting() {
// TODO: default method. Should remove this later.
return 0;
}
public void addWaiting(long time, int acquireCount) {
// Do nothing by default.
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.x.retry.common.core.window;
/**
* Wrapper entity class for a period of time window.
*
* @param <T> data type
* @author jialiang.linjl
* @author Eric Zhao
*/
public class WindowWrap<T> {
/**
* Time length of a single window bucket in milliseconds.
* 窗口的长度
*/
private final long windowLengthInMs;
/**
* Start timestamp of the window in milliseconds.
* 窗口的开始时间
*/
private long windowStart;
/**
* Statistic data.
* 需要的统计的数据
*/
private T value;
/**
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param windowStart the start timestamp of the window
* @param value statistic data
*/
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
public long windowLength() {
return windowLengthInMs;
}
public long windowStart() {
return windowStart;
}
public T value() {
return value;
}
public void setValue(T value) {
this.value = value;
}
/**
* Reset start timestamp of current bucket to provided time.
*
* @param startTime valid start timestamp
* @return bucket after reset
*/
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
/**
* Check whether given timestamp is in current bucket.
*
* @param timeMillis valid timestamp in ms
* @return true if the given time is in current bucket, otherwise false
* @since 1.5.0
*/
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
@Override
public String toString() {
return "WindowWrap{" +
"windowLengthInMs=" + windowLengthInMs +
", windowStart=" + windowStart +
", value=" + value +
'}';
}
}