在 OpenResty 里实现进程间通讯

标签: openresty | 发表时间:2019-08-03 23:24 | 作者:spacewander
出处:https://segmentfault.com/blogs

在 Nginx 里面,每个 worker 进程都是平等的。但是有些时候,我们需要给它们分配不同的角色,这时候就需要实现进程间通讯的功能。

轮询

一种简单粗暴但却被普遍使用的方案,就是每个进程划分属于自己的 list 类型的 shdict key,每隔一段时间查看是否有新消息。这种方式优点在于实现简单,缺点在于难以保证实时性。当然对于绝大多数需要进程间通讯的场景,每 0.1 起一个 timer 来处理新增消息已经足够了。毕竟 0.1 秒的延迟不算长,每秒起 10 个 timer 开销也不大,应付一般的通信量绰绰有余。

redis外援

要是你觉得轮询很搓,或者在你的环境下,轮询确实很搓,也可以考虑下引入外部依赖来改善实时性。比如在本地起一个 redis,监听 unix socket,然后每个进程通过 Pub/Sub 或者 stream 类型发布/获取最新的消息。这种方案实现起来也简单,实时性和性能也足够好,只是需要引入个 redis 服务。

ngx_lua_ipc

如果你是个极简主义者,对引入外部依赖深恶痛绝,希望什么东西都能在 Nginx 里面实现的话, ngx_lua_ipc 是一个被广泛使用的选择。

ngx_lua_ipc 是一个第三方 Nginx C 模块,提供了一些 Lua API,可供在 OpenResty 代码里完成进程间通讯(IPC)的操作。

它会在 Nginx 的 init 阶段创建 worker process + helper process 对 pipe fd。每对 fd 有一个作为 read fd,负责接收数据,另一个作为 write fd,用于发送数据。当 Nginx 创建 worker 进程时,每个 worker 进程都会继承这些 pipe fd,于是就能通过它们来实现进程间通讯。感兴趣的读者可以 man 7 pipe 一下,了解基于 pipe 的进程间通讯是怎么实现的。

当然 ngx_lua_ipc 还需要把 pipe 的 read fd 通过 ngx_connection_t 接入到 Nginx 的事件循环机制中,具体实现位于 ipc_channel_setup_conn

    c = ngx_get_connection(chan->pipe[conn_type == IPC_CONN_READ ? 0 : 1], cycle->log);
  c->data = data;

  if(conn_type == IPC_CONN_READ) {
    c->read->handler = event_handler;
    c->read->log = cycle->log;
    c->write->handler = NULL;
    ngx_add_event(c->read, NGX_READ_EVENT, 0);
    chan->read_conn=c;
  }
  else if(conn_type == IPC_CONN_WRITE) {
    c->read->handler = NULL;
    c->write->log = cycle->log;
    c->write->handler = ipc_write_handler;
    chan->write_conn=c;
  }
  else {
    return NGX_ERROR;
  }
  return NGX_OK;

write fd 是由 Lua 代码操作的,所以不需要加入到 Nginx 的事件循环机制中。

有一点有趣的细节,pipe fd 只有在写入数据小于 PIPE_BUF 时才会保证写操作的原子性。如果一条消息超过 PIPE_BUF(在 Linux 上大于 4K),那么它的写入就不是原子的,可能写入前面 PIPE_BUF 之后,有另一个 worker 也正巧给同一个进程写入消息。

为了避免不同 worker 进程的消息串在一起, ngx_lua_ipc 定义了一个 packet 概念。每个 packet 都不会大于 PIPE_BUF,同时有一个 header 来保证单个消息分割成多个 packet 之后能够被重新打包回来。

在接收端,为了能在收到消息之后执行对应的 Lua handler, ngx_lua_ipc 使用了 ngx.timer.at 来执行一个函数,这个函数会根据消息类型分发到对应的 handler 上。这样有个问题,就是消息是否能完成投递,取决于 ngx.timer.at 能否被执行。而 ngx.timer.at 是否被执行受限于两个因素:

  1. 如果 lua_max_pending_timer 不够大, ngx.timer.at 可能无法创建 timer
  2. 如果 lua_max_running_timer 不够大,或者没有足够的资源运行 timer, ngx.timer.at 创建的 timer 可能无法运行。

事实上,如果 timer 无法运行(消息无法投递),现阶段的 OpenResty 可能不会记录错误日志。我之前提过一个记录错误日志的 PR: https://github.com/openresty/...,不过一直没有合并。

所以严格意义上, ngx_lua_ipc 并不能保证消息能够被投递,也不能在消息投递失败时报错。不过这个锅得让 ngx.timer.at 来背。

ngx_lua_ipc 能不能不用 ngx.timer.at 那一套呢?这个就需要从 lua-nginx-module 里复制一大段代码,并偶尔同步一下。复制粘贴乃 Nginx C 模块开发的奥义。

动态监听 unix socket

上面的方法中,除了 Redis 外援法,如果不在应用代码里加日志,要想在外部查看消息投递的过程,只能依靠 gdb/systemtap/bcc 这些大招。如果走网络连接,就能使用平民技术,如 tcpdump,来追踪消息的流动。当然如果是 unix socket,还需要临时搞个 TCP proxy 整一下,不过操作难度较前面的大招们已经大大降低了。

那有没有办法让 IPC 走网络,但又不需要借助外部依赖呢?

回想起 Redis 外援法,之所以我们不能直接走 Nginx 的网络请求,是因为 Nginx 里面每个 worker 进程是平等的,你不知道你的请求会落到哪个进程上,而请求 Redis 就没这个问题。那我们能不能让不同的 worker 进程动态监听不同的 unix socket?

答案是肯定的。我们可以实现类似于这样的接口:

  ln = ngx.socket.listen(...)
sock = ln.accept()
sock:read(...)

曾经有人提过类似的 PR: https://github.com/openresty/...,我自己也在公司项目里实现过差不多的东西。声明下,不要用这个方法做 IPC。上面的实现有个致命的问题,就是 ln 和后面创建的所有的 sock,都是在同一个 Nginx 请求里面的。

我曾经写过,在一个 Nginx 请求里做太多的事情,会有资源分配上的问题: https://segmentfault.com/a/11...
后面随着 IPC 的次数的增加,这种问题会越发明显。

要想解决这个问题,我们可以把每个 sock 放到独立的 fake request 里面跑,就像这样:

  ln = ngx.socket.listen(...)
-- 类似于 ngx.timer.at 的处理风格
ln.register_handler(function(sock)
    sock:read(...)
end)

但是还有个问题。如果用 worker id 作为被监听的 unix socket 的 ID, 由于这个 unix socket 是在 worker 进程里动态监听的,而在 Nginx reload 或 binary upgrade 的情况下,多个 worker 进程会有同样的 worker id,尝试监听同样的 unix socket,导致地址被占用的错误。解决方法就是改用 PID 作为被监听的 unix socket 的 ID,然后在首次发送时初始化 PID 到 worker id 的映射。如果有支持在 reload 时正常发送消息的需求,还要记录新旧两组 worker,比如:

  1111 => old worker ID 1
1123 => new worker ID 2

每个 worker 分配不同的 unix socket

还有一种更为巧妙的,借助不同 worker 不同 unix socket 来实现进程间通讯的方法。这种方法是如此地巧妙,我只恨不是我想出来的。该方法可以淘汰掉上面动态监听 unix socket 的方案。

我们可以在 Nginx 配置文件里面声明, listen unix:xxx.sock use_as_ipc_blah_blah。然后修改 Nginx,让它在看到 use_as_ipc_blah_blah 差不多这样的一个标记时,让特定的进程监听特定的 unix sock,比如 xxx_1.sockxxx_2.sock 等。

它跟动态监听 unix socket 方法比起来,实现更为简单,所以也更为可靠。当然要想保证在 reload 或者 binary upgrade 时投递消息到正确的 worker,记得用 PID 而不是 worker id 来作为区分后缀,并维护好两者间的映射。

这个方法是由 datavisor 的同行提出来的,估计最近会开源出来。

相关 [openresty 进程 通讯] 推荐:

在 OpenResty 里实现进程间通讯

- - SegmentFault 最新的文章
在 Nginx 里面,每个 worker 进程都是平等的. 但是有些时候,我们需要给它们分配不同的角色,这时候就需要实现进程间通讯的功能. 一种简单粗暴但却被普遍使用的方案,就是每个进程划分属于自己的 list 类型的 shdict key,每隔一段时间查看是否有新消息. 这种方式优点在于实现简单,缺点在于难以保证实时性.

[转]推荐OpenResty - Nginx全能插件版

- - 天空极速
官网: http://openresty.org/. 虽然是中国人做的,但没几个汉字……. 我用Nginx,是这样一个过程:. 1、系统rpm中的nginx,能让其跑起来. OpenResty,是淘宝一位大牛(agentzh)集成的包含N多好插件的Nginx捆绑源码包,这位仁兄自称Nginx最活跃的第三方模块开发人员哦.

电影通讯2011

- zun - 卫西谛,照常生活
【选了上半年看的一些新片做小小推荐,在微博上陆续写过(http://t.sina.com.cn/weixidi)】. 阿尔及利亚堤比邻修道院的七位法国修士,96年遭到恐怖分子绑架继而被杀害. 牺牲与殉道让人恍若看见最后的晚餐. 坚定者在死亡阴影里免除疑惧,重新回归日常生活,无论未来如何. 诵经歌声圣洁:除了爱,什么都不存在.

网络通讯协议图

- 李斌 - C++博客-首页原创精华区
阿π 2010-11-04 14:13 发表评论.

Android_ContentProvider_访问通讯录

- - CSDN博客推荐文章
本博文为子墨原创,转载请注明出处. 联系人提供者是一个很强很灵活的应用组件,用来管理联系人信息,可以方便的操作和查询联系人信息. 主要的3张表格,contact,raw contact,data,但是我们操作主要为raw contact,data两张表. /** * 使用批处理,插入联系人信息 * 插入姓名,email,家庭电话,工作电话,手机号码信息 * @param view */ public void insert() {.

JAVA 远程通讯机制

- - 移动开发 - ITeye博客
远程服务通讯,需要达到的目标是在一台计算机发起请求,另外一台机器在接收到请求后进行相应的处理并将结果返回给请求端,这其中又会有诸如one way request、同步请求、异步请求等等请求方式,按照网络通信原理,需要实现这个需要做的就是将请求转换成流,通过传输协议传输至远端,远端计算机在接收到请求的流后进行处理,处理完毕后将结果转化为流,并通过传输协议返回给调用端.

TCP协议通讯流程

- - 操作系统 - ITeye博客
服务器调用socket()、bind()、listen()完成初始化后,调用accept()阻塞等待,处于监听端口的状态,客户端调用socket()初始化后,调用connect()发出SYN段并阻塞等待服务器应答,服务器应答一个SYN-ACK段,客户端收到后从connect()返回,同时应答一个ACK段,服务器收到后从accept()返回.

JAVA RPC 通讯框架

- - 经验沉淀 知识结晶
Bison 是一个JAVA 程间的通信框架,基于apache mina 实现,对mina进行了byteBuffer 缓冲区重用以及半包出处时减少拷贝. 客户端(bison-client) 功能点. 2 支持高用性:高可用的一个基本原则,可以接受快速的失败,但不能接受长时间的等待. Githup地址:https://github.com/gavenpeng/Bison.

监控进程

- - 火丁笔记
有时候,进程突然终止服务,可能是没有资源了,也可能是意外,比如说:因为 OOM 被杀;或者由于 BUG 导致崩溃;亦或者误操作等等,此时,我们需要重新启动进程. 实际上,Linux 本身的初始化系统能实现简单的功能,无论是老牌的 SysVinit,还是新潮的  Upstart 或者  Systemd 均可,但它们并不适合处理一些复杂的情况,比如说:CPU 占用超过多少就重启;或者同时管理 100 个 PHP 实现的 Worker 进程等等,如果你有类似的需求,那么可以考虑试试 Monit 和 Supervisor,相信会有不一样的感受.