Redis的事件循环与定时器模型

标签: NoSql epoll Redis | 发表时间:2011-10-07 19:33 | 作者:levin ndv
出处:http://basiccoder.com

假期的最后一天,简单翻阅了下Redis的源码,读一款server软件的源码我一般是从进程/线程模型开始的,Redis让我有些诧异,它采用了单进程单线程的模型,一般的server软件都会采用多进程或者多线程再或者多线程多进程混合的模型来设计,从而充分利用多核处理器的并行计算能力来提高软件的性能,Redis这种模型我只能推断程序的可并行化程度不高,顺序计算反而能省去多线程同步和维护线程池/进程池的开销,我对于数据库server端的设计没有什么经验也没有太多的理解,如有谬误欢迎大家指正。

当然,这里要写的不是关于Redis的进程模型,而是Redis的事件模型和定时器模型。

Redis没有依赖libevent,而是自己通过IO多路复用的方式来实现了事件循环和定时器,不像nginx或者apache有多种多路复用方式可供选择,Redis只采用了三种:epoll/kqueue/select,默认采用epoll,在linux环境下最优的方式当然是epoll,当在FreeBSD平台下epoll不存在时则使用kqueue,当然若两种方式都未定义则使用性能最差的select,我只阅读了跟epoll相关的代码。

main()函数的最后调用了aeMain()这个函数进入Redis的事件循环,这个函数的很简单,循环调用aeProcessEvents()来对事件进行处理:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

在此之前Redis做了很多初始化的工作,这些工作大多是在initServer()这个函数中执行的,初始化一些相关的list,dict等,调用aeCreateEventLoop()初始化eventloop,这个函数初始化eventloop相关的数据结构,并最终调用了epoll_create()函数,对epoll上下文进行初始化。紧接着Redis创建了用于listen的socket对象,并调用aeCreateFileEvent()把该socket描述符的读事件加入到事件池中去,另外,还调用了aeCreateTimeEvent()函数来初始化一下定时器,定期地执行serverCron()这个函数,接下来看一下aeCreateFileEvent()aeCreateTimeEvent()这两个函数。

aeCreateFileEvent()这个函数初始化aeFileEvent结构(该结构保存事件的一些状态,以及事件的文件描述符等),并调用aeApiAddEvent()函数将描述符相关的事件添加到事件池中,对于epoll它的实现如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
 
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

非常简洁,这个函数只不过是把epoll_ctl()相关的操作做了一下封装,至此描述符已经加入到事件池中进行监听了,接着看aeCreateTimeEvent()这个函数。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
 
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

同样是初始化数据结构,但没有调用aeApiAddEvent()这个函数,当然,定时器又不需要文件描述符,当然不需要添加相关事件,定时器的实现只是使用了epoll_wait()的定时功能,aeAddMillisecondsToNow()这个函数顾名思义是把当前时间加上一个给定的毫秒数,然后算出一个when_sec和when_ms,eventloop对象的timeEventHead实际上是一个单向链表,它用于保存所有的定时器事件,当添加一个定时器事件时其实只是向该链表头中插入了一个元素,其会后由aeProcessEvents()这个函数遍历该链表取出超时的事件进行处理,接着我们看下这个事件处理里面最核心的函数。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
 
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
 
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
 
            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to se the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
 
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
 
	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
 
    return processed; /* return the number of processed file/time events */
}

这其中的aeApiPoll()这个函数其实就是对epoll_wait()操作的一个封装,epoll_wait()的最后一个参数是一个毫秒级的超时时间,Redis充分利用了这个时间在对IO事件进行监听的同时来实现了定时。这个函数的前面一大部分代码是在计算这个超时时间的,它调用aeSearchNearestTimer()这个函数来获取最近要超时的一个定时器对象,如何获取的呢?就是遍历刚才的提到的那个timeEventHead链表,来找出时间值最小的一个,注意是遍历,因为链表中的定时器也是无序的,不过我相信作者有一天会把它换成红黑树或者其它的数据结构吧。如果找到一个将要超时的定时器,则将它与当前时间进行比较,如果当前时间大于定时器时间则表示定时器已超时,将超时时间设为0,若当前时间小于定时器时间,则将超时时间设为两者之差。如果定时器队列为空,或者说没有任何定时器事件,则可以根据AE_DONT_WAIT这个标志来决定epoll_wait()是non-blocking立即返回,还是一直阻塞在那里。

aeApiPoll()函数返回时,有两种情况,一种是IO事件被触发,另一种是定时器超时,当IO事件被触发时,遍历所有活跃描述符并调用相关的回调函数对其进行处理。当没有IO事件被触发,而是超时时,则返回值numevents为0,函数会转向processTimeEvents()来遍历定时器列表,调用定时器回调函数处理定时器事件,当IO事件被触发而并没有定时器超时时,如果设置了AE_TIME_EVENTS标志则也会对定时器列表进行遍历,主循环便是如此,我认为这会多少对效率有一定的影响,当然可能现在的Redis定时器列表并不太大,所以效率问题也可以忽略。

以上是简单地对今天的工作做的总结,欢迎大家批评指教。

相关 [redis 事件循环 定时器] 推荐:

Redis的事件循环与定时器模型

- ndv - basic coder
当然,这里要写的不是关于Redis的进程模型,而是Redis的事件模型和定时器模型. 在main()函数的最后调用了aeMain()这个函数进入Redis的事件循环,这个函数的很简单,循环调用aeProcessEvents()来对事件进行处理:. 在此之前Redis做了很多初始化的工作,这些工作大多是在initServer()这个函数中执行的,初始化一些相关的list,dict等,调用aeCreateEventLoop()初始化eventloop,这个函数初始化eventloop相关的数据结构,并最终调用了epoll_create()函数,对epoll上下文进行初始化.

JavaScript单线程和浏览器事件循环简述

- - 破狼 Blog
JavaScript单线程. 在上篇博客 《Promise的前世今生和妙用技巧》的开篇中,我们曾简述了JavaScript的单线程机制和浏览器的事件模型. 应很多网友的回复,在这篇文章中将继续展开这一个话题. 当然这里是博主的一些理解,如果还存在什么纰漏的话,请不吝指教. JavaScript这门语言运行在浏览器中,是以单线程的方式运行的.

Redis 负载监控——redis-monitor

- - ITeye资讯频道
redis-monitor是一个Web可视化的 redis 监控程序. 使用 Flask 来开发的,代码结构非常简单,适合移植到公司内网使用. redis 服务器信息,包括 redis 版本、上线时间、 os 系统信息等等. 实时的消息处理信息,例如处理 command 数量、连接总数量等. 内存占用、 cpu 消耗实时动态图表.

Redis 起步

- - 博客园_首页
Rdis和JQuery一样是纯粹为应用而产生的,这里记录的是在CentOS 5.7上学习入门文章:. Redis是一个key-value存储系统. 和Memcached类似,但是解决了断电后数据完全丢失的情况,而且她支持更多无化的value类型,除了和string外,还支持lists(链表)、sets(集合)和zsets(有序集合)几种数据类型.

redis 配置

- - 谁主沉浮
# 当配置中需要配置内存大小时,可以使用 1k, 5GB, 4M 等类似的格式,其转换方式如下(不区分大小写). # 内存配置大小写是一样的.比如 1gb 1Gb 1GB 1gB. # daemonize no 默认情况下,redis不是在后台运行的,如果需要在后台运行,把该项的值更改为yes. # 当redis在后台运行的时候,Redis默认会把pid文件放在/var/run/redis.pid,你可以配置到其他地址.

Cassandra代替Redis?

- - Tim[后端技术]
最近用Cassandra的又逐渐多了,除了之前的360案例,在月初的QCon Shanghai 2013 篱笆网也介绍了其使用案例. 而这篇 百万用户时尚分享网站feed系统扩展实践文章则提到了Fashiolista和Instagram从Redis迁移到Cassandra的案例. 考虑到到目前仍然有不少网友在讨论Redis的用法问题,Redis是一个数据库、内存、还是Key value store?以及Redis和memcache在实际场景的抉择问题,因此简单谈下相关区别.

redis 部署

- - CSDN博客云计算推荐文章
一、单机部署 tar xvf redis-2.6.16.tar.gz cd redis-2.6.16 make make PREFIX=/usr/local/redis install  #指定安装目录为/usr/local/redis,默认安装安装到/usr/local/bin. # chkconfig: 2345 80 10       #添加redhat系列操作系统平台,开机启动需求项(运行级别,开机时服务启动顺序、关机时服务关闭顺序) # description:  Starts, stops redis server.

nagios 监控redis

- - C1G军火库
下载check_redis.pl. OK: REDIS 2.6.12 on 192.168.0.130:6379 has 1 databases (db0) with 49801 keys, up 3 days 14 hours - connected_clients is 1, blocked_clients is 0 | connected_clients=1 blocked_clients=0.

转 redis vs memcached

- - 数据库 - ITeye博客
传统MySQL+ Memcached架构遇到的问题.   实际MySQL是适合进行海量数据存储的,通过Memcached将热点数据加载到cache,加速访问,很多公司都曾经使用过这样的架构,但随着业务数据量的不断增加,和访问量的持续增长,我们遇到了很多问题:.   1.MySQL需要不断进行拆库拆表,Memcached也需不断跟着扩容,扩容和维护工作占据大量开发时间.

Redis优化

- - 数据库 - ITeye博客
键名:尽量精简,但是也不能单纯为了节约空间而使用不易理解的键名. 键值:对于键值的数量固定的话可以使用0和1这样的数字来表示,(例如:male/female、right/wrong). 当业务场景不需要数据持久化时,关闭所有的持久化方式可以获得最佳的性能,不过一般都要持久化比较安全,而且是快照和aof同时使用比较安全.