sbc(七)分布式限流

标签: sbc Distributed Tools Distributed Lock | 发表时间:2018-04-28 01:03 | 作者:
出处:http://crossoverjie.top/

前言

本文接着上文 应用限流进行讨论。

之前谈到的限流方案只能针对于单个 JVM 有效,也就是单机应用。而对于现在普遍的分布式应用也得有一个分布式限流的方案。

基于此尝试写了这个组件:

https://github.com/crossoverJie/distributed-redis-tool

DEMO

以下采用的是

https://github.com/crossoverJie/springboot-cloud

来做演示。

在 Order 应用提供的接口中采取了限流。首先是配置了限流工具的 Bean:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
     
@Configuration
public class RedisLimitConfig {
@Value("${redis.limit}")
private int limit;
@Autowired
private JedisConnectionFactory jedisConnectionFactory;
@Bean
public RedisLimit build() {
RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();
JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
.limit(limit)
.build();
return redisLimit;
}
}

接着在 Controller 使用组件:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
     
@Autowired
private RedisLimit redisLimit ;
@Override
@CheckReqNo
public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {
BaseResponse<OrderNoResVO> res = new BaseResponse();
//限流
boolean limit = redisLimit.limit();
if (!limit){
res.setCode(StatusEnum.REQUEST_LIMIT.getCode());
res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());
return res ;
}
res.setReqNo(orderNoReq.getReqNo());
if (null == orderNoReq.getAppId()){
throw new SBCException(StatusEnum.FAIL);
}
OrderNoResVO orderNoRes = new OrderNoResVO() ;
orderNoRes.setOrderId(DateUtil.getLongTime());
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
res.setDataBody(orderNoRes);
return res ;
}

为了方便使用,也提供了注解:

     
1
2
3
4
5
6
7
     
@Override
@ControllerLimit
public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {
BaseResponse<OrderNoResVO> res = new BaseResponse();
// 业务逻辑
return res ;
}

该注解拦截了 http 请求,会再请求达到阈值时直接返回。

普通方法也可使用:

     
1
2
     
@CommonLimit
public void doSomething(){}

会在调用达到阈值时抛出异常。

为了模拟并发,在 User 应用中开启了 10 个线程调用 Order( 限流次数为5) 接口(也可使用专业的并发测试工具 JMeter 等)。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
     
@Override
public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {
//调用远程服务
OrderNoReqVO vo = new OrderNoReqVO();
vo.setAppId(1L);
vo.setReqNo(userReq.getReqNo());
for (int i = 0; i < 10; i++) {
executorService.execute(new Worker(vo, orderServiceClient));
}
UserRes userRes = new UserRes();
userRes.setUserId(123);
userRes.setUserName("张三");
userRes.setReqNo(userReq.getReqNo());
userRes.setCode(StatusEnum.SUCCESS.getCode());
userRes.setMessage("成功");
return userRes;
}
private static class Worker implements Runnable {
private OrderNoReqVO vo;
private OrderServiceClient orderServiceClient;
public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {
this.vo = vo;
this.orderServiceClient = orderServiceClient;
}
@Override
public void run() {
BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);
logger.info("远程返回:" + JSON.toJSONString(orderNo));
}
}

为了验证分布式效果启动了两个 Order 应用。

效果如下:

实现原理

实现原理其实很简单。既然要达到分布式全局限流的效果,那自然需要一个第三方组件来记录请求的次数。

其中 Redis 就非常适合这样的场景。

  • 每次请求时将当前时间(精确到秒)作为 Key 写入到 Redis 中,超时时间设置为 2 秒,Redis 将该 Key 的值进行自增。
  • 当达到阈值时返回错误。
  • 写入 Redis 的操作用 Lua 脚本来完成,利用 Redis 的单线程机制可以保证每个 Redis 请求的原子性。

Lua 脚本如下:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
     
--lua 下标从 1 开始
-- 限流 key
local key = KEYS[1]
-- 限流大小
local limit = tonumber(ARGV[1])
-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
if curentLimit + 1 > limit then
-- 达到限流大小 返回
return 0;
else
-- 没有达到阈值 value + 1
redis.call("INCRBY", key, 1)
redis.call("EXPIRE", key, 2)
return curentLimit + 1
end

Java 中的调用逻辑:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
     
public boolean limit() {
String key = String.valueOf(System.currentTimeMillis() / 1000);
Object result = null;
if (jedis instanceof Jedis) {
result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
} else if (jedis instanceof JedisCluster) {
result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
} else {
//throw new RuntimeException("instance is error") ;
return false;
}
if (FAIL_CODE != (Long) result) {
return true;
} else {
return false;
}
}

所以只需要在需要限流的地方调用该方法对返回值进行判断即可达到限流的目的。

当然这只是利用 Redis 做了一个粗暴的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。

Builder 构建器

在设计这个组件时想尽量的提供给使用者清晰、可读性、不易出错的 API。

比如第一步,如何构建一个限流对象。

最常用的方式自然就是构造函数,如果有多个域则可以采用重叠构造器的方式:

     
1
2
3
     
public A(){}
public A(int a){}
public A(int a,int b){}

缺点也是显而易见的:如果参数过多会导致难以阅读,甚至如果参数类型一致的情况下客户端颠倒了顺序,但不会引起警告从而出现难以预测的结果。

第二种方案可以采用 JavaBean 模式,利用 setter 方法进行构建:

     
1
2
3
     
A a = new A();
a.setA(a);
a.setB(b);

这种方式清晰易读,但却容易让对象处于不一致的状态,使对象处于线程不安全的状态。

所以这里采用了第三种创建对象的方式,构建器:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
     
public class RedisLimit {
private JedisCommands jedis;
private int limit = 200;
private static final int FAIL_CODE = 0;
/**
* lua script
*/
private String script;
private RedisLimit(Builder builder) {
this.limit = builder.limit ;
this.jedis = builder.jedis ;
buildScript();
}
/**
* limit traffic
* @return if true
*/
public boolean limit() {
String key = String.valueOf(System.currentTimeMillis() / 1000);
Object result = null;
if (jedis instanceof Jedis) {
result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
} else if (jedis instanceof JedisCluster) {
result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));
} else {
//throw new RuntimeException("instance is error") ;
return false;
}
if (FAIL_CODE != (Long) result) {
return true;
} else {
return false;
}
}
/**
* read lua script
*/
private void buildScript() {
script = ScriptUtil.getScript("limit.lua");
}
/**
* the builder
* @param <T>
*/
public static class Builder<T extends JedisCommands>{
private T jedis = null ;
private int limit = 200;
public Builder(T jedis){
this.jedis = jedis ;
}
public Builder limit(int limit){
this.limit = limit ;
return this;
}
public RedisLimit build(){
return new RedisLimit(this) ;
}
}
}

这样客户端在使用时:

     
1
2
3
     
RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster)
.limit(limit)
.build();

更加的简单直接,并且避免了将创建过程分成了多个子步骤。

这在有多个构造参数,但又不是必选字段时很有作用。

因此顺便将分布式锁的构建器方式也一并更新了:

https://github.com/crossoverJie/distributed-redis-tool#features

更多内容可以参考 Effective Java

API

从上文可以看出,使用过程就是调用 limit 方法。

     
1
2
3
4
5
     
//限流
boolean limit = redisLimit.limit();
if (!limit){
//具体限流逻辑
}

为了减少侵入性,也为了简化客户端提供了两种注解方式。

@ControllerLimit

该注解可以作用于 @RequestMapping 修饰的接口中,并会在限流后提供限流响应。

实现如下:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
     
@Component
public class WebIntercept extends WebMvcConfigurerAdapter {
private static Logger logger = LoggerFactory.getLogger(WebIntercept.class);
@Autowired
private RedisLimit redisLimit;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new CustomInterceptor())
.addPathPatterns("/**");
}
private class CustomInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
if (redisLimit == null) {
throw new NullPointerException("redisLimit is null");
}
if (handler instanceof HandlerMethod) {
HandlerMethod method = (HandlerMethod) handler;
ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class);
if (annotation == null) {
//skip
return true;
}
boolean limit = redisLimit.limit();
if (!limit) {
logger.warn("request has bean limit");
response.sendError(500, "request limit");
return false;
}
}
return true;
}
}
}

其实就是实现了 SpringMVC 中的拦截器,并在拦截过程中判断是否有使用注解,从而调用限流逻辑。

前提是应用需要扫描到该类,让 Spring 进行管理。

     
1
     
@ComponentScan(value = "com.crossoverjie.distributed.intercept")

@CommonLimit

当然也可以在普通方法中使用。实现原理则是 Spring AOP (SpringMVC 的拦截器本质也是 AOP)。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
     
@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class CommonAspect {
private static Logger logger = LoggerFactory.getLogger(CommonAspect.class);
@Autowired
private RedisLimit redisLimit ;
@Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)")
private void check(){}
@Before("check()")
public void before(JoinPoint joinPoint) throws Exception {
if (redisLimit == null) {
throw new NullPointerException("redisLimit is null");
}
boolean limit = redisLimit.limit();
if (!limit) {
logger.warn("request has bean limit");
throw new RuntimeException("request has bean limit") ;
}
}
}

很简单,也是在拦截过程中调用限流。

当然使用时也得扫描到该包:

     
1
     
@ComponentScan(value = "com.crossoverjie.distributed.intercept")

总结

限流在一个高并发大流量的系统中是保护应用的一个利器,成熟的方案也很多,希望对刚了解这一块的朋友提供一些思路。

以上所有的源码:

感兴趣的朋友可以点个 Star 或是提交 PR。

号外

最近在总结一些 Java 相关的知识点,感兴趣的朋友可以一起维护。

地址: https://github.com/crossoverJie/Java-Interview

相关 [sbc 分布] 推荐:

sbc(七)分布式限流

- - crossoverJie's Blog
本文接着上文 应用限流进行讨论. 之前谈到的限流方案只能针对于单个 JVM 有效,也就是单机应用. 而对于现在普遍的分布式应用也得有一个分布式限流的方案. 在 Order 应用提供的接口中采取了限流. 首先是配置了限流工具的 Bean:. 接着在 Controller 使用组件:. 为了方便使用,也提供了注解:.

分布式日志

- - Java - 编程语言 - ITeye博客
最近完成一个简单的日志管理系统,拿出来跟大家分享一下. 3、支持文件输出、habse输出、mongodb输出. 基于以上三点功能,我们下面详细说明. 说道支持这个功能,有个同事认为没有这个必要,他的观点是log4j的配置不需要经常变动,不需要支持这样的功能;本人的观点是“配置可以进行统一管理、而且正式机跟测试机的log4j的配置肯定会有一些差异的”,因此这个功能是必须的.

分布式事务简述

- If you are thinking one year ahead, you plant rice. If you are thinking twenty years ahead, you plant trees. If you are thinking a hundred years ahead, you educate people. - BlogJava-首页技术区
  随着系统越来越大,不断的模块化和SOA化,你的系统可能被分散于不同的机器上,这时候,你原先的单机本地事务可能已经无法满足你的需求,你可能要跨系统跨资源的去使用事务.   具体就不多介绍了,相信大家都能明白ACID特性的基本含义. 而一个具体的事务需要涉及到的模型(无论哪种模型)一般由下面几部分组成:.

Hadoop与分布式计算

- 透明 - 丕子
写本文由leftnoteasy发布于http://leftnoteasy.cnblogs.com 本文可以被全部或者部分的使用,但请注明出处,如果有问题,可以联系wheeleast (at) gmail.com, 也可以加作者的新浪微博:http://weibo.com/leftnoteasy. 很久没有写写博客了,之前主要是换工作,耽误了很多的时间,让人也变得懒散,不想花大时间来写东西.

分布式缓存-Memcached

- - 人月神话的BLOG
分布式缓存出于如下考虑,首先是缓存本身的水平线性扩展问题,其次是缓存大并发下的本身的性能问题,再次避免缓存的单点故障问题(多副本和副本一致性). 分布式缓存的核心技术包括首先是内存本身的管理问题,包括了内存的分配,管理和回收机制. 其次是分布式管理和分布式算法,其次是缓存键值管理和路由. 原文: http://wenku.baidu.com/view/8686d46c7e21af45b307a8c3.html.

再谈集中和分布

- - 人月神话的BLOG
上篇文章转载了关于mysql数据库的垂直和水平拆分的相关内容,本篇文章再谈下关于应用集中化后的集中和分布相关策略问题,以及最近关于完全去IOE思路的一些思考和回顾. 现在有一个问题我暂时还没得到比较明确的一些验证,如对于当前x86的pc server服务器好的配置完全可以达到200万TPMC,对于磁盘阵列可以挂接到20T甚至更高的存储容量.

hadoop分布式配置

- - CSDN博客云计算推荐文章
一、前面的部分见伪分布式配置. 二、实现SSH无密码登录远程主机(只在源主机上配置). 注意:以上scp命令表示把authoriezd_keys远程复制到对应主机的相应目录下. slave2是目的主机的名字,需要在源主机的/etc/hosts下配置slave2以及对应的IP地址 192.168.0.5.

浅谈分布式缓存

- - CSDN博客推荐文章
在前面的一些文章中,从实战的角度,讲解了有关 memcached的应用、容灾、监控等等. 但是缺乏对理论的讲解和原理性的剖析. 本文将从理论的角度去介绍,让大家从宏观上对“分布式缓存、nosql”等技术有所了解,以便进一步学习和使用. 在构建大规模的web应用时,缓存技术可以说是必备的,学习的必要性不言而喻.

关于分布式事务

- - Web前端 - ITeye博客
Mysql当前分布式事务只支持Innodb存储引擎. 1个分布式事务由多个行为在不同的数据库上执行,1个分布式事务的执行成功意味着相关数据库上的行为执行均成功. 使用分布式事务的应用程序设计1个或多个资源管理器和一个事务管理器. 资源管理器(RM):用户提供通向事务的途径. 数据库服务器是一个种资源管理器.

BDRP分布式redis集群

- - 百度运维团队技术博客
BDRP(baidu distributed redis platform)是包含 twemproxy, redis,redis-sentinel等多个模块开发的分布式redis平台. bdrp已经在github上进行了开源, bdrp的github项目点这里. 目前redis集群架构主要有以下几个组件: twemproxy:redis的代理系统,可以选择多种数据分片算法 redis:集群的redis存储节点 sentinel:redis官方的集群高可用组件,可以监控redis主节点故障,并进行主备切换.