基于tornado的异步TCPServer以及TCPClient
ITeye博客 —— 关于tornado,我这里就不详细讲了,有兴趣的同学可以通过以下两篇博客了解一下。我们一般用tornado来编写web程序,但实际上,tornado底层的代码非常优秀,也可以用这些代码来编写TCP应用。tornado最突出的特点就是“异步”,所以,我这里编写了一个异步的TCPServer和一个异步的TCPClient来帮助大家理解,下面直接看代码。
关于tornado,我这里就不详细讲了,有兴趣的同学可以通过以下两篇博客了解一下:
我们一般用tornado来编写web程序,但实际上,tornado底层的代码非常优秀,也可以用这些代码来编写TCP应用。
tornado最突出的特点就是“异步”,所以,我这里编写了一个异步的TCPServer和一个异步的TCPClient来帮助大家理解,下面直接看代码:
文件:tcp_server.py
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""Demo asynchronous TCP server built on tornado's TCPServer.

Each accepted connection reads a single message terminated by ``b' END'``
and, after a fixed delay, answers with ``b"Hello client! END"``.
"""

import logging
import pdb
import time

from tornado import gen
from tornado import httpclient
from tornado import ioloop
from tornado import stack_context
from tornado.escape import native_str
from tornado.gen import Task
from tornado.tcpserver import TCPServer


def init_logging():
    """Attach a stream handler to the root logger and set it to DEBUG."""
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    sh = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s",
                 logging.getLevelName(logger.getEffectiveLevel()))


class MyServer(TCPServer):
    """TCPServer subclass that wraps every accepted stream in a TCPConnection."""

    def __init__(self, io_loop=None, **kwargs):
        TCPServer.__init__(self, io_loop=io_loop, **kwargs)

    def handle_stream(self, stream, address):
        """Create the per-connection handler for a newly accepted stream."""
        # The instance is not stored here; the callbacks it registers on the
        # stream hold references to it.
        TCPConnection(stream, address, io_loop=self.io_loop)


class TCPConnection(object):
    """State and callbacks for one client connection.

    Args:
        stream: the stream handed over by ``MyServer.handle_stream``.
        address: peer address as passed to ``handle_stream``.
        io_loop: the IOLoop used to schedule the delayed reply.
    """

    # Seconds to wait between receiving a message and sending the reply.
    REPLY_DELAY = 5

    def __init__(self, stream, address, io_loop):
        self.io_loop = io_loop
        self.stream = stream
        self.address = address
        self.address_family = stream.socket.family
        self.EOF = b' END'
        # Handle of the pending reply timeout, or None when nothing is pending.
        self._timeout_handle = None
        self._clear_request_state()
        self._message_callback = stack_context.wrap(self._on_message)
        self.stream.set_close_callback(self._on_connection_close)
        # Only one read is armed: a connection handles a single message.
        self.stream.read_until(self.EOF, self._message_callback)

    def _on_timeout(self):
        """Send the delayed reply once the timeout fires."""
        self._timeout_handle = None
        logging.info("Send message..")
        # Bytes literal so the payload type matches the bytes terminator.
        self.write(b"Hello client!" + self.EOF)

    def _on_message(self, data):
        """Log the received message and schedule the delayed reply."""
        try:
            data = native_str(data.decode('latin1'))
            logging.info("Received: %s", data)
            self._timeout_handle = self.io_loop.add_timeout(
                self.io_loop.time() + self.REPLY_DELAY, self._on_timeout)
        except Exception as ex:
            # logging.exception keeps the traceback that str(ex) would lose.
            logging.exception("Exception: %s", ex)

    def _clear_request_state(self):
        """Clears the per-request state."""
        self._write_callback = None
        self._close_callback = None

    def set_close_callback(self, callback):
        """Sets a callback that will be run when the connection is closed."""
        self._close_callback = stack_context.wrap(callback)

    def _on_connection_close(self):
        """Cancel any pending reply, run the close callback and reset state."""
        if self._timeout_handle is not None:
            # The peer is gone; do not let the reply timer fire afterwards.
            self.io_loop.remove_timeout(self._timeout_handle)
            self._timeout_handle = None
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        self._clear_request_state()

    def close(self):
        """Close the stream and drop the stored callbacks."""
        self.stream.close()
        # Release the callback references held by this connection right away.
        self._clear_request_state()

    def write(self, chunk, callback=None):
        """Writes a chunk of output to the stream.

        Nothing is written when the stream is already closed. ``callback``,
        if given, runs once the write completes.
        """
        if not self.stream.closed():
            self._write_callback = stack_context.wrap(callback)
            self.stream.write(chunk, self._on_write_complete)

    def _on_write_complete(self):
        """Run and clear the callback registered by the last ``write``."""
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            callback()


def main():
    """Start the server on port 8001 and run the IOLoop."""
    init_logging()
    server = MyServer()
    server.listen(8001)
    ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print("Ocurred Exception: %s" % str(ex))
    quit()
文件: tcp_client.py
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""Demo asynchronous TCP client built on tornado's IOStream.

Each client connects, sends ``b"Hello Server! END"``, waits for a reply
terminated by ``b' END'`` and then closes its stream.
"""

import logging
import pdb
import socket
import time

import tornado.ioloop
import tornado.iostream
from tornado import gen
from tornado import httpclient
from tornado import ioloop
from tornado.gen import Task


def init_logging():
    """Attach a stream handler to the root logger and set it to DEBUG."""
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    sh = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    logging.info("Current log level is : %s",
                 logging.getLevelName(logger.getEffectiveLevel()))


class TCPClient(object):
    """One outgoing connection that sends a greeting and reads one reply.

    Args:
        host: server host name or IP address.
        port: server TCP port.
        io_loop: loop to stop on close when shutdown is requested; defaults
            to ``tornado.ioloop.IOLoop.instance()``.
    """

    def __init__(self, host, port, io_loop=None):
        self.host = host
        self.port = port
        # Fall back to the singleton loop so on_close never calls
        # stop() on None.
        self.io_loop = io_loop or tornado.ioloop.IOLoop.instance()
        self.shutdown = False
        self.stream = None
        self.sock_fd = None
        self.EOF = b' END'

    def get_stream(self):
        """Create the socket and wrap it in an IOStream."""
        self.sock_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.stream = tornado.iostream.IOStream(self.sock_fd)
        self.stream.set_close_callback(self.on_close)

    def connect(self):
        """Open the connection; ``send_message`` runs once it is established."""
        self.get_stream()
        self.stream.connect((self.host, self.port), self.send_message)

    def on_receive(self, data):
        """Log the reply and close the stream."""
        logging.info("Received: %s", data)
        self.stream.close()

    def on_close(self):
        """Report an abnormal close and stop the loop if shutdown was requested."""
        # getattr: not every stream implementation is guaranteed to expose
        # an ``error`` attribute.
        error = getattr(self.stream, "error", None)
        if error:
            logging.warning("Connection to %s:%s closed with error: %s",
                            self.host, self.port, error)
        if self.shutdown:
            self.io_loop.stop()

    def send_message(self):
        """Send the greeting and arm the read for the server's reply."""
        logging.info("Send message....")
        self.stream.write(b"Hello Server!" + self.EOF)
        self.stream.read_until(self.EOF, self.on_receive)
        logging.info("After send....")

    def set_shutdown(self):
        """Mark this client so that its close stops the IOLoop."""
        self.shutdown = True


def main():
    """Run two clients against 127.0.0.1:8001 on one IOLoop."""
    init_logging()
    io_loop = tornado.ioloop.IOLoop.instance()
    c1 = TCPClient("127.0.0.1", 8001, io_loop)
    c2 = TCPClient("127.0.0.1", 8001, io_loop)
    c1.connect()
    c2.connect()
    # NOTE: the loop stops as soon as c2 closes, whether or not c1 has
    # finished by then.
    c2.set_shutdown()
    logging.info("**********************start ioloop******************")
    io_loop.start()


if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print("Ocurred Exception: %s" % str(ex))
    quit()
分别运行tcp_server.py和tcp_client.py,通过测试结果,可以非常明显地了解到tornado的“异步”,以及强大的性能。
测试结果如下:
tcp_server:
tcp_client: