Sentinel 滑动窗口限流实现解析
Sentinel 是面向分布式服务框架的轻量级流量控制框架,主要以流量为切入点,从流量控制,熔断降级,系统负载保护等多个维度来维护系统的稳定性。
2012 年,Sentinel 诞生于阿里巴巴,其主要目标是流量控制。2013-2017 年,Sentinel 迅速发展,并成为阿里巴巴所有微服务的基本组成部分,涵盖了几乎所有核心电子商务场景。2018 年,Sentinel 演变为一个开源项目。
Sentinel 生态圈
Sentinel 特性
- 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
- 完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
- 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Apache Dubbo、gRPC、Quarkus 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。同时 Sentinel 提供 Java/Go/C++ 等多语言的原生实现。
- 完善的 SPI 扩展机制:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
Sentinel 特性
滑动窗口介绍
-
窗口参数:
- intervalLength:区间长度,即统计的时间长度,决定了计算数据时统计的窗口个数
- windowLength:时间窗宽度
- count:时间窗内统计量
- startTime: 每个时间窗的起始时间,每个时间点都会归属于一个时间窗口,即会将时间轴按照时间窗宽度 windowLength 进行划分。每个时间窗都有一个起始时间 startTime。
-
获取某个时间的统计量
- 获取当前时间 currTime
- 根据 currTime - (已用时间窗).startTime < intervalLength,找到参与统计时间窗
- 对参与时间窗的统计量 count 进行求和即可得到当前时间的统计量
-
更新某个时间的统计量
- 获取当前时间 currTime
- 找到当前时间所属的时间窗,更新时间窗里面的 count
在整个时间轴上进行划分的,有无穷多个时间窗。但是在具体实现上是不可能表示出无穷个时间窗的,所以实现时会使用一个固定大小的时间窗数组。采用复用/循环存储时间窗的方式。依据:在某个时间点只需要统计某几个时间窗的数据即可,所以只保存需要的几个时间窗。 此时多了一个参数 sampleCount,数组的大小就是 sampleCount。关系:sampleCount=intervalLength / windowLength
更新某个时间点处的统计量:
- 获取当前时间 currTime
- 计算当前时间点所属时间窗在数组中位置 index = (currTime / windowLength) % sampleCount
- 获取时间窗 window = array[index]
- 判断时间窗是否已过期:currTime - 时间窗.startTime > intervalLength;已过期则重置时间窗口,即将里面的统计量 count 归零,然后累加 count;未过期直接累加 count。
获取某个时间点的统计量:
- 获取当前时间 currTime
- 遍历时间窗数组,判断时间窗是否该统计:currTime - 时间窗.startTime < intervalLength
流控架构
在 Sentinel 里面,所有的资源都对应一个资源名称( resourceName
),每次资源调用都会创建一个 Entry
对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU
API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
-
NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级; -
ClusterBuilderSlot
则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据; -
StatisticSlot
则用于记录、统计不同纬度的 runtime 指标监控信息; -
FlowSlot
则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制; -
AuthoritySlot
则根据配置的黑白名单和调用来源信息,来做黑白名单控制; -
DegradeSlot
则通过统计信息以及预设的规则,来做熔断降级; -
SystemSlot
则通过系统的状态,例如 load1 等,来控制总的入口流量;
StatisticSlot
是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。
-
clusterNode
:资源唯一标识的 ClusterNode 的 runtime 统计 -
origin
:根据来自不同调用者的统计信息 -
defaultnode
: 根据上下文条目名称和资源 ID 的 runtime 统计 - 入口流量的统计 Sentinel 底层采用高性能的滑动窗口数据结构
LeapArray
来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。
统计核心类
-
ArrayMetric 对外使用的类,隐藏了时间窗的具体实现,其有一个成员变量 LeapArray
-
LeapArray 时间窗的底层实现,里面有一个时间窗的数组,数组里面的元素为 WindowWrap,即时间窗
-
WindowWrap 时间窗,T 表示要统计的数据,为 MetricBucket
-
MetricBucket 统计量,里面包含了多个具体统计的变量,变量的"类型"由 MetrciEvent 决定
-
MetricEvent 统计量类型,和 MetricBucktet 里面保存的统计变量一一对应
类之间的关系
资源的实时统计类,有三种统计维度,秒级(rollingCounterInSecond),分钟级(rollingCounterInMinute),连接数级。通过 ArrayMetric、LeapArray 实现滑动窗口统计。
当第一次请求进来,Sentinel 会创建一个新的固定时间宽度窗口 bucket,记录运行时数据指标,例如 RT、QPS,BQ 等,时间宽度取决于 sample count。Sentinel 使用有效桶的统计来决定这个请求是否可以被传递。例如,如果一个规则定义只有 100 个请求可以通过,它将对所有有效桶中的 QPS 求和,并将其与规则中定义的阈值进行比较。请求持续进来,之前的窗口(buckets)可能失效,需要重置窗口数据。
核心实现:LeapArray
-
获取当前窗口, 分为 4 种场景:
- 1、 如果旧的 bucket 不存在,那么我们在 windowStart 处创建一个新的 bucket,然后尝试通过 CAS 操作更新圆形数组。只有一个线程可以成功更新,而其他线程产生它的时间片。
- 2、如果当前 windowStart 等于旧桶的开始时间戳,表示时间在桶内,所以直接返回桶。
- 3、如果旧桶的开始时间戳落后于所提供的时间,这意味着桶已弃用。我们必须将桶重置为当前的 windowStart。注意重置和清理操作很难是原子的,所以我们需要一个更新锁来保证桶更新的正确性。更新锁是有条件的(小范围),将生效,只有当 bucket 已弃用,所以在大多数情况下它不会导致性能损失。
- 4、不应该通过这里,因为提供的时间已经落后了,一般是时钟回拨导致的。
/**
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
- 获取数组索引位置,和窗口开始时间
// 例如:sampleCount=60,intervalInMs=60*1000,curTime= 1640931929894
// timeId = 1640931929894/1000 = 1640931929;
// idx = 1640931929%60 = 29;
// 计算环形数组索引位置,当前时间毫秒数对窗口时间宽度(windowLengthInMs = intervalInMs / sampleCount)取整,在对数组长度(array.length()=sampleCount)取模得到数组下标
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
例如:curTime= 1640931929894
windowStart = 1640931929894-1640931929894%1000 = 1640931929000
// 计算窗口的开始时间
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
- 获取当前所有窗口统计值
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
// 核心是:isWindowDeprecated 窗口是否失效,当前时间减去旧窗口开始时间如果大于总的窗口间隔时间,就认为当前窗口已失效
// 例如:sampleCount=60,intervalInMs=60*1000,curTime= 1640931929894
// 1640931929894 - 1640931919000 > 60*1000 = 60894 > 60000 = true,相当于过了 60s 了,之前所有的都是失效的窗口
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
滑动窗口的使用场景
FlowRuleManager
流控规则管理,统计每分钟指标,也是采用 LeapArray 环形数组方式维护,分为 60 个是 slot,每个窗口时间间隔 1 秒。
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-metrics-record-task", true));
TimeUtil
毫秒数获取工具,当系统空闲时使用系统时间,系统繁忙或时间获取不到使用 sentinel 维护的时间,也是采用 LeapArray 环形数组方式维护,3 个 slot,时间滑动窗口为 1 秒。
public TimeUtil() {
this.statistics = new LeapArray<TimeUtil.Statistic>(3, 3000) {
@Override
public Statistic newEmptyBucket(long timeMillis) {
return new Statistic();
}
@Override
protected WindowWrap<Statistic> resetWindowTo(WindowWrap<Statistic> windowWrap, long startTime) {
Statistic val = windowWrap.value();
val.getReads().reset();
val.getWrites().reset();
windowWrap.resetTo(startTime);
return windowWrap;
}
};
this.currentTimeMillis = System.currentTimeMillis();
this.lastCheck = this.currentTimeMillis;
Thread daemon = new Thread(this);
daemon.setDaemon(true);
daemon.setName("sentinel-time-tick-thread");
daemon.start();
}
业务限流实现
限流实现,统计秒级别指标,进行限流等其他控制,采用 LeapArray 环形数组方式维护,2 个 slot,时间滑动窗口为 500 毫秒。
p (PASS)通过请求数 b (BLOCK)阻塞请求数,限流数 w(OCCUPIED_PASS)在未来的配额中通过数
sampleCount=2,intervalInMs=1*1000, timeMillis= 1640866390362 ---> (2021-12-30 20:13:10)
当前数组索引:
idx = (1640866390362/500)%2 = 0; 当前窗口起始时间: windowStart = 1640866390362-1640866390362%500 = 1640866390000
old = array[idx] = array[0]; old.windowStart() = 1640866348000 ---> (2021-12-30 20:12:28) windowStart > old.windowStart() ===> 重置当前窗口值 ===> resetWindowTo(old, windowStart);
实现借鉴
- MetricBucket 类使用 LongAdder 来做计数统计,相较于 AtomicLong 等性能更高(减少乐观锁的重试次数)。
- 滑动窗口使用环形数组(LeapArray)重复利用 WindowWrap,避免大量重复创建对象,减少 ygc 压力。
对比:Hystrix 限流实现
Hystrix 的滑动窗口实现类 HystrixRollingNumber,在 hystrix 中,一个滑动窗口,包含若干个桶(默认是 10 个),每个桶保存一定时间间隔内的统计数据(默认是 1s)。
每个矩形框代表一个桶,每个桶记录着 1 秒内的 4 个指标数据:成功量、失败量、超时量、拒绝量。这 10 个桶合起来就是一个完整的滑动窗口。
从业务上讲,值得注意的是:桶对象有一个不可变的属性-windowStart,它表明该桶对象用来保存[windowStart, windowStart + bucketSizeInMillseconds)时间段内的统计信息。
从技术来讲,值得注意的是:因为每个桶都会被多个线程并发地更新指标数据,所以桶对象需要提供一些线程安全的数据结构和更新方法。为此,hystrix 大量使用了 CAS,而不是锁。
hystrix 使用一个环形数组来维护这些桶,并且它的环形数组的实现类似于一个 FIFO 的队列。该数组实现有一个叫 addLast(Bucket o)的方法,用于向环形数组的末尾追加新的桶对象,当数组中的元素个数没超过最大大小时,只是简单的维护尾指针;否则,在维护尾指针时,还要通过维护首指针,将第一个位置上元素剔除掉。可以看出,该环形数组的表现类似于 FIFO 队列。
总结
本章主要介绍 Sentinel 由来及发展,浅析了 sentinel 限流滑动窗口的实现 LeapArray 数据结构设计,对比了 Hystrix 限流实现。
参考:
https://github.com/alibaba/Sentinel/wiki/%E4%BB%8B%E7%BB%8D
https://cloud.tencent.com/developer/article/1815838
推荐阅读
招贤纳士
政采云技术团队(Zero),一个富有激情、创造力和执行力的团队,Base 在风景如画的杭州。团队现有300多名研发小伙伴,既有来自阿里、华为、网易的“老”兵,也有来自浙大、中科大、杭电等校的新人。团队在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 [email protected]
微信公众号
文章同步发布,政采云技术团队公众号,欢迎关注