Java NIO服务器实例

标签: 基础技术 教程 nio | 发表时间:2014-11-04 08:00 | 作者:一直在路上
出处:http://www.importnew.com

我一直想学习如何用Java写一个 非阻塞IO服务器,但无法从网上找到一个满足要求的服务器。我找到了 这个示例,但仍然没能解决我的问题。还可以选择 Apache MINA框架。但我的要求相对简单,MINA对我来说还稍微有点复杂。所以在MINA和一些教程(参见 这篇这篇)的帮助下,我自己写了一个非阻塞IO服务器。

我的代码可以从 这里下载。这只是个示例代码,如果需要可以随意修改它。这个示例由一个抽象的非阻塞服务器和一个配对的阻塞客户端组成。需要创建一个具体的实现来使用它们——可以通过测试用例来查看这个样例是如何工作的。两者都被设计为在自己的线程中运行(因此实现了Runnable接口),而且是单线程的——后面会有更多的并发选项。当客户端仅连接到单一服务器时是阻塞的,并且仅在自己的线程中运行。客服端还需要等待服务器端的返回信息,所以将客户端设计为非阻塞是没有意义的。本服务器只处理标准的TCP连接。如果使用的是UDP、SSL或别的协议的话,需要自己添加实现。

在写个示例的代码时,我学到了一些东西。除了调用标准的API来打开和管理连接以外,还掌握了selection keys的不同使用方式、消息处理技巧和线程问题,这些都是十分有用的。

打开和管理一个连接的基本方式在网络上十分常用,而且在下面的示例代码段中也有出现(只有代码片段——可以从代码下载中取得 完整版本)。从打开一个 Selector 开始(一种网络信道多路复用器 multiplexor)。Selector通过selectionkey来表示每一个信道,然后打开一个指定端口的套接字节服务器。将selector、SelectionKey.OP_ACCEPT作为参数在socket服务器上注册,任何接入连接在selector上都是有效的。下面的代码一直在循环等待selector的事件。当事件发生时,如果是一个连接请求,套 字节服务器会接受连接并注册链接发出的消息(通过OP_READ 注册)。如果它是一个信息(key.isreadable()),处理信息的代码尚未实现。下面的代码也很脆弱,任何错误都会导致服务器停止工作。

Selector selector = null;
ServerSocketChannel server = null;
try { 
	selector = Selector.open(); 
	server = ServerSocketChannel.open(); 
	server.socket().bind(new InetSocketAddress(port)); 
	server.configureBlocking(false); 
	server.register(selector, SelectionKey.OP_ACCEPT); 
	while (true) {
		selector.select();
		for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) { 
			SelectionKey key = i.next(); 
			i.remove(); 
			if (key.isConnectable()) { 
				((SocketChannel)key.channel()).finishConnect(); 
			} 
			if (key.isAcceptable()) { 
				// accept connection 
				SocketChannel client = server.accept(); 
				client.configureBlocking(false); 
				client.socket().setTcpNoDelay(true); 
				client.register(selector, SelectionKey.OP_READ);
			} 
			if (key.isReadable()) { 
				// ...read messages...
			} 
		}
	}   		
} catch (Throwable e) { 
	throw new RuntimeException("Server failure: "+e.getMessage());
} finally {
	try {
		selector.close();
		server.socket().close();
		server.close();
		stopped();
	} catch (Exception e) {
		// do nothing - server failed
	}
}

值得注意的是,一个selection key不代表一个套接字。相反,他们是selector注册的信道。因此,一个来自客户端的连接事件(OP_ACCEPT 事件)将使用与客户端发送消息(OP_READ事件)不同的key通知。这意味着,来自同一个客户端不同类型的事件将会用不同的key。不要试图对这些key进行比较。这样做的好处是,不同的事件 可以用不同的selector注册(这样做的原因是线程的——下面会详细说明)。

当读取一条信息时,有很多的情况需要考虑。当读取连接的结果数据时,这个信息可能是不完整的(剩余的数据要晚些才能获得),也可能包含不止一条消息。因此, 必须考虑消息结尾是如何表示的。读取数据时要将数据放入缓冲和然后拆分为有效的信息。标识消息结尾通常有以下几种方式:

  1. 固定的消息大小。
  2. 将消息的长度作为消息的前缀。
  3. 用一个特殊的符号来标识消息的结束。

我的代码使用了第二种方式。每种方式都会以2个字节开始,用来存储消息体的字节数(因此消息长度被限制为65535字节以内)。因为数据也是使用 ByteBuffers来读取的,所以了解一下如何使用它们会很有帮助(可以出这里的 API链接入手)。下面的代码会读取数据并将结果传给readmessage方法。在readMessage方法中这些数据被拆分成独立的消息。请注意readbuffer的用法。默认缓冲区应尽可小,但也不要设置过小。这样会造成消息大小经常大于缓冲区。缓冲区越小,处理的速度就越快。但是,如果接收到的消息大小超过缓冲区,那么必须重新缓冲区设置来处理消息。

private List<ByteBuffer> readIncomingMessage(SelectionKey key) throws IOException { 
	ByteBuffer readBuffer = readBuffers.get(key); 
	if (readBuffer==null) {
		readBuffer = ByteBuffer.allocate(defaultBufferSize); 
		readBuffers.put(key, readBuffer); 
	}
	if (((ReadableByteChannel)key.channel()).read(readBuffer)==-1) {
		throw new IOException("Read on closed key");
	}

	readBuffer.flip(); 
	List<ByteBuffer> result = new ArrayList<ByteBuffer>();

	ByteBuffer msg = readMessage(key, readBuffer);
	while (msg!=null) {
		result.add(msg);
		msg = readMessage(key, readBuffer);
	}

 	return result;
}

下面的代码用来将缓存数据转化为消息。

private ByteBuffer readMessage(SelectionKey key, ByteBuffer readBuffer) {
	int bytesToRead; 
	if (readBuffer.remaining()>messageLength.byteLength()) { // must have at least enough bytes to read the size of the message	
 		byte[] lengthBytes = new byte[messageLength.byteLength()];
		readBuffer.get(lengthBytes);
		bytesToRead = (int)messageLength.bytesToLength(lengthBytes);
		if ((readBuffer.limit()-readBuffer.position())<bytesToRead) { 
			// Not enough data - prepare for writing again 
			if (readBuffer.limit()==readBuffer.capacity()) {
	    		// message may be longer than buffer => resize buffer to message size
				int oldCapacity = readBuffer.capacity();
				ByteBuffer tmp = ByteBuffer.allocate(bytesToRead+messageLength.byteLength());
				readBuffer.position(0);
				tmp.put(readBuffer);
				readBuffer = tmp;   				
				readBuffer.position(oldCapacity); 
    			readBuffer.limit(readBuffer.capacity()); 
				readBuffers.put(key, readBuffer); 
	    		return null;
	    	} else {
	    		// rest for writing
    			readBuffer.position(readBuffer.limit()); 
    			readBuffer.limit(readBuffer.capacity()); 
    			return null; 
	    	}
		} 
	} else { 
		// Not enough data - prepare for writing again 
		readBuffer.position(readBuffer.limit()); 
		readBuffer.limit(readBuffer.capacity()); 
		return null; 
	} 
	byte[] resultMessage = new byte[bytesToRead];
	readBuffer.get(resultMessage, 0, bytesToRead); 
	// remove read message from buffer
	int remaining = readBuffer.remaining();
	readBuffer.limit(readBuffer.capacity());
	readBuffer.compact();
	readBuffer.position(0);
	readBuffer.limit(remaining);
	return ByteBuffer.wrap(resultMessage);
}

示例中的代码是单线程的——所有的连接都是由同一个线程处理。也可以使用多线程。尽管在某一时刻只有一个线程可以工作(也就是说,不可能有2个线程都在在执行读操作),但是读写操作可以由不同的线程通过独立的key来完成。同样的,在某一时刻只有一个线程可以使用selector。虽然单线程代码就能满足我的需要,但是有很多的方法可以并发处理。下面我分别描述使用线程池数据读事件、使用单一selector和线程处理OP_ACCEPT事件。

  1. 用一个selector来对应多个客户端连接。收到accept事件时,会创建一个新的selector并在这个新的selector上注册读事件。新创建的selector用来监听和处理读事件,这个任务是在线程池中执行的。由于不能确定selector对资源占用的影响,所以不知道这种做法的扩展性如何。
  2. 每个线程都启用一个selector,在创建执行线程时通过负载均衡的方式分配一个selector。将客户端分配给对应的selector,每个线程都在自己的selector中处理读事件,这是MINA的处理方式。这样处理问题是如何均衡线程的处理(MINA使用了轮叫round-robin调度算法)——如果不小心,结果会导致是有的线程非常繁忙有的线程处于空闲状态。
  3. 所有的事件都在同一个selector上处理,同步时需要小心处理。当传递key给某一个线程准备读取时,要保证这个key没有正准备被其他的线程所读取,直到当前的操作结束。
    在我想到最好的解决方式之前,selector处理的工作会非常繁重。

我会将如何处理并发这个问题留给感兴趣的读者。祝读者们在编码过程中一切顺利,我的例子可以在 这里下载。

2011年12月22日更新:有读者来信指出来原始的测试用例中有bug,有些测试用例中使用的是将字节转换为字节流 InputStreamReader。如果使用了非8位的字符集,那么测试用户将由于消息长度而失败(发生在转意消息头部时),我已更新了示例中的测试用例修正该问题。

相关文章

相关 [java nio 服务器] 推荐:

Java NIO服务器实例

- - ImportNew
我一直想学习如何用Java写一个 非阻塞IO服务器,但无法从网上找到一个满足要求的服务器. 我找到了 这个示例,但仍然没能解决我的问题. 还可以选择 Apache MINA框架. 但我的要求相对简单,MINA对我来说还稍微有点复杂. 所以在MINA和一些教程(参见 这篇和 这篇)的帮助下,我自己写了一个非阻塞IO服务器.

Java BIO、NIO、AIO

- - zzm
先来个例子理解一下概念,以银行取款为例:. 同步 : 自己亲自出马持银行卡到银行取钱(使用同步IO时,Java自己处理IO读写). 异步 : 委托一小弟拿银行卡到银行取钱,然后给你(使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS(银行卡和密码),OS需要支持异步IO操作API).

java nio SocketChannel 服务器端与多客户端 信息交互(聊天功能)

- - BlogJava_首页
    //解码buffer  .     /*接受数据缓冲区*/  .     /*发送数据缓冲区*/  .     /*映射客户端channel */  .          *启动服务器端,配置为非阻塞,绑定端口,注册accept事件 .          *ACCEPT事件:当服务端收到客户端连接请求时,触发该事件 .

java nio和io的比较

- - 互联网 - ITeye博客
第一部分:简单介绍NIO.     服务器在合理时间内处理大量客户机的请求的能力取决于服务器使用I/O流的效率,同时为成百上千的客户提供服务的服务器必须能并发的使用I/O服务.     用Java语言写的服务器,由于其线程与客户机之比几乎是一比一,因而易受到大量线程开销的影响,其结果是即导致性能问题,又缺乏伸缩性.

Java NIO 系列教程

- - 编程语言 - ITeye博客
Java NIO提供了与标准IO不同的IO工作方式:. Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中. Asynchronous IO(异步IO):Java NIO可以让你异步的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情.

Java NIO编程的技巧和陷阱

- 小丑鱼 - 淘宝JAVA中间件团队博客
去年做的分享,一直上传slideshare失败,今天又试了下,成功了. 这个主题主要介绍Java NIO编程的技巧和陷阱,解读了一些NIO框架的源码,以及编写高性能NIO网络框架所需要注意的技巧和缺陷. 去年写了篇blog提供了pdf版本的下载,看这里.

Grizzly 2.2发布,开源Java NIO框架

- - ITeye资讯频道
Grizzly框架近日 发布了2.2版本,该版本带来了相当多新特性与改进,而且加入了最新WebSocket规范的实现. Grizzly是一个应用程序框架,专门用于解决编写成千上万用户访问服务器时候产生的各种问题. Grizzly框架诞生于GlassFish项目,能够帮助开发人员利用Java NIO API构建可扩展、高性能、健壮的服务器,编写出可伸缩的服务器端应用.

攻破JAVA NIO技术壁垒

- - CSDN博客推荐文章
现在使用NIO的场景越来越多,很多网上的技术框架或多或少的使用NIO技术,譬如Tomcat,Jetty. 学习和掌握NIO技术已经不是一个JAVA攻城狮的加分技能,而是一个必备技能. 再者,现在互联网的面试中上点level的都会涉及一下NIO或者AIO的问题(AIO下次再讲述,本篇主要讲述NIO),掌握好NIO也能帮助你获得一份较好的offer.

JAVA安全之JAVA服务器安全漫谈

- - WooYun知识库
本文主要针对JAVA服务器常见的危害较大的安全问题的成因与防护进行分析,主要为了交流和抛砖引玉. 以下为任意文件下载漏洞的示例. DownloadAction为用于下载文件的servlet. 在对应的download.DownloadAction类中,将HTTP请求中的filename参数作为待下载的文件名,从web应用根目录的download目录读取文件内容并返回,代码如下.

socketio-netty(socket.io 服务器端JAVA实现) 近期升级手记

- - BlogJava-首页技术区
针对JAVA开发者, socketio-netty是一个socket.io的服务器端选择,又是目前兼容最新0.9+ – 1.0的JAVA服务器端实现. 从 http://socket.io官网来看,最近版本升级趋于缓和,几乎是没修正一个Bug,小版本就增加一次. 已经是非常稳定的版本了,可以真正使用了.