Vert.x 线程模型揭秘 | 鸟窝

标签: | 发表时间:2020-04-20 10:05 | 作者:
出处:https://colobu.com

Vert.x是一个在JVM开发reactive应用的框架,可用于开发异步、可伸缩、高并发的Web应用(虽然不限于web应用)。其目的在于为JVM提供一个Node.js的替代方案。开发者可以通过它使用JavaScript、Ruby、Groovy、Java,甚至是混合语言来编写应用。
使用Vertx.x框架,可以用JavaScript、CoffeeScript、Ruby、Python、Groovy或Java开发应用程序的组件,最终应用程序可以是混合语言构建的。

本文试图揭示Vert.x的线程模型的应用,通过源代码分析Vert.x如何使用线程池处理请求的,以及比较Netty和Vert.x使用线程池的异同。

也许你觉得奇怪,默认启动一个Vert.x Verticle实例,它只用一个线程处理事件,在多核的情况下你需要创建多个Verticle实例以充分利用多个CPU Core的性能。

Vert.x 实例

首先先啰嗦地介绍一些Vert.x概念,熟悉Vert.x开发的朋友可以跳过这一节。

在Vert.x里,如果你不使用Vertx对象,你几乎是寸步难行。

Vertx对象扮演着Vert.x控制中心的角色,同时它也提供了大量的功能,例如:

  • 编写TCP客户端和服务器
  • 编写HTTP客户端和服务器,包括websocket
  • Event bus
  • 共享数据
  • 定时器
  • 发布和卸载Verticle
  • UDP
  • DNS client
  • 文件系统访问
  • 高可用
  • 集群

如果你将Vert.x嵌入到你的应用程序中,你可以向下面这样获得一个Vertx对象的引用

     
1
     
Vertx vertx = Vertx.vertx();

当你实例化Vertx对象时,如果你感觉默认的参数不符合你的需求,你可以指定实例化时的参数:

     
1
     
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

VertxOptions对象拥有很多关于Vertx实例设置,例如配置集群,高可用设置,线程池大小以及等等其他参数。下面就介绍一下它的线程池。

线程池

1、eventLoopGroup
这个对象是 NioEventLoopGroup的一个实例,它的线程池的大小由 options.getEventLoopPoolSize()决定,如果没有设置,默认为CPU核数 * 2。

     
1
2
3
     
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false);
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

它的 EventLoop和一个Context对应:

     
1
2
3
4
5
6
7
8
9
10
     
protected ContextImpl(……) {
……
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
this.eventLoop = group.next();
} else {
this.eventLoop = null;
}
……
}

它用来执行标准的Verticle。

2、WorkerPool
用来执行worker Verticle。

     
1
2
     
workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true));

3、Internal Blocking Pool
内部使用的线程池,可以用来将阻塞代码异步化。

     
1
2
     
internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true));

不要在event loop中执行阻塞操作, 比如访问数据库或者网络资源,这绝对会影响你的应用的性能。对于这些阻塞操作,你可以将它们异步化:

     
1
2
3
4
5
6
7
     
vertx.executeBlocking(future -> {
// 下面这行代码可能花费很长时间
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});

默认情况下executeBlocking会在同一个context中执行(同一个verticle实例),它们会串行化执行。如果不关心这个执行的顺序,可以将ordered参数设为false,它们会在worker pool线程池中并行的执行。

另外一种执行阻塞代码的方式就是使用worker verticle,worker verticle总是在worker pool线程池中执行。

Verticle

Verticle有点类似Actor模型,也可以实现并发的,可扩展的,易于发布的模型。

一个vert.x应用可以包含多个verticle实例,实例之间可以通过event bus通讯。

三种类型

http://vertx.io/docs/vertx-core/java/#_verticle_types

1、Standard Verticle: 最通用的类型,总是在event loop中执行。
2、Worker Verticle:它们使用worker pool线程池运行。一个verticle实例绝对不会在两个或者更多线程中并发执行。
3、Multi-threaded worker verticle:它们使用worker pool线程池运行。 一个verticle实例可以在多个线程中并发执行。

实现一个Verticle很简单:

     
1
2
3
4
5
6
7
8
9
10
11
     
public class MyVerticle extends AbstractVerticle {
// 当发布verticle时调用
public void start() {
}
// 可以不实现。当 verticle 卸载时调用
public void stop() {
}
}

发布方式

1、命令行方式
vertx run SomeJavaSourceFile.java
或者通过 maven-shade-plugin打包成一个 fat包:

     
1
2
3
4
5
6
7
8
9
10
11
     
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>io.vertx.core.Starter</Main-Class>
<Main-Verticle>com.colobu.vertx.Main</Main-Verticle>
</manifestEntries>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>
</transformer>
</transformers>

然后运行  java -jar xxx-fat.jar,你还可以传递一些参数。

2、编程方式
你也可以编程的方式,通过 vertx.deployVerticle发布:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
     
public class Main extends AbstractVerticle {
public static void main(String[] args) {
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(16);
Vertx vertx = Vertx.vertx(vo);
DeploymentOptions options = new DeploymentOptions();
options.setInstances(100);
vertx.deployVerticle(Main.class.getName(), options, e -> {
System.out.println(e.succeeded());
System.out.println(e.failed());
System.out.println(e.cause());
System.out.println(e.result());
});
}
@Override
public void start() {
Handler<HttpServerRequest> handler = e -> {
HttpServerResponse response = e.response();
response.putHeader("content-type", "application/json").end("Hello world");
};
vertx.createHttpServer().requestHandler(handler).listen(8080);
}
}

Verticle发布和Vert.x线程模型

以上比较啰嗦,主要介绍了一些Vert.x的一些概念。下面是我想重点介绍的内容。

本节以实现一个简单的http server为例(编程方式发布Verticle),分析 vert.x 的线程和Verticle的关系。只分析标准的Verticle。代码如上。

Verticle发布过程

首先先创建一个Vertx实例,可以你可以通过 VertxOptions设置线程池的大小。上面的例子中设置Event Loop线程池的大小为16:

     
1
     
vo.setEventLoopPoolSize(16);

因此即使你创建几百个Verticle,也只会有16个Event Loop处理它们,你可以通过 jstack查看这些线程。你会看到多个名为 vert.x-eventloop-thread-<num>的线程,一个 vertx-blocked-thread-checker线程,一个 vert.x-acceptor-thread-0

调用 void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler)方法发布 Verticle
DeploymentOptions对象可以设置发布参数,比如是否是worker verticle,多线程worker verticle, ha, 隔离组等, 重要的是instances,它用来指定分布的Verticle实例的数量,默认是一个。

底层调用 DeploymentManagerdoDeployVerticle来实现,它会根据实例数创建相应多的 Verticle,然后调用 doDeploy发布这些 Verticle:

     
1
     
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);

我将 doDeploy方法简化,让我们看一下关键代码:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
     
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
ContextImpl parentContext,
ContextImpl callingContext,
Handler<AsyncResult<String>> completionHandler,
ClassLoader tccl, Verticle... verticles) {
//准备工作
……
for (Verticle verticle: verticles) {
//创建上下文
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, conf, tccl) :
vertx.createEventLoopContext(deploymentID, conf, tccl);
deployment.addVerticle(new VerticleHolder(verticle, context));
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
verticle.start(startFuture);
startFuture.setHandler(……);
} catch (Throwable t) {}
});
}
}

可以看到#11 行创建了一个上下文ContextImpl, 因为本例中我们不用worker模式,所以这个上下文是通过 vertx.createEventLoopContext(deploymentID, conf, tccl)创建的。每个verticle都会创建一个新的上下文,因此verticle和上下文是意义对应的。

#17 行初始化verticle,#19 行启动这个verticle。还记得我们的例子中实现的 start方法吗,它会在这里被调用。

这样,多个verticle实例被发布了。

线程模型

首先插播一下Netty的线程模型,不感兴趣的可以略过。

Netty的线程模型

虽然Vert.x底层籍由Netty实现,但是它的处理方式与Netty NIO的线程模型是不同的。
(以下谈论的Netty线程模型是指NIO的情况)
比如下面的Netty代码片段:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
     
EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup(50);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){……});
Channel ch = b.bind("0.0.0.0",8080).sync().channel();
ch.closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}

NioEventLoopGroup代表一组 EventLoop,每个 EventLoop映射一个线程,每个 Channel注册一个 EventLoop,但是一个 EventLoop可以关联多个 Channel
parentGroup用来处理Accept事件,而 childGroup用来处理其余的IO事件。当有并发连接的时候, Handler会在 childGroup线程池中执行。你可以指定 childGroup的线程数量,如果没有指定,则从系统属性中读取"io.netty.eventLoopThreads",如果这个属性没有设置,则使用CPU核数  2 (Runtime.getRuntime().availableProcessors() 2))。一般 parentGroup设置为1,我们只需要一个Acceptor处理客户端的连接即可。

当有多个并发连接的时候,每个连接/Channel被分配到一个 EventLoop上。 EventLoop选择是均匀地 (如果线程数是2的n次方,可以用比较快的选择方法PowerOfTwoEventExecutorChooser):

     
1
2
3
4
5
6
7
8
9
10
11
12
13
     
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}

因此一旦如果某个 EventLoop处理慢了,则这个线程上的event可能出现堆积。
比如下面的代码故意在某个线程上处理慢一些,导致这个 EventLoop上出现堆积,Netty并没有根据压力将时间分配到其它处理快的 EventLoop上。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
     
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
……
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String name = Thread.currentThread().getName();
System.out.println(name);
if (name.endsWith("-5")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
……

输出结果可以看到 nioEventLoopGroup-3-5处理了同样多的请求,而且都堆积在后面了。

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     
……
nioEventLoopGroup-3-19
nioEventLoopGroup-3-18
nioEventLoopGroup-3-19
nioEventLoopGroup-3-18
nioEventLoopGroup-3-20
nioEventLoopGroup-3-20
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5

因此,我们可以了解到,当启动一个NIO方式的Netty实例的时候,它会使用一个线程池来处理http请求。

Netty 4.0的线程模型被很好的重定义,一个 ChannelHandler实例的方法不会被并发的调用,除非它被 @Sharable标记,因此你不应该增加一个ChannelHandler 实例多次。当你增加一个handler到ChannelPipeline中时,你可以指定一个特定的 EventExecutorGroup来执行这个handler。如果没有指定,则使用Channel注册的 EventLoop来执行。如果两个Handler被指定不同的 EventExecutorGroup,则它们会并发执行,因此如果它们会访问共享数据的化,你需要关注并发控制的问题。更多内容可以查看  Netty的文档

Vert.x的线程模型

Vert.x如何在线程中处理事件的呢,还是以我们的例子分析。

回顾一下我们实现的Verticle的start方法。

     
1
2
3
4
5
6
7
8
     
@Override
public void start() {
Handler<HttpServerRequest> handler = e -> {
HttpServerResponse response = e.response();
response.putHeader("content-type", "application/json").end("Hello world");
};
vertx.createHttpServer().requestHandler(handler).listen(8080);
}

在这个start方法中,我们创建了一个http server,让它监听 8080端口, http request的处理交给handler执行。 那么监听线程是哪一个?handler又是在哪个线程池中执行的呢?调用多个Verticle实例的方法为什么没有出现"地址/端口被占用"的异常呢?

首先vertx.createHttpServer()会创建一个HttpServerImpl对象,可以通过HttpServerOptions配置更多的参数,每个Verticle实例都会创建一个HttpServerImpl对象。requestHandler(handler)方法设置处理器,你还可以使用Vert.x-Web设置路由的功能。

listen(8080)启动http 服务器,它实际调用netty实现的。
我将 listen方法简化,去除一些检查代码和回调处理,只保留关键代码如下:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
     
public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) {
listenContext = vertx.getOrCreateContext();
listening = true;
synchronized (vertx.sharedHttpServers()) {
id = new ServerID(port, host);
HttpServerImpl shared = vertx.sharedHttpServers().get(id);
if (shared == null) {
serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);
bootstrap.channelFactory(new VertxNioServerChannelFactory());
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
……
pipeline.addLast("handler", new ServerHandler());
}
});
addHandlers(this, listenContext);
vertx.sharedHttpServers().put(id, this);
actualServer = this;
} else {
// Server already exists with that host/port - we will use that
actualServer = shared;
addHandlers(actualServer, listenContext);
metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options);
}
}
return this;
}

#6 行可以看到它会检查使用这个IP地址和端口的http server是否存在,如果存在的化直接跳到# 27行。因此回答上面的问题,多个Verticle实例不会引起冲突,因为它们会共享同一个http server。

这个http server通过netty ServerBootstrap创建。#10 行可以看到acceptor是一个单线程执行的,acceptorEventLoopGroup在VertxImpl中定义。

     
1
     
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);

#10 行还显示,netty的IO worker线程池由 availableWorkers确定,它是一个VertxEventLoopGroup对象。VertxEventLoopGroup类扩展 AbstractEventExecutorGroup,实现了 EventLoopGroup接口:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
     
……
@Override
public synchronized EventLoop next() {
if (workers.isEmpty()) {
throw new IllegalStateException();
} else {
EventLoop worker = workers.get(pos).worker;
pos++;
checkPos();
return worker;
}
}
public synchronized void addWorker(EventLoop worker) {
EventLoopHolder holder = findHolder(worker);
if (holder == null) {
workers.add(new EventLoopHolder(worker));
} else {
holder.count++;
}
}
……

线程的数量由 worker的数量决定,worker的类型是EventLoop,对应一个线程,有多少 worker就会有多少线程。

通过 addWorker可以增加线程的数量,worker不会重复。

回到刚才的 listen方法, #21 行addHandlers方法会配置handler在哪一个event loop中执行:

     
1
2
3
4
5
6
7
8
     
private void addHandlers(HttpServerImpl server, ContextImpl context) {
if (requestStream.handler() != null) {
server.reqHandlerManager.addHandler(requestStream.handler(), context);
}
if (wsStream.handler() != null) {
server.wsHandlerManager.addHandler(wsStream.handler(), context);
}
}

server.reqHandlerManager.addHandler方法如下:

     
1
2
3
4
5
6
7
8
9
10
11
     
public synchronized void addHandler(Handler<T> handler, ContextImpl context) {
EventLoop worker = context.nettyEventLoop();
availableWorkers.addWorker(worker);
Handlers<T> handlers = new Handlers<>();
Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);
if (prev != null) {
handlers = prev;
}
handlers.addHandler(new HandlerHolder<>(context, handler));
hasHandlers = true;
}

#2 行得到这个上下文的EventLoop。 还记得上下文的EventLoop怎么创建出来的吗?每个 Verticle实例关联一个上下文,因此一个 Verticle实例只会创建一个worker。
把这个worker加入到availableWorkers,这样就增加了一个事件处理线程。

因此我们可以看出正常情况下Vert.x的每个 Verticle实例只会用一个线程处理请求,在多核情况下一定要配置instance的数量。

如果配置的instance的数量大于eventLoopPoolSize数量,那么就会有一个Event Loop处理多个instance的情况。 线程配置的过多有时不会带来性能的提升,由于线程也有context swicthing,反而会带来性能的降低。

相关 [vert 线程 模型] 推荐:

浅析C++多线程内存模型

- 2sin18 - 并行实验室 | Parallel Labs
注:本文发表于《程序员》2011年第6期并行编程专栏,略有删改. 在即将到来的C++1x标准中,一个重大的更新就是引入了C++多线程内存模型. 本文的主要目的在于介绍C++多线程内存模型涉及到的一些原理和概念,以帮助大家理解C++多线程内存模型的作用和意义. 顺序一致性模型(Sequential Consistency).

Java 多线程内存模型

- - ITeye博客
Java 多线程内存模型.       Java虚拟机规范中试图定义一种Java内存模型(Java Memory Model,JMM)来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的并发效果. 在此之前,主流程序怨言(如C/C++等)直接使用物理硬件(或者说操作系统的内存模型),因此,会由于不同的平台上内存模型差异,导致程序在一套平台上并发完成正常,而在另一套平台上并发访问却经常出错,因此经常需要针对不同的平台来编写程序.

Vert.x 线程模型揭秘 | 鸟窝

- -
Vert.x是一个在JVM开发reactive应用的框架,可用于开发异步、可伸缩、高并发的Web应用(虽然不限于web应用). 其目的在于为JVM提供一个Node.js的替代方案. 开发者可以通过它使用JavaScript、Ruby、Groovy、Java,甚至是混合语言来编写应用. 使用Vertx.x框架,可以用JavaScript、CoffeeScript、Ruby、Python、Groovy或Java开发应用程序的组件,最终应用程序可以是混合语言构建的.

读书笔记:对线程模型的批评

- 阿贡 - 酷壳 - CoolShell.cn
——感谢Ian.Sian投递本文——. 多线程模型是主流的并发编程模型. 在过去几十年来,多线程模型一直是开发并发程序的有力工具. 1997年,NASA 的“火星探路者”号在执行任务的途中遭遇了严重的时序异常(参见 “What really happend on Mars“,注目 follow-up 中的现身说法),无法发回探测数据.

模型制作

- 小鱼儿 - 非正常人类研究中心 – Mtime时光网
1.材料:一大袋的一次性筷子(花了60块钱);5支502胶水;5张粗砂纸;记号笔一只;锋利的美工刀片若干,破剪刀一把. 就是这种屌毛筷子,质量也太他妈的差了点,80%都是弯的 . 随便提一下:我的脚丫子还是蛮性感滴 . 开始动工了!!  先做门框跟房子的底架. 3.不好意思,忘了交代一下了,我是先画图纸的,看到那张纸了没有.

MapReduce编程模型

- - CSDN博客云计算推荐文章
MapReduce是一个Google发明的编程模型,也是一个处理和生成超大规模数据集的算法模型的相关实现. 用户首先创建一个Map函数处理一个基于对的数据集合,输出的中间结果基于对的数据集合,然后再创建一个Reduce函数用来合并所有的具有相同中间Key值的中间Value值.

关于BOM模型

- - CSDN博客编程语言推荐文章
当我们使用浏览器打开一个网页程序时,那么,js系统会自动创建对象,首先创建浏览器对象window,然后再为window对象创建它的子级对象,最后形成一个树状模型,这个就是BOM模型. BOM定义了JavaScript可以进行操作的浏览器的各个功能部件的接口. BOM 主要处理浏览器窗口和框架,不过通常浏览器特定的 JavaScript 扩展都被看做 BOM 的一部分.

MySQL Replication 线程

- - CSDN博客推荐文章
Replication 线程. Mysql 的Replication 是一个异步的复制过程,从一个Mysql instace(我们称之为Master)复制到另一个Mysql instance(我们称之Slave). 在Master 与Slave 之间的实现整个复制过程主. 要由三个线程来完成,其中两个线程(Sql 线程和IO 线程)在Slave 端,另外一个线程(IO 线程)在Master 端.

Java线程池

- - 企业架构 - ITeye博客
线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的. 在jdk1.5之后这一情况有了很大的改观. Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用. 为我们在开发中处理线程的问题提供了非常大的帮助.

Java 线程池

- - 编程语言 - ITeye博客
在项目中,系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互. 在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存周期很短的线程时,更应该考虑使用线程池. 使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数.