Netty 用户指南4.x
前言
问题
现在我们经常使用程序或者库和其他人交流信息.例如,我们经常使用http程序库去从一个web server接收信息,或者调用一个远程的web服务.然而,一个通用的传输协议或者实现有的时候不能适应我们自己的场景.例如,我们不会用http server来传输一些大的文件,Email和一些实时性的信息,例如金融方面或者有些游戏数据方面的信息.这些需要一个高度优化的协议,为了使用某一种特定的应用场景. 例如,你可能想要实现一个专门为了多媒体视频流,大文件传出,或者基于ajax的一个聊天程序的Http Server.甚至你为自己的应用场景甚至想要实现自己的一套新的传输协议.另一个问题就是不得不兼容一些老的协议,以便于新的系统能跟老的系统做一些数据的交互.在这种情况下遇到的问题是我们在不牺牲程序的稳定和性能的情况下,快速的实现自己想要的协议.
解决方案
Netty 致力于提供一个异步的事件驱动的网络编程框架,是一个开发高可维护,高性能的,高伸缩性协议服务器或者客户端的工具.
换句话说,Netty是一个基于NIO的客户端服务器开发框架,能够方便快捷的开发网络应用的客户端跟服务器.它大大简化了网络开发的流程,例如TCP和UDP的开发.
但是"简单跟快速"并不意味着将会导致许多难以维护跟性的圣的问题.Netty从大量的已经实现的协议中(FTP,SMTP,HTTP,以及其他的一些二进制文本的协议)获取的相当的经验,并且经过小心的设计.结果Netty找到了一种不折中并且开发简单,高性能,极度灵活的方式去开发程序.
许多用户可能已经发现了其他一些网络编程的框架,这些框架都声称自己有很多优点,你可能想要问Netty跟他们有什么不同.答案是建造的哲学.Netty从设计的第一台你致力于给你最舒适的体验跟最方便的接口.这不是有行的东西,但是这个哲学会是你的生活更加方便,在你阅读这个指南和使用Netty的时候.
开始
这一节用一些简单的例子介绍Netty的核心的结构,让你快速的开始.(带你装逼带你飞的节奏)....这一节结束你将会使用Netty写一个客户端跟服务器.
如果你喜欢一自定向下的学习方法,可能想从第二章体系结构概述开始,然后再回到这里.
开始之前.
运行本章的例子程序仅需要两个简单的条件.1)JDK>=1.6 2) 下载最新的Netty下载页面 http://netty.io/downloads.html
阅读本章时,你可能会有很多的问题关于这些类的介绍,当你想知道能多的信息的时候请参考API手册,为了方便在本文档中所有的类名都连接到了在线的API手册.所以当你发现问题的时候,无论是语法错误,排版的错误或者一些改进文档的意见,不要犹豫的联系我们 http://netty.io/community.html .
A Discard Server
The most simplisticprotocol in the world is not 'Hello, World!' but DISCARD. (你他么在逗我,居然不写hello,world,这句不译) .这个协议会丢弃所有接受的数据,不会用任何的响应.
直线这个丢弃协议,仅仅是要忽略所有接收到的数据这么简单, 我们直接使用Netty处理IO的实现,在实现这个协议.
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
/**
*Handles a server-side channel.
*/
public class DiscardServerHandler extendsChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
1.DiscardServerHandler 继承 ChannelInboundHandlerAdapter 实现了 ChannelInboundHandler. ChannelInboundHandler 接口提供了事件处理的方法,你能重写这些方法.现在仅仅需要继承自 ChannelInboundHandlerAdapter 就足够了需要添加自己的实现.
2重写了channelRead()这个事件处理方法,每当客户端发送数据是,这个方法接收数据时被调用.在这个例子中,使用ByteBuf接收消息.
3.为了实现这个丢弃协议.这个Handler忽略了所有接收到的数据.ByteBuf 是一个对象计数的引用,他必须通过release() 这个方法来释放.请记住释放所有的对象计数的引用是Hander的责任(也就是必须做),通常,channelRead() handler的实现方法如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
4.当Netty发生IO异常抛出Throwable对象的时候The exceptionCaught()时间将会被调用,在大多数情况下,发现异常应记录及其相关频道应该关闭这里,虽然这个方法可以不同的实现取决于你想做什么来处理特殊情况.例如,您可能想要发送一个响应消息关闭连接前一个错误代码.
目前一切顺利,我们已经实现了第一个discard server.现在我们写一个main()方法来运行这个DiscardServerHander Server.
本例代码在package io.netty.example.discard;中
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioServerSocketChannel;
/**
*Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer() { // (4)
@Override
public voidinitChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(newDiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that togracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
1 NioEventLoopGroup是一个多线程的处理IO操作的事件循环操作.Netty 为多种不同的传输方式提供大量的 NioEventLoopGroup实现.我们实现一个server端的程序作为例子,这里用到了两个 NioEventLoopGroup.第一个被称为"Boss"接受连接.第二个被称为"worker"(有点类似Nginx呢),当"Boss"接受连接后,这个连接就被注册连接到"worker",使用多少线程,worker跟Channel是怎么对应的取决于 EventLoopGroup 实现,甚至可以是可以配置的.
2 ServerBootstrap 是设置服务器的帮助类,你能用 Channel直接设置一个服务器,然而,这是一个比较复杂的过程,大多数情况下不你不需要这么做.
3 现在,我们用 NioServerSocketChannel 作为 Channel 接受连接的示例.
4当一个新的 Channel接受连接的时候,这个handler将被执行, ChannelInitializer 是一个帮助你配置一个 new Channel 的Handler.随着程序的复杂,最终你可能增加许多handlers 抽取许多类去替代这些匿名类.
5你还可以在Channnl实现是设置一些具体的参数.我们写一个tcp/ip服务,可以设置socket参数为tcpNoDelay和keepAlive.具体参考apidocs ChannelOption和 ChannelConfig 提供了大概支持的参数选项.
6你注意到option()和chindoption()了吗? 在 NioServerSocketChannel这个 例子中option()是 NioServerSocketChannel接受连接的配置.childoption()是 Channel接受 ServerChannel的配置.
7我们继续下去...剩下的就是绑定端口,并启动服务.现在我们绑定端口8080并接受所有ip的连接,现在你能调用bind()很多次,绑定不同的端口.
恭喜,你刚刚完成了第一个Netty server.
查看取得的数据
现在我们编写了第一个server,需要测试程序有没有在正常的工作,最简单的方式就是使用telnet来测试,例如telnet localhost 8080 然后输入什么东西.
然而我么能说这个server工作的很好吗?我们什么反馈也拿不到,因为这是一个 discardserver.
我们知道当数据被接收的时候channelRead()方法就会被执行,让我们在DiscardServerHandler channelRead()方法中添加一些代码.
@Overridepublic voidchannelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in =(ByteBuf) msg; try { while(in.isReadable()) { // (1)
System.out.print((char) in.readByte()); System.out.flush();
}
} finally { ReferenceCountUtil.release(msg);// (2)
}
}
1.最没有效率但是最简单的就是System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
2 或者,你能在这里执行in.release() ;
重新运行telnet 命令,server想打印你输入的东西.
本例完成的代码在 io.netty.example.discard包下面
编写一个有回应的server
目前为止,我们已经消费了这些数据但是却没有一点回应,然而通常一个服务支持对于请求给与回应,让我学习怎么实现回应协议,接收到什么信息就返回什么信息。
与前一章节不同的仅仅是,前一章节接收到数据输出到控制台,现在修改一下ChannelRead() method:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
1 ChannelHandlerContext 对象提供了大量的触发I/O事件模型的操作,这里我们执行write(Object) 把收到的数据逐字的返回,注意这里不像DISCARD例子中那样释放收到的数据,因为当你写完数据的时候Netty会帮你释放他们。
2ctx.write(Object)不会使消息写入。它内部缓冲,然后等到刷新的时候通过ctx.flush()写入。换句话说你可以俄调用ctx.writeAndFlush(msg)简单的写入数据。
运行 telnet 再次查看效果,无论你出入什么将会原样返回。
本例代码在
io.netty.example.echo包下面。
A TIME Server
这个协议实现TIME 协议,跟前面的例子不同,这个例子返回一个32位的Intger,不会接收请求的数据,在本例中将会学到怎么洋创建一个消息并发送,怎么去完整的关闭一个连接。
因为一旦建立连接我们要忽略任何接收的数据但发送消息,这次我们不能使用channelRead()方法。相反,我们应该覆盖channelActive()方法。下面是实现:
package io.netty.example.time;
public class TimeServerHandler extendsChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
cause.printStackTrace();
ctx.close();
}
}.
1解释。建立连接并准备产生流量时channelActive()方法将被调用。我们写回一个32位的整数作为回应。
2为了发送一个新的消息,我们需要分配一个包含消息的新的Buffer。我们要写一个32位的整数,所以这个buffer至少要4字节,通过ChannelHandlerContext.alloc()获取当前ByteBufAllocator并分配新缓冲区。
3 通常,我们写构造好的消息。
但是等一下 flip在哪里?在发送消息之前不调用 java.nio.ByteBuffer.flip()方法吗?ByteBuf没有这个方法有两个指针,一个读指针,一个写指针。当写指针写了一些东西,读指针却还没有变化。他们分别代表消息的开始和结束
与此鲜明对比的是,NIO 没有提供一个明确的方式区计算消息的开始或者结束通过原来的flip方法的方式,当你不使用flip方法的时候你可能陷入困境,没有正确传输的数据,这个问题将不会发生在Netty中因为我们有不同的指针不同的操作类型,你会发现当你使用它生活会更加的美好---没有flipping的世界。
需要注意的另一点是,ChannelHandlerContext.write()(writeAndFlush())方法返回一个ChannelFuture。ChannelFuture代表一个I / O操作,尚未发生。这意味着,任何可能没有执行请求的操作,因为在网状的所有操作都是异步的。例如,下面的代码可能会关闭连接之前就发送消息:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,在ChannelFuture完成后,需要调用write()方法返回的close()方法,通知监听器写操作完成。请注意,关闭()也可能不会立即关闭连接,它返回一个ChannelFuture。
4我们如何得到写完成时通知呢?这非常简单,只需使ChannelFutureListener 返回ChannelFuture,在这里,我们创建了一个新的匿名ChannelFutureListener操作完成后关闭Channel。
或者,您可以简化代码使用一个预定义的侦听器:
f.addListener(ChannelFutureListener.CLOSE);
可以使用Unix命令测试我们的代码是否通过。
$ rdate -o-p
port 跟host 在main()方法里有定义。
写一个时间的客户端
不像DISCARD和ECHO Server ,为了将一个32位的数据转换为一个时间类型,我们需要实现一个TIME协议的客户端.在本章节中我们将是学习怎么样正确的运行这个Server和学习怎么样用Netty写一个客户端.
跟前面最大也是仅有的不同就是是用了 Bootstrap 和 Channel 的实现,先看一眼下面的代码.
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer() {
@Override
public voidinitChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(newTimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
1 Bootstrap 类似于 ServerBootstrap,除了没有Channels更像一个单边的没有连接的Channel.
2 如果你明确一个 EventLoopGroup,他将同时被用于boss group跟work group. Boss worker但不会用于客户端这一边.
3 一个客户端单边的Channel 替代了 NioServerSocketChannel, NioSocketChannel.
4 注意我们没有像ServerBootstrap中使用childOption() ,因为单边 SocketChannel没有一个父类.
5 调用 connect() 取代 bind();
如你所见,跟前面的例子并没有什么不同. ChannelHandler是怎么实现的? 无非就是接收一个32的int 然后转换成一个人类可读的格式,最后关闭连接,仅此而已.
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extendsChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
1 在TCP/IP中,Netry把数据读取到一个ByteBuf
这看起来跟以前没有什么不同.但是有个困扰就是你可能会造成IndexOutOfBoundsException,下一章节将讨论会发生什么情况.
处理一个面向流的传输
一个关于Socker Buffer的警告
在基于流的传输过程中,socket的把数据接收到一个Buffer中.不幸的是,这个Buffer是一个byte的队列而不是一个基于包(packet)的队列,这意味这你发送两个消息,操作系统将会把他们当作一个byte流来进行传输.当你接收到数据的时候仅仅是一个流(流氓的流......).这将不能保证收到的数据跟发送的数据是一致的,例如下面我们发送3个包.
因为这是基于流的传输,有很大的可能性是你手打的数据是下面的样子.
因此不管是server还是client我们都应去收手动的处理数据,才能得到自己想要的东西,才能得到如下的数据;
第一种解决方式
回到Time Client的例子,我们将在这里解决问题,一个32为的int数量比较小,可能并不是经常的破碎.然可可能随着数据传输数量的增加,破碎的可能性会增加.
简单的解决方式是,创建一个内部的Buffer,接收到4个Byte是存储到这个Buffer中,就像下面的TimeClientHandler所示:
packageio.netty.example.time;
importjava.util.Date;
public classTimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public voidhandlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public voidhandlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public voidchannelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { //(3)
long currentTimeMillis =(buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(newDate(currentTimeMillis));
ctx.close();
}
}
@Override
public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1 ChannelHandler有两个生命周期的监听方法,handlerAdded() 和 handlerRemoved().你可以执行任意的初始化任务,只要你这个任务别花太多的时间.
2 首先,所有接收到的数据都应该放到buf中.
3 然后,处理程序必须检查有没有足够的4个字节,去处理实际的业务逻辑.另外,当数据到达的时候将会调用 channelRead()方法,最终我们会积累4个字节的数量..
第二种解决方案
尽管第一种方案处理的不够完美但是,还是能够解决问题.如果有有个协议的消息的长度不是固定的,你的 ChannelInboundHandler将很快变得不能维护了.
正如你可能已经注意到,您可以添加多个ChannelHandler ChannelPipeline,因此,您可以将一个单片ChannelHandler分为多个模块化的减少你的应用程序的复杂性 ,例如你可以把 TimeClientHandler 分离为两个handler:
TimeDecoder 用来解决分片的问题
TimeClientHandler是最原始的版本
幸运的是Netty提供一个可以扩展的类,你可以开箱即用.
packageio.netty.example.time;
public classTimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContextctx, ByteBuf in, Listout) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
1 ByteToMessageDecoder是 ChannelInboundHandler 的一个实现,用来帮助分片的问题.
2 每当接收到新的数据 ByteToMessageDecoder调用的decode()方法内部维护累积缓冲区。
3 decode()可以决定在没有足够的累积缓冲区中的数据添的时候什么也不输出.当数据接收的时候 ByteToMessageDecoder 将调用 decode()方法.
4如果decode()输出了消息,这意味着解码器解码成功消息。 ByteToMessageDecoder将会丢弃已经读过的对象.请记住,你不需要解码多个消息,ByteToMessageDecoderwill保持调用decode()方法,直到它输出些什么。
现在给 ChannelPipeline增加另一个handler,修改 ChannelInitializer实现TimeClient;
b.handler(newChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)throws Exception {
ch.pipeline().addLast(newTimeDecoder(), new TimeClientHandler());
}
});
如果你是一个爱冒险的人,可能想去尝试一下 ReplayingDecoder甚至更多,查阅API参考的更多信息.
public classTimeDecoder extends ReplayingDecoder{
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBufin, Listout) {
out.add(in.readBytes(4));
}
}
此外,Netty提供了开箱即用的解码器,使您可以实现大多数协议很容易和帮助你避免从结束了单片不可维护的处理程序的实现
请参考下面的包更详细的例子:
io.netty.example.factorial 二进制协议
io.netty.example.telnet 基于行的协议
POJO代替ByteBuf
我们回顾了迄今为止的所有例子使用ByteBuf作为协议消息的主要数据结构。这一节中,我们将改善时间协议客户机和服务器使用POJO代替ByteBuf。
使用POJO的优势在你ChannelHandlers是显而易见的;我们从ByteBuf从处理程序提取信息时,程序将变得更加可维护和可重用。在客户端和服务器的例子,我们读只有一个32位整数,这不是一个使用ByteBuf的主要问题。然而,你会发现有必要分离你的实现使它成为一个真正的全球协议。
首先,让我们定义一个名为UnixTime的新类型。
package io.netty.example.time;
importjava.util.Date;
public classUnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L+ 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L)* 1000L).toString();
}
}
我们现在可以修改TimeDecoder UnixTime代替ByteBuf。
@Override
protected voiddecode(ChannelHandlerContext ctx, ByteBuf in, Listout) {
if (in.readableBytes() < 4) {
return;
}
out.add(newUnixTime(in.readUnsignedInt()));
}
更新后的解码器,TimeClientHandler不使用ByteBuf了:
@Override
public void channelRead(ChannelHandlerContextctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
更简单和优雅的,对吗?同样的技术可以应用于服务器端。让我们首先更新TimeServerHandler:
@Override
public voidchannelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(newUnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺少的功能是一个编码器,实现ChannelOutboundHandler UnixTime转换为 ByteBuf。这是比编写一个译码器简单得多,因为没有需要处理的数据包编码消息时碎片和组装。
package io.netty.example.time;
public class TimeEncoderextends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContextctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded =ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
1在这里有一些重要的事情。
首先,netty根据原始的ChannelPromise,编码的数据实际上是写入情况,标记成功或失败。
第二,我们调用ctx.flush()。有一个单独的handler方法void flush(ChannelHandlerContext ctx) 覆盖flush()操作。
进一步简化,可以用MessageToByteEncoder:
public classTimeEncoder extends MessageToByteEncoder{
@Override
protected void encode(ChannelHandlerContextctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
剩下的最后一项任务是在服务器端向ChannelPipeline插入TimeEncoder TimeServerHandler,并作为一个微不足道的练习。
关闭应用程序
关闭一个网状的应用程序通常是简单,通过shutdownGracefully()关闭所有创建的EventLoopGroups 。它返回一个 Future ,当EventLoopGroup已经完全终止, Channel 关闭是返回一个 Future .
总结
在本文中,快速的学习了如何用netty写一个程序.
更多的章节以后慢慢退出,鼓励你去看看 io.netty.example下面的例子
期待你的反馈或者建议 http://netty.io/community.html.
原文地址http://netty.io/wiki/user-guide-for-4.x.html
如果发现错误可以联系我 [email protected]
转载请注明来源