使用epoll 在 linux 上开发高性能应用服务器

标签: epoll linux 开发 | 发表时间:2013-09-17 17:31 | 作者:C++技术中心
出处:http://www.cppblog.com/

概述

epoll是linux提供一种多路复用的技术,类似各个平台都支持的select,只是epoll在内核的实现做了更多地优化,可以支持比select更多的文件描述符,当然也支持 socket这种网络的文件描述符。linux上的大并发的接入服务器,目前的实现方式肯定都通过epoll实现。


epoll和线程

有很多开发人员用epoll的时候,会开多个线程来进行数据通信,比如一个线程专门accept(我个人早些年在FreeBSD用kqueue的时候,由于对内部机制没有基本了解也这样搞),一个线程收发,或者接收和发送都用各自独立的线程。

通常情况下,accept独立线程是没必要的,accept对于内核而言,就应用程序从内核的未完成的SYN队列读取一点数据而已。具体参见 accept部分:

TCP三次握手过程与对应的Berkeley Socket APIs的介绍

 

收发独立成两个线程也没有必要,因为大部分的应用服务器,通常情况下,启动一个线程收发数据,最大数据的收发量瓶颈在于网卡,而不是CPU;像网游接入服务器配置一个KM的网卡,很少有游戏会申请1G的带宽,那一台机器得有多少数据输入和输出。所以我们通信线程为epoll服务器就够了。


epoll的基本原理

为了让某些朋友能读得更连惯,我还是说一下epoll基本原理。

epoll外部表现和select是一样的。他提供READ, WRITE和ERROR等事件。

大致流程像下面这样:

1. 应用注册感兴趣的事件到内核;

2. 内核在某种条件下,将事件通知应用程序;

3. 应用程序收到事件后,根据事件类型做对应的逻辑处理。


原理部分我再说一下,不容易理解的地方,包括水平触发和边缘触发,WRITE事件的事件利用(这个可以结合参考文献1的kqueue的WRITE事件,原理一致的)和修改事件的细节。

水平触发

READ事件,socket recv buff有数据,将一直向应用程序通知,直到buff为空。

WRITE事件,socket send buff从满的状态到可发送数据,将一直通知应用程序,直到buff满。


边缘触发

READ事件,socket recv buff有数据了,只通知应用一次,不管应用程序有没有调用read api,接下来都不通知了。

WRITE事件,socket send buff从满的状态到可以发送数据,只通知一次。

上面这个解释不知道大家能否理解,也只能这样说了。有疑问的做一下试验。另外,这些细节的东西,前几年固定下来后,这几年做的项目,是直接用的,也就很少在涉及细节,是凭理解和记忆写下的文字,万一有错请指正^-^。


WRITE事件的利用

这个还一下不好描述。大概描述一下,详细看参考文献1。大致这样:

1. 逻辑层写数据到应用层的发送buff,向epoll注册一下WRITE事件;

2. 这时epoll会通知应用程序一个WRITE事件;

3. 在WRITE事件响应函数里,从应用层的发送buff读数据,然后用socket send api发送。

因为我在很多实际项目中,看到大家没有利用epoll的WRITE的事件来发数据,特意地说一下。大部分的项目,是直接轮询应用程序的发送队列,我早期项目也是这么干的。


epoll的修改事件

对于这个我的映像比较深刻。epoll的修改事件比较坑爹,不能单独修改某个事件!怎么说呢?比如epoll里已经注册了READ&WRITE事件,你如果想单单重注册一下WRITE事件而且READ事件不变,epoll的epoll_ctl API是做不到的,你必须同时注册READ&WRITE,这个在下面的代码中可以看到。FreeBSD的kqueue在这一点完全满足我们程序员的要求。


抽象epoll API

我把herm socket epoll封装部分贴出来,让朋友们参考一下epoll的用法。大部分错误抛异常代码被我去掉了。

 

  1. class Multiplexor
  2. {
  3. public:
  4. Multiplexor(int size, int timeout = -1, bool lt = true);
  5. ~Multiplexor();
  6. void Run();
  7. void Register(ISockHandler* eh, MultiplexorMask mask);
  8. void Remove(ISockHandler* eh);
  9. void EnableMask(ISockHandler* eh, MultiplexorMask mask);
  10. void DisableMask(ISockHandler* eh, MultiplexorMask mask);
  11. private:
  12. inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
  13. {
  14. struct epoll_event evt;
  15. evt.data.ptr = eh;
  16. evt.events = mask;
  17. return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
  18. }
  19. private:
  20. int m_epfd;
  21. struct epoll_event* m_evts;
  22. int m_size;
  23. int m_timeout;
  24. __uint32_t m_otherMasks;
  25. };
class Multiplexor
{
public:
	Multiplexor(int size, int timeout = -1, bool lt = true);
	~Multiplexor();

	void Run();
	void Register(ISockHandler* eh, MultiplexorMask mask);
	void Remove(ISockHandler* eh);
	void EnableMask(ISockHandler* eh, MultiplexorMask mask);
	void DisableMask(ISockHandler* eh, MultiplexorMask mask);
private:
	inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
	{
		struct epoll_event evt;
		evt.data.ptr = eh;
		evt.events = mask;
		return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
	}
private:
	int m_epfd;
	struct epoll_event* m_evts;
	int m_size;
	int m_timeout;
	__uint32_t m_otherMasks;
};
 
  1. Multiplexor::Multiplexor(int size, int timeout, bool lt)
  2. {
  3. m_epfd = epoll_create(size);
  4. if (m_epfd == -1)
  5. throw HERM_SOCKET_EXCEPTION(ST_OTHER);
  6. m_size = size;
  7. m_evts = new struct epoll_event[size];
  8. m_timeout = timeout;
  9. // sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP
  10. m_otherMasks = EPOLLERR | EPOLLHUP;
  11. if (!lt)
  12. m_otherMasks |= EPOLLET;
  13. }
  14. Multiplexor::~Multiplexor()
  15. {
  16. close(m_epfd);
  17. delete[] m_evts;
  18. }
  19. void Multiplexor::Run()
  20. {
  21. int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout);
  22. if (fds == -1)
  23. {
  24. if (errno == EINTR)
  25. return;
  26. }
  27. for (int i = 0; i < fds; ++i)
  28. {
  29. __uint32_t evts = m_evts[i].events;
  30. ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
  31. int stateType = ST_SUCCESS;
  32. if (evts & EPOLLIN)
  33. stateType = eh->OnReceive();
  34. if (evts & EPOLLOUT)
  35. stateType = eh->OnSend();
  36. if (evts & EPOLLERR || evts & EPOLLHUP)
  37. stateType = ST_EXCEPT_FAILED;
  38. if (stateType != ST_SUCCESS)
  39. eh->OnError(stateType, errno);
  40. }
  41. }
  42. void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
  43. {
  44. MultiplexorMask masks = mask | m_otherMasks;
  45. OperateHandler(EPOLL_CTL_ADD, eh, masks);
  46. }
  47. void Multiplexor::Remove(ISockHandler* eh)
  48. {
  49. // Delete fd from epoll, don't need masks
  50. OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
  51. }
  52. void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
  53. {
  54. MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
  55. OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
  56. }
  57. void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
  58. {
  59. MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
  60. if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
  61. throw HERM_SOCKET_EXCEPTION(ST_OTHER);
  62. }
Multiplexor::Multiplexor(int size, int timeout, bool lt) 
{
	m_epfd = epoll_create(size);
	if (m_epfd == -1)
		throw HERM_SOCKET_EXCEPTION(ST_OTHER);
	
	m_size = size;
	m_evts = new struct epoll_event[size];

	m_timeout = timeout;

	// sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP
	m_otherMasks = EPOLLERR | EPOLLHUP;
	if (!lt)
		m_otherMasks |= EPOLLET;
}

Multiplexor::~Multiplexor()
{
	close(m_epfd);
	delete[] m_evts;
}

void Multiplexor::Run()
{
	int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout); 
	if (fds == -1)
	{
		if (errno == EINTR)
			return;
	}
	
	for (int i = 0; i < fds; ++i)
	{
		__uint32_t evts = m_evts[i].events;
		ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
		int stateType = ST_SUCCESS;
		if (evts & EPOLLIN)
			stateType = eh->OnReceive();

		if (evts & EPOLLOUT)
			stateType = eh->OnSend();

		if (evts & EPOLLERR || evts & EPOLLHUP)
			stateType = ST_EXCEPT_FAILED;

		if (stateType != ST_SUCCESS)
			eh->OnError(stateType, errno);
	}
}

void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
{
	MultiplexorMask masks = mask | m_otherMasks;
	OperateHandler(EPOLL_CTL_ADD, eh, masks);
}

void Multiplexor::Remove(ISockHandler* eh)
{
	// Delete fd from epoll, don't need masks
	OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
}

void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
{
	MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
	OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
}

void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
{
	MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
	if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
		throw HERM_SOCKET_EXCEPTION(ST_OTHER);
}

上面类就用到epoll_create(), epoll_ctl()和epoll_wait(),以及几种事件。epoll用起来比select清爽一些。


大致用法类似下面这样:

先定义一个Handler

 

  1. class StreamHandler : public Herm::ISockHandler
  2. {
  3. public:
  4. virtual Herm::Handle GetHandle() const;
  5. virtual int OnReceive(int);
  6. virtual int OnSend(int);
  7. };
class StreamHandler : public Herm::ISockHandler
{  
public:	
	virtual Herm::Handle GetHandle() const;
    virtual int OnReceive(int); 
	virtual int OnSend(int);
};

 

在OnReceive()处理收到数据的动作,在OnSend()。。。。

在通信线程中,大概类似这样的代码,实际看情况。

 

  1. Multiplexor multiplexor;
  2. StreamHandler sh;
  3. multiplexor.Register(&sh, READ_EVT);
  4. multiplexor.Run(...);
Multiplexor multiplexor;
StreamHandler sh;
multiplexor.Register(&sh, READ_EVT);
multiplexor.Run(...);


 

参考文献

1.

 

使用 kqueue 在 FreeBSD 上开发高性能应用服务器

FreeBSD上kqueue和epoll是类似的,有兴趣的朋友请参考。

http://blog.csdn.net/herm_lib/article/details/6047038

 

2.

【Herm程序员开发指导】第2章 Herm Framework和网络通信组件

这里涉及到epoll的通信层如何和逻辑数据交互的问题

 

http://blog.csdn.net/herm_lib/article/details/5980657



C++技术中心 2013-09-17 17:31 发表评论

相关 [epoll linux 开发] 推荐:

使用epoll 在 linux 上开发高性能应用服务器

- - C++博客-首页原创精华区
epoll是linux提供一种多路复用的技术,类似各个平台都支持的select,只是epoll在内核的实现做了更多地优化,可以支持比select更多的文件描述符,当然也支持 socket这种网络的文件描述符. linux上的大并发的接入服务器,目前的实现方式肯定都通过epoll实现. 有很多开发人员用epoll的时候,会开多个线程来进行数据通信,比如一个线程专门accept(我个人早些年在FreeBSD用kqueue的时候,由于对内部机制没有基本了解也这样搞),一个线程收发,或者接收和发送都用各自独立的线程.

linux kernel中epoll的设计和实现

- 神气 - pagefault
原创文章,转载请注明: 转载自pagefault. 本文链接地址: linux kernel中epoll的设计和实现. 这里就不贴源码了,源码分析的话,网上一大堆,我这里只是简要的描述下epoll的实现和一些关键的代码片段. 相关的文件在 fs/eventpoll.c中,我看的是2.6.38的内核代码..

linux下epoll模式和select模式的区别

- - Linux - 操作系统 - ITeye博客
        支持高并发连接. 官方测试的是5w并发连接但在实际生产中可制成2-4w并发连接数,得益于nginx使用最新的epoll(linux 2.6内核)和kqueue(freebsd)网络I/O模型. 而apache使用的则是传统的select模型,其比较稳定的prefork模式为多进程模式,需要经常派生子进程,所消耗的CPU等服务器资源要比nginx高的多.

poll,select与epoll

- - 操作系统 - ITeye博客
select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select需要提供3个这种类型的参数来分别传入和输出可读,可写及异常等事件.这一方面使得select不能处理更多类型的事件,另一方面由于内核对fd_set集合的在线修改,应用程序下次调用select前不得不重置这3个fd_set集合.

WPS for Linux 开发中

- Quantum - Wow! Ubuntu
据消息称,WPS for Linux 办公套件目前正在开发中,如图:. # 本文采用CC协议进行授权,转载本文请注明本文链接. - Twitter / 微博 / 问答 / 投稿 / 加入我们 wow0slx6bcs721xo1udcc. - 高性价比 Ubuntu VPS / 本站架设于 PhotonVPS / 定制 Ubuntu T-Shirt.

EPOLL下的accept(转载)

- chuang - C++博客-首页原创精华区
     摘要: (转载者注:看完这个,再回头看看nginx源码,发现它在accept时用的是LT模式,read,write时是ET模式)不知道是谁第一个犯了错,在网上贴出所谓epoll通用框架的代码. 注意看accpet的处理:1epfd = epoll_create(10);2 3struct sockaddr_in clientaddr;4struct sockaddr_in se...  阅读全文.

epoll网络编程实例

- - CSDN博客推荐文章
       在前面已经经过了PPC、TPC、select之类( TPC就是使用进程处理data,TPC就是使用线程处理 ),前面两个的缺点大家应该都是知道的是吧,对于select( 其实poll和他差不多 ),缺点是能同时连接的fd是在是不多,在linux中一般是1024/2048,对于很大的服务器来说是不够的.

select、poll、epoll之间的区别总结

- - 企业架构 - ITeye博客
select、poll、epoll之间的区别总结. select,poll,epoll都是IO多路复用的机制. I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作. 但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间.

epoll机制在搜索引擎spider中的应用

- - 海之沙
本文将介绍epoll的概念,原理, 优点,及使用接口,同时结合作者在搜索引擎spider开发中epoll使用方式的代码向大家具体介绍epoll的使用方式. 笔者08年曾有使用epoll编写未考虑压力控制的crawler,将国内著名票务网站压垮并在boss的带领下登门道歉的经历:) 足见epoll的强悍!.

epoll 或者 kqueue 的原理(摘自知乎蓝形参)

- - Linux - 操作系统 - ITeye博客
首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象. 不管是文件,还是套接字,还是管道,我们都可以把他们看作流. 之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据. 现在假定一个情形,我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),这时候该怎么办.