Spring Cloud Gateway(限流) | Wind Mt

标签: | 发表时间:2020-02-15 18:49 | 作者:
出处:https://windmt.com

game of thrones

绝境长城(冰与火之歌)

在高并发的应用中, 限流是一个绕不开的话题。限流可以保障我们的 API 服务对所有用户的可用性,也可以防止网络攻击。

一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如 nginx 的 limit_conn 模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如 Guava 的 RateLimiter、nginx 的 limit_req 模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制 MQ 的消费速率。另外还可以根据网络连接数、网络流量、CPU 或内存负载等来限流。

本文详细探讨在 Spring Cloud Gateway 中如何实现限流。

限流算法

做限流 (Rate Limiting/Throttling) 的时候,除了简单的控制并发,如果要准确的控制 TPS,简单的做法是维护一个单位时间内的 Counter,如判断单位时间已经过去,则将 Counter 重置零。此做法被认为没有很好的处理单位时间的边界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都触发了最大的请求数,也就是在两毫秒内发生了两倍的 TPS。

常用的更平滑的限流算法有两种:漏桶算法和令牌桶算法。很多传统的服务提供商如华为中兴都有类似的专利,参考 采用令牌漏桶进行报文限流的方法

漏桶算法

漏桶( Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。

Leaky Bucket

可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。

令牌桶算法

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解。随着时间流逝,系统会按恒定 1/QPS 时间间隔(如果 QPS=100,则间隔是 10ms)往桶里加入 Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个 Token,如果没有 Token 可拿了就阻塞或者拒绝服务。

Token Bucket

令牌桶的另外一个好处是可以方便的改变速度。一旦需要提高速率,则按需提高放入桶中的令牌的速率。一般会定时(比如 100 毫秒)往桶中增加一定数量的令牌,有些变种算法则实时的计算应该增加的令牌的数量。

Guava 中的 RateLimiter 采用了令牌桶的算法,设计思路参见 How is the RateLimiter designed, and why?,详细的算法实现参见 源码

Leakly Bucket vs Token Bucket

对比项 Leakly bucket Token bucket Token bucket 的备注
依赖 token
立即执行 有足够的 token 才能执行
堆积 token
速率恒定 可以大于设定的 QPS

限流实现

在 Gateway 上实现限流是个不错的选择,只需要编写一个过滤器就可以了。有了前边过滤器的基础,写起来很轻松。(如果你对 Spring Cloud Gateway 的过滤器还不了解,请先看 这里

我们这里采用令牌桶算法,Google Guava 的 RateLimiterBucket4jRateLimitJ都是一些基于此算法的实现,只是他们支持的 back-ends(JCache、Hazelcast、Redis 等)不同罢了,你可以根据自己的技术栈选择相应的实现。

这里我们使用 Bucket4j,引入它的依赖坐标,为了方便顺便引入 Lombok

1            
2
3
4
5
6
7
8
9
10
11
12
<dependency>            
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-core</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
<scope>provided</scope>
</dependency>

我们来实现具体的过滤器

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@CommonsLog            
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
publicclassRateLimitByIpGatewayFilterimplementsGatewayFilter,Ordered{

intcapacity;
intrefillTokens;
Duration refillDuration;

privatestaticfinalMap<String,Bucket> CACHE =newConcurrentHashMap<>();

privateBucketcreateNewBucket(){
Refill refill = Refill.of(refillTokens,refillDuration);
Bandwidth limit = Bandwidth.classic(capacity,refill);
returnBucket4j.builder().addLimit(limit).build();
}

@Override
publicMono<Void>filter(ServerWebExchange exchange,GatewayFilterChain chain){
// if (!enableRateLimit){
// return chain.filter(exchange);
// }
String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();
Bucket bucket = CACHE.computeIfAbsent(ip,k -> createNewBucket());

log.debug("IP: "+ ip +",TokenBucket Available Tokens: "+ bucket.getAvailableTokens());
if(bucket.tryConsume(1)) {
returnchain.filter(exchange);
}else{
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
returnexchange.getResponse().setComplete();
}
}

@Override
publicintgetOrder(){
return-1000;
}

}

通过对令牌桶算法的了解,我们知道需要定义三个变量:

  • capacity:桶的最大容量,即能装载 Token 的最大数量
  • refillTokens:每次 Token 补充量
  • refillDuration:补充 Token 的时间间隔

在这个实现中,我们使用了 IP 来进行限制,当达到最大流量就返回 429错误。这里我们简单使用一个 Map 来存储 bucket,所以也决定了它只能单点使用,如果是分布式的话,可以采用 Hazelcast 或 Redis 等解决方案。

在 Route 中我们添加这个过滤器,这里指定了 bucket 的容量为 10 且每一秒会补充 1 个 Token。

1            
2
3
4
5
6
7
.route(r -> r.path("/throttle/customer/**")            
.filters(f -> f.stripPrefix(2)
.filter(newRateLimitByIpGatewayFilter(10,1,Duration.ofSeconds(1))))
.uri("lb://CONSUMER")
.order(0)
.id("throttle_customer_service")
)

启动服务并多次快速刷新改接口,就会看到 Tokens 的数量在不断减小,等一会又会增加上来

1            
2
3
4
2018-05-09 15:42:08.601 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter  : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 2            
2018-05-09 15:42:08.958 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 1
2018-05-09 15:42:09.039 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 0
2018-05-09 15:42:10.201 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 1

RequestRateLimiter

刚刚我们通过过滤器实现了限流的功能,你可能在想为什么不直接创建一个过滤器工厂呢,那样多方便。这是因为 Spring Cloud Gateway 已经内置了一个 RequestRateLimiterGatewayFilterFactory,我们可以直接使用(这里有坑,后边详说)。

目前 RequestRateLimiterGatewayFilterFactory的实现依赖于 Redis,所以我们还要引入 spring-boot-starter-data-redis-reactive

1            
2
3
4
<dependency>            
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

因为这里有坑,所以把 application.yml 的配置再全部贴一遍,新增的部分我已经用 # ---标出来了

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
spring:            
application:
name:cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled:true
routes:
-id:service_customer
uri:lb://CONSUMER
order:0
predicates:
-Path=/customer/**
filters:
-StripPrefix=1
# -------
-name:RequestRateLimiter
args:
key-resolver:'#{@remoteAddrKeyResolver}'
redis-rate-limiter.replenishRate:1
redis-rate-limiter.burstCapacity:5
# -------
-AddResponseHeader=X-Response-Default-Foo,Default-Bar
default-filters:
-Elapsed=true
# -------
redis:
host:localhost
port:6379
database:0
# -------
server:
port:10000
eureka:
client:
service-url:
defaultZone:http://localhost:7000/eureka/
logging:
level:
org.springframework.cloud.gateway:debug
com.windmt.filter:debug

默认情况下,是基于 令牌桶算法实现的限流,有个三个参数需要配置:

  • burstCapacity,令牌桶容量。
  • replenishRate,令牌桶每秒填充平均速率。
  • key-resolver,用于限流的键的解析器的 Bean 对象名字(有些绕,看代码吧)。它使用 SpEL 表达式根据 #{@beanName}从 Spring 容器中获取 Bean 对象。默认情况下,使用 PrincipalNameKeyResolver,以请求认证的 java.security.Principal作为限流键。

关于 filters的那段配置格式,参考 这里

我们实现一个使用请求 IP 作为限流键的 KeyResolver

1            
2
3
4
5
6
7
8
9
publicclassRemoteAddrKeyResolverimplementsKeyResolver{            
publicstaticfinalString BEAN_NAME ="remoteAddrKeyResolver";

@Override
publicMono<String>resolve(ServerWebExchange exchange){
returnMono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}

}

配置 RemoteAddrKeyResolverBean 对象

1            
2
3
4
@Bean(name = RemoteAddrKeyResolver.BEAN_NAME)            
publicRemoteAddrKeyResolverremoteAddrKeyResolver(){
returnnewRemoteAddrKeyResolver();
}

以上就是代码部分,我们还差一个 Redis,我就本地用 docker 来快速启动了

1            
docker run --name redis -p 6379:6379 -d redis            

万事俱备,只欠测试了。以上的代码的和配置都是 OK 的,可以自行测试。下面来说一下这里边的坑。

遇到的坑

配置不生效

参考这个 issue

No Configuration found for route

这个异常信息如下:

1            
2
java.lang.IllegalArgumentException: No Configuration found for route service_customer            
at org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter.isAllowed(RedisRateLimiter.java:93) ~[spring-cloud-gateway-core-2.0.0.RC1.jar:2.0.0.RC1]

出现在将 RequestRateLimiter 配置为 defaultFilters 的情况下,比如像这样

1            
2
3
4
5
6
default-filters:            
-name:RequestRateLimiter
args:
key-resolver:'#{@remoteAddrKeyResolver}'
redis-rate-limiter.replenishRate:1
redis-rate-limiter.burstCapacity:5

这时候就会导致这个异常。我通过分析源码,发现了一些端倪,感觉像是一个 bug,已经提交了 issue

我们从异常入手来看, RedisRateLimiter#isAllowed这个方法要获取 routeId 对应的 routerConfig,如果获取不到就抛出刚才我们看到的那个异常。

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
publicMono<Response>isAllowed(String routeId,String id){            
if(!this.initialized.get()) {
thrownewIllegalStateException("RedisRateLimiter is not initialized");
}
// 只为 defaultFilters 配置 RequestRateLimiter 的时候
// config map 里边的 key 只有 "defaultFilters"
// 但是我们实际请求的 routeId 为 "customer_service"
Config routeConfig = getConfig().get(routeId);

if(routeConfig ==null) {
if(defaultConfig ==null) {
thrownewIllegalArgumentException("No Configuration found for route "+ routeId);
}
routeConfig = defaultConfig;
}

// 省略若干代码...
}

既然这里要 get,那必然有个地方要 put。put 的相关代码在 AbstractRateLimiter#onApplicationEvent这个方法。

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override            
publicvoidonApplicationEvent(FilterArgsEvent event){
Map<String,Object> args = event.getArgs();

// hasRelevantKey 检查 args 是否包含 configurationPropertyName
// 只有 defaultFilters 包含
if(args.isEmpty() || !hasRelevantKey(args)) {
return;
}

String routeId = event.getRouteId();
C routeConfig = newConfig();
ConfigurationUtils.bind(routeConfig,args,
configurationPropertyName,configurationPropertyName,validator);
getConfig().put(routeId,routeConfig);
}

privatebooleanhasRelevantKey(Map<String,Object> args){
returnargs.keySet().stream()
.anyMatch(key -> key.startsWith(configurationPropertyName +"."));
}

上边的 args 里是是配置参数的键值对,比如我们之前自定义的过滤器工厂 Elapsed,有个参数 withParams,这里就是 withParams=true。关键代码在第 7 行, hasRelevantKey方法用于检测 args 里边是否包含 configurationPropertyName.,具体到本例就是是否包含 redis-rate-limiter.。悲剧就发生在这里,因为我们只为 defaultFilters 配置了相关 args,注定其他的 route 到这里就直接 return 了。

现在不清楚这是 bug 还是设计者有意为之,等答复吧。

基于系统负载的动态限流

在实际工作中,我们可能还需要根据网络连接数、网络流量、CPU 或内存负载等来进行动态限流。在这里我们以 CPU 为栗子。

我们需要借助 Spring Boot Actuator 提供的 Metrics 能力进行实现基于 CPU 的限流——当 CPU 使用率高于某个阈值就开启限流,否则不开启限流。

我们在项目中引入 Actuator 的依赖坐标

1            
2
3
4
<dependency>            
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

因为 Spring Boot 2.x 之后,Actuator 被重新设计了,和 1.x 的区别还是挺大的(参考 这里)。我们先在配置中设置 management.endpoints.web.exposure.include=*来观察一下新的 Metrics 的能力

http://localhost:10000/actuator/metrics

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{            
"names": [
"jvm.buffer.memory.used",
"jvm.memory.used",
"jvm.buffer.count",
"jvm.gc.memory.allocated",
"logback.events",
"process.uptime",
"jvm.memory.committed",
"system.load.average.1m",
"jvm.gc.pause",
"jvm.gc.max.data.size",
"jvm.buffer.total.capacity",
"jvm.memory.max",
"system.cpu.count",
"system.cpu.usage",
"process.files.max",
"jvm.threads.daemon",
"http.server.requests",
"jvm.threads.live",
"process.start.time",
"jvm.classes.loaded",
"jvm.classes.unloaded",
"jvm.threads.peak",
"jvm.gc.live.data.size",
"jvm.gc.memory.promoted",
"process.files.open",
"process.cpu.usage"
]
}

我们可以利用里边的系统 CPU 使用率 system.cpu.usage

http://localhost:10000/actuator/metrics/system.cpu.usage

1            
2
3
4
5
6
7
8
9
10
{            
"name":"system.cpu.usage",
"measurements": [
{
"statistic":"VALUE",
"value":0.5189003436426117
}
],
"availableTags": []
}

最近一分钟内的平均负载 system.load.average.1m也是一样的

http://localhost:10000/actuator/metrics/system.load.average.1m

1            
2
3
4
5
6
7
8
9
10
{            
"name":"system.load.average.1m",
"measurements": [
{
"statistic":"VALUE",
"value":5.33203125
}
],
"availableTags": []
}

知道了 Metrics 提供的指标,我们就来看在代码里具体怎么实现吧。Actuator 2.x 里边已经没有了之前 1.x 里边提供的 SystemPublicMetrics,但是经过阅读源码可以发现 MetricsEndpoint这个类可以提供类似的功能。就用它来撸代码吧

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@CommonsLog            
@Component
publicclassRateLimitByCpuGatewayFilterimplementsGatewayFilter,Ordered{

@Autowired
privateMetricsEndpoint metricsEndpoint;

privatestaticfinalString METRIC_NAME ="system.cpu.usage";
privatestaticfinaldoubleMAX_USAGE =0.50D;

@Override
publicMono<Void>filter(ServerWebExchange exchange, GatewayFilterChain chain){
// if (!enableRateLimit){
// return chain.filter(exchange);
// }
Double systemCpuUsage = metricsEndpoint.metric(METRIC_NAME,null)
.getMeasurements()
.stream()
.filter(Objects::nonNull)
.findFirst()
.map(MetricsEndpoint.Sample::getValue)
.filter(Double::isFinite)
.orElse(0.0D);

booleanok = systemCpuUsage < MAX_USAGE;

log.debug("system.cpu.usage: "+ systemCpuUsage +" ok: "+ ok);

if(!ok) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
returnexchange.getResponse().setComplete();
}else{
returnchain.filter(exchange);
}
}

@Override
publicintgetOrder(){
return0;
}

}

配置 Route

1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Autowired            
privateRateLimitByCpuGatewayFilter rateLimitByCpuGatewayFilter;

@Bean
publicRouteLocatorcustomerRouteLocator(RouteLocatorBuilder builder){
// @formatter:off
returnbuilder.routes()
.route(r -> r.path("/throttle/customer/**")
.filters(f -> f.stripPrefix(2)
.filter(rateLimitByCpuGatewayFilter))
.uri("lb://CONSUMER")
.order(0)
.id("throttle_customer_service")
)
.build();
// @formatter:on
}

至于效果嘛,自己试试吧。因为 CPU 的使用率一般波动较大,测试效果还是挺明显的,实际使用就得慎重了。

示例代码可以从 Github 获取: https://github.com/zhaoyibo/spring-cloud-study

改进与提升

实际项目中,除以上实现的限流方式,还可能会:一、在上文的基础上,增加配置项,控制每个路由的限流指标,并实现动态刷新,从而实现更加灵活的管理。二、实现不同维度的限流,例如:

  • 对请求的目标 URL 进行限流(例如:某个 URL 每分钟只允许调用多少次)
  • 对客户端的访问 IP 进行限流(例如:某个 IP 每分钟只允许请求多少次)
  • 对某些特定用户或者用户组进行限流(例如:非 VIP 用户限制每分钟只允许调用 100 次某个 API 等)
  • 多维度混合的限流。此时,就需要实现一些限流规则的编排机制(与、或、非等关系)

参考

Token bucket
RequestRateLimiter GatewayFilter Factory
Scaling your API with rate limiters
Scaling your API with rate limiters 译文
Spring Boot Actuator Web API Documentation
https://github.com/nereuschen/blog/issues/37
http://www.itmuch.com/spring-cloud-sum/spring-cloud-ratelimit

相关 [spring cloud gateway] 推荐:

Spring Cloud Gateway(限流) | Wind Mt

- -
限流可以保障我们的 API 服务对所有用户的可用性,也可以防止网络攻击. 一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如 nginx 的 limit_conn 模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如 Guava 的 RateLimiter、nginx 的 limit_req 模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制 MQ 的消费速率.

快速突击 Spring Cloud Gateway

- - 掘金后端
认识 Spring Cloud Gateway. Spring Cloud Gateway 是一款基于 Spring 5,Project Reactor 以及 Spring Boot 2 构建的 API 网关,是 Spring Cloud 微服务生态的主要组件之一. Spring Cloud Gateway 主要负责接口请求的路由分发,并且支持对请求的安全验证,流量监控和流量控制等扩展操作.

SpringCloud实战十三:Gateway之 Spring Cloud Gateway 动态路由_zhuyu19911016520-CSDN博客

- -
前面分别对 Spring Cloud Zuul 与 Spring Cloud Gateway 进行了简单的说明,它门是API网关,API网关负责服务请求路由、组合及协议转换,客户端的所有请求都首先经过API网关,然后由它将匹配的请求路由到合适的微服务,是系统流量的入口,在实际生产环境中为了保证高可靠和高可用,尽量避免重启,如果有新的服务要上线时,可以通过动态路由配置功能上线.

万字 Spring Cloud Gateway 2.0,面向未来的技术,了解一下?

- - IT瘾-dev
原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处. 本文将从知识拓扑讲起,谈一下api网关的功能,以及spring cloud gateway的使用方法. 一、知识拓扑 (使用和原理). 三、Predicate,路由匹配. 为什么很多人觉得spring cloud gateway难用.

apache/apisix: The Cloud-Native API Gateway

- -
Health Checks: Enable health check on the upstream node and will automatically filter unhealthy nodes during load balancing to ensure system stability..

大话 Spring Cloud

- - IT瘾-dev
研究了一段时间spring boot了准备向spirng cloud进发,公司架构和项目也全面拥抱了Spring Cloud. 在使用了一段时间后发现Spring Cloud从技术架构上降低了对大型系统构建的要求,使我们以非常低的成本(技术或者硬件)搭建一套高效、分布式、容错的平台,但Spring Cloud也不是没有缺点,小型独立的项目不适合使用.

Spring Cloud限流详解 | Spring Cloud|周立

- -
限流往往是一个绕不开的话题. 本文详细探讨在Spring Cloud中如何实现限流. Zuul上实现限流是个不错的选择,只需要编写一个过滤器就可以了,关键在于如何实现限流的算法. 常见的限流算法有漏桶算法以及令牌桶算法. https://www.cnblogs.com/LBSer/p/4083131.html,写得通俗易懂,你值得拥有,我就不拽文了.

Spring Cloud Kubernetes指南

- -
当我们构建微服务解决方案时,SpringCloud和Kubernetes都是最佳解决方案,因为它们为解决最常见的挑战提供组件. 但是,如果我们决定选择Kubernetes作为我们的解决方案的主要容器管理器和部署平台,我们仍然可以主要通过SpringCloudKubernetes项目使用SpringCloud的有趣特性.

Spring Cloud 快速入门

- - IT瘾-tuicool
Spring Cloud 是一套完整的微服务解决方案,基于 Spring Boot 框架,准确的说,它不是一个框架,而是一个大的容器,它将市面上较好的微服务框架集成进来,从而简化了开发者的代码量. 本课程由浅入深带领大家一步步攻克 Spring Cloud 各大模块,接着通过一个实例带领大家了解大型分布式微服务架构的搭建过程,最后深入源码加深对它的了解.

Deploy the spring cloud project using jenkins

- - Telami
先简单记录下Jenkins部署maven聚合工程要点. Root pom配置成项目根目录的pom.xml. maven命令单独install 欲构建的项目. 选项后可跟随{groupId}:{artifactId}或者所选模块的相对路径(多个模块以逗号分隔). 表示同时处理选定模块所依赖的模块. 表示同时处理依赖选定模块的模块.