memcache 网络驱动模型 (原创+转载)

标签: memcache 网络 模型 | 发表时间:2015-08-06 22:12 | 作者:on__the__way
出处:http://www.iteye.com

memcache使用多线程模型程序,使用libevent事件模型,与nginx类似,由一个主线程和多个worker线程组成。

1,主线程初始化。

    memcache的主线程主要进行在监听套接字上注册事件监听网络中客户端的连接及处理和worker的初始化的工作。
    首先初始化主线程的libevent实例,然后初始化所有worker线程,并为每个worker建立通知的管道用于主线程与worker之间的通信,然后每个worker线程都会注册管道可读的libevent事件,再为每个worker建立自己的CQ队列用于存放主线程分配的客户端连接,最后启动所有worker线程,并在监听套接字上箭筒客户端连接的事件,启动主线程的循环监听。同时每个worker启动后也会进入循环监听。

    线程的初始化

void thread_init(int nthreads, struct event_base *main_base) {
 //。。。省略
   threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);// 申请内容空间
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    threads[0].base = main_base;
    threads[0].thread_id = pthread_self();
    
    // 为每个线程创建管道
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

    setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 1; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
}

    setup_thread主要做了三个事情,创建所有workers线程的libevent实例,注册所有workers线程的管道读端的libevent的读事件,初始化CQ队列。
create_worker真正启动了线程,进入循环监听。

 

2,几个结构体

    CQ_ITEM 是主线程accept客户端连接后,新分配的客户端套接字的封装,除了fd,还有state,缓冲区大小,协议类型等。

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int     sfd;
    int     init_state;
    int     event_flags;
    int     read_buffer_size;
    int     is_udp;
    CQ_ITEM *next;
};

    CQ是由CQ_ITEM组成链表,称为CQ队列,每个worker维护自己的CQ队列,对应每个worker自己维护的客户端连接。

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

    每当主线程新分配一个客户端连接后都会通过round的方式选择一个worker线程,并将新的CQ_ITEM放入对应worker的CQ队列,同时为了通知worker及时的获悉这个队列,会在管道中写入'c',这样worker端注册的libevent可读事件触发,由于采取的水平触发,因此这里必须将'c'取出否则会一直触发。

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    CQ  new_conn_queue;         /* queue of new connections to handle */
} LIBEVENT_THREAD;

    这是每个线程的封装,可知每个线程包含线程id,libevent实例,监听事件,管道的接收和发送id,CQ队列。

    CONN是对每个网络连接的封装,上文的CQ_ITEM是主线程对连接的封装,而CONN是worker线程对连接的封装。

typedef struct{
  int sfd;
  int state;
  struct event event;
  short which;
  char *rbuf;
  ... //省去很多状态标志和读写buf信息等
}conn;

   这里state是很重要的属性,因为主线程和worker都进入循环监听,作不同的处理就是根据state这个状态位来作区分。

3,事件处理

    示意图如下图

     当主线程监听套接字事件触发时,会将新的CQ_ITEM写入到worker的CQ队列,同时向管道写入触发worker在管道注册的读事件,通知worker,然后worker将新的客户端连接进行libevent事件注册,指定由该worker线程处理。

      worker线程拿到了这个连接之后,就应该是分配给这个连接一个结构体,包括这个连接所有的状态,都写buf等,这个结构体就是conn,然后这个worker线程会在它自己的event_base加入对这个新的连接的事件的监听。上面也说过了worker的event_base有两套处理逻辑,一个对notify_ receive_fd的,还有一套是对新连接的。这个notify_ receive_fd的处理逻辑就是处理2个事件,一个是建立连接,一个是 改变锁的粒度

      a, 核心循环

        drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine

static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int res;

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:// 监听的fd的connection,初始状态就是这个,表示监听中,
                               有连接
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                //回调到了这个地方,肯定有连接,这个函数直接得到连接的sfd
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }
            // 这里把新的请求分配给相应的worker线程
            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, false);
            break;

        case conn_read:// 客户端连接套接字,由worker处理
            if (try_read_command(c) != 0) {
                continue;
            }
	    ....//省略
     }     
 }

   drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是 这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把 该conn结构作为参数传递过来,就是该方法的形参 。

    b, 状态机(转载)

1.listening:这个状态是主线程的connection默认状态,它只有这一个状态,它做的工作就是把接到连接分发到worker子线程。

2.conn_new_cmd:每个新连接的初始状态,这个状态会清空读写buf。

3.conn_waiting:这个状态就是在event_base中设置读事件,然后状态机暂停,挂起当前connection(函数退出,回调函数的attachment会记录这个connection),等待有新的信息过来,然后通过回调函数的attachment重新找到这个connection,然后启动状态机。

4.conn_read:该状态从sfd中读取客户端的指令信息。

5.conn_parse_cmd:判断具体的指令,如果是update的指令,那么需要跳转到conn_nread中,因为需要在从网络中读取固定byte的数据,如果是查询之类的指令,就直接查询完成后,跳转到conn_mwrite中,返回数据

6.conn_nread:从网络中读取指定大小的数据,这个数据就是更新到item的数据,然后将数据更新到hash和lru中去,然后跳转到conn_write

7.conn_write:这个状态主要是调用out_string函数会跳转到这个状态,一般都是提示信息和返回的状态信息,然后输出这些数据,然后根据write_to_go的状态,继续跳转

8.conn_mwrite:这个写是把connection中的msglist返回到客户端,这个msglist存的是item的数据,用于那种get等获得item的操作的返回数据。

9.conn_swallow:对于那种update操作,如果分配item失败,显然后面的nread,是无效的,客户端是不知道的,这样客户端继续发送特定的数量的数据,就需要把读到的这些数据忽略掉,然后如果把后面指定的数据都忽略掉了(set的两部提交,数据部分忽略掉),那么connection跳转到conn_new_cmd,如果读nread的那些特定数量的数据没有读到,直接跳转到conn_closing。

10.conn_closing:服务器端主动关闭连接,调用close函数关闭文件描述符,同时把conn结构体放到空闲队列中,供新的连接重用这写conn结构体。

总结:memcached的网络模块的事件模型依赖于libevent的实现,memcached把fd关心的事件注册给libevent并注册了回调函数,libevent负责回调memcached,主线程把连接dispatch到具体的worker线程,同时把这个连接的描述符注册给worker线程自己的一套libevent,这样worker就接收了这个连接,以后这个fd上的事件回调都是这个线程上的做的工作了,每个连接都有自己的一套状态机,如果接受到数据就通过状态机处理,状态扭转,如果暂时没有数据就把连接暂时挂起,等到有了数据继续执行状态机。

 

 

 

 

参考:

http://blog.chinaunix.net/uid-28345378-id-4239012.html

http://www.iteye.com/topic/344172



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [memcache 网络 模型] 推荐:

memcache 网络驱动模型 (原创+转载)

- - 数据库 - ITeye博客
memcache使用多线程模型程序,使用libevent事件模型,与nginx类似,由一个主线程和多个worker线程组成.     memcache的主线程主要进行在监听套接字上注册事件监听网络中客户端的连接及处理和worker的初始化的工作.     首先初始化主线程的libevent实例,然后初始化所有worker线程,并为每个worker建立通知的管道用于主线程与worker之间的通信,然后每个worker线程都会注册管道可读的libevent事件,再为每个worker建立自己的CQ队列用于存放主线程分配的客户端连接,最后启动所有worker线程,并在监听套接字上箭筒客户端连接的事件,启动主线程的循环监听.

Nosql Redis ttserver Flare memcache比较

- - 小彰
随着互联网web2.0网站的兴起,非关系型的数据库现在成了一个极其热门的新领域,非关系数据库产品的发展非常迅速. 而传统的关系数据库在应付 web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,暴露了很多难以克服的问题,例如:. 1、High performance - 对数据库高并发读写的需求.

Memcache架构新思考

- - ITeye博客
2011年初Marc Kwiatkowski通过Memecache@Facebook介绍了Facebook的Memcache架构,现在重新审视这个架构,仍有很多方面在业界保持先进性. 作为weibo内部数据处理量最大,对数据延迟最敏感的部门,基于本厂2年多来对mc的使用心得,我在本文总结对MC架构的一些新思考.

Memcache工作原理总结

- - Java - 编程语言 - ITeye博客
1.  分片原理. 咱们废话话不多说了,直接看Memcache的原理. 首先memcache解决的最大的一个问题就是内存多次读取的内存碎片问题. 内存碎片分为内存内部碎片和内存外部碎片. 一般是指在外部碎片中出现了不连续的细小内存片段,不能够被进程利用.

mctop: 监视 Memcache 流量

- - LinuxTOY
mctop 与 top 相似,主要用于监视 Memcache 的流量,包括 key 的调用次数、对象存储大小、每秒的请求数、以及消耗的网络带宽等. mctop 的源代码可从 GitHub 获取. 收藏到 del.icio.us |.

深入理解Memcache原理

- - CSDN博客研发管理推荐文章
1.为什么要使用memcache.  由于网站的高并发读写需求,传统的关系型数据库开始出现瓶颈,例如:. 1)对数据库的高并发读写:. 关系型数据库本身就是个庞然大物,处理过程非常耗时(如解析SQL语句,事务处理等). 如果对关系型数据库进行高并发读写(每秒上万次的访问),那么它是无法承受的. 对于大型的SNS网站,每天有上千万次的苏剧产生(如twitter, 新浪微博).

[Cacti] memcache安装运行、cacti监控memcache实战

- - CSDN博客系统运维推荐文章
Memcache是danga.com的一个项目,最早是为 LiveJournal 服务的,目前全世界不少人使用这个缓存项目来构建自己大负载的网站,来分担数据库的压力. Memcache官方网站:http://memcached.org/. 下载地址:  http://www.memcached.org/downloads,我们线上使用的比较稳定的版本是1.4.15,如果官网找不到以前的版本了,可以去我的csdn资源里面下载此版本,下载地址:.

Apache与Nginx网络模型

- - CSDN博客互联网推荐文章
      Nginx的高并发得益于其采用了epoll模型,与传统的服务器程序架构不同,epoll是linux内核2.6以后才出现的. 下面通过比较Apache和Nginx工作原理来比较.       传统Apache都是多进程或者多线程来工作,假设是多进程工作(prefork),apache会先生成几个进程,类似进程池的工作原理,只不过这里的进程池会随着请求数目的增加而增加.

深入研究memcache 特性和限制

- - 开源软件 - ITeye博客
转自 http://hi.baidu.com/jqxw4444/item/59c33ea3656ede3e020a4d1c. 在 Memcached中可以保存的item数据量是没有限制的,只要内存足够. Memcached单进程最大使用内存为2G,要使用更多内存,可以分多个端口开启多个Memcached进程,最大30天的数据过期时间,设置为永久的也会在这个时间过期.

[转]Memcache内存分配策略

- - 企业架构 - ITeye博客
一、Memcache内存分配机制.         关于这个机制网上有很多解释的,我个人的总结如下. Memcached的内存分配以page为单位,默认情况下一个page是1M,可以通过-I参数在启动时指定. 如果需要申请内存时,memcached会划分出一个新的page并分配给需要的slab区域.