在 OpenResty 里实现进程间通讯

标签: openresty | 发表时间:2019-08-03 23:24 | 作者:spacewander
出处:https://segmentfault.com/blogs

在 Nginx 里面,每个 worker 进程都是平等的。但是有些时候,我们需要给它们分配不同的角色,这时候就需要实现进程间通讯的功能。

轮询

一种简单粗暴但却被普遍使用的方案,就是每个进程划分属于自己的 list 类型的 shdict key,每隔一段时间查看是否有新消息。这种方式优点在于实现简单,缺点在于难以保证实时性。当然对于绝大多数需要进程间通讯的场景,每 0.1 起一个 timer 来处理新增消息已经足够了。毕竟 0.1 秒的延迟不算长,每秒起 10 个 timer 开销也不大,应付一般的通信量绰绰有余。

redis外援

要是你觉得轮询很搓,或者在你的环境下,轮询确实很搓,也可以考虑下引入外部依赖来改善实时性。比如在本地起一个 redis,监听 unix socket,然后每个进程通过 Pub/Sub 或者 stream 类型发布/获取最新的消息。这种方案实现起来也简单,实时性和性能也足够好,只是需要引入个 redis 服务。

ngx_lua_ipc

如果你是个极简主义者,对引入外部依赖深恶痛绝,希望什么东西都能在 Nginx 里面实现的话, ngx_lua_ipc 是一个被广泛使用的选择。

ngx_lua_ipc 是一个第三方 Nginx C 模块,提供了一些 Lua API,可供在 OpenResty 代码里完成进程间通讯(IPC)的操作。

它会在 Nginx 的 init 阶段创建 worker process + helper process 对 pipe fd。每对 fd 有一个作为 read fd,负责接收数据,另一个作为 write fd,用于发送数据。当 Nginx 创建 worker 进程时,每个 worker 进程都会继承这些 pipe fd,于是就能通过它们来实现进程间通讯。感兴趣的读者可以 man 7 pipe 一下,了解基于 pipe 的进程间通讯是怎么实现的。

当然 ngx_lua_ipc 还需要把 pipe 的 read fd 通过 ngx_connection_t 接入到 Nginx 的事件循环机制中,具体实现位于 ipc_channel_setup_conn

    c = ngx_get_connection(chan->pipe[conn_type == IPC_CONN_READ ? 0 : 1], cycle->log);
  c->data = data;

  if(conn_type == IPC_CONN_READ) {
    c->read->handler = event_handler;
    c->read->log = cycle->log;
    c->write->handler = NULL;
    ngx_add_event(c->read, NGX_READ_EVENT, 0);
    chan->read_conn=c;
  }
  else if(conn_type == IPC_CONN_WRITE) {
    c->read->handler = NULL;
    c->write->log = cycle->log;
    c->write->handler = ipc_write_handler;
    chan->write_conn=c;
  }
  else {
    return NGX_ERROR;
  }
  return NGX_OK;

write fd 是由 Lua 代码操作的,所以不需要加入到 Nginx 的事件循环机制中。

有一点有趣的细节,pipe fd 只有在写入数据小于 PIPE_BUF 时才会保证写操作的原子性。如果一条消息超过 PIPE_BUF(在 Linux 上大于 4K),那么它的写入就不是原子的,可能写入前面 PIPE_BUF 之后,有另一个 worker 也正巧给同一个进程写入消息。

为了避免不同 worker 进程的消息串在一起, ngx_lua_ipc 定义了一个 packet 概念。每个 packet 都不会大于 PIPE_BUF,同时有一个 header 来保证单个消息分割成多个 packet 之后能够被重新打包回来。

在接收端,为了能在收到消息之后执行对应的 Lua handler, ngx_lua_ipc 使用了 ngx.timer.at 来执行一个函数,这个函数会根据消息类型分发到对应的 handler 上。这样有个问题,就是消息是否能完成投递,取决于 ngx.timer.at 能否被执行。而 ngx.timer.at 是否被执行受限于两个因素:

  1. 如果 lua_max_pending_timer 不够大, ngx.timer.at 可能无法创建 timer
  2. 如果 lua_max_running_timer 不够大,或者没有足够的资源运行 timer, ngx.timer.at 创建的 timer 可能无法运行。

事实上,如果 timer 无法运行(消息无法投递),现阶段的 OpenResty 可能不会记录错误日志。我之前提过一个记录错误日志的 PR: https://github.com/openresty/...,不过一直没有合并。

所以严格意义上, ngx_lua_ipc 并不能保证消息能够被投递,也不能在消息投递失败时报错。不过这个锅得让 ngx.timer.at 来背。

ngx_lua_ipc 能不能不用 ngx.timer.at 那一套呢?这个就需要从 lua-nginx-module 里复制一大段代码,并偶尔同步一下。复制粘贴乃 Nginx C 模块开发的奥义。

动态监听 unix socket

上面的方法中,除了 Redis 外援法,如果不在应用代码里加日志,要想在外部查看消息投递的过程,只能依靠 gdb/systemtap/bcc 这些大招。如果走网络连接,就能使用平民技术,如 tcpdump,来追踪消息的流动。当然如果是 unix socket,还需要临时搞个 TCP proxy 整一下,不过操作难度较前面的大招们已经大大降低了。

那有没有办法让 IPC 走网络,但又不需要借助外部依赖呢?

回想起 Redis 外援法,之所以我们不能直接走 Nginx 的网络请求,是因为 Nginx 里面每个 worker 进程是平等的,你不知道你的请求会落到哪个进程上,而请求 Redis 就没这个问题。那我们能不能让不同的 worker 进程动态监听不同的 unix socket?

答案是肯定的。我们可以实现类似于这样的接口:

  ln = ngx.socket.listen(...)
sock = ln.accept()
sock:read(...)

曾经有人提过类似的 PR: https://github.com/openresty/...,我自己也在公司项目里实现过差不多的东西。声明下,不要用这个方法做 IPC。上面的实现有个致命的问题,就是 ln 和后面创建的所有的 sock,都是在同一个 Nginx 请求里面的。

我曾经写过,在一个 Nginx 请求里做太多的事情,会有资源分配上的问题: https://segmentfault.com/a/11...
后面随着 IPC 的次数的增加,这种问题会越发明显。

要想解决这个问题,我们可以把每个 sock 放到独立的 fake request 里面跑,就像这样:

  ln = ngx.socket.listen(...)
-- 类似于 ngx.timer.at 的处理风格
ln.register_handler(function(sock)
    sock:read(...)
end)

但是还有个问题。如果用 worker id 作为被监听的 unix socket 的 ID, 由于这个 unix socket 是在 worker 进程里动态监听的,而在 Nginx reload 或 binary upgrade 的情况下,多个 worker 进程会有同样的 worker id,尝试监听同样的 unix socket,导致地址被占用的错误。解决方法就是改用 PID 作为被监听的 unix socket 的 ID,然后在首次发送时初始化 PID 到 worker id 的映射。如果有支持在 reload 时正常发送消息的需求,还要记录新旧两组 worker,比如:

  1111 => old worker ID 1
1123 => new worker ID 2

每个 worker 分配不同的 unix socket

还有一种更为巧妙的,借助不同 worker 不同 unix socket 来实现进程间通讯的方法。这种方法是如此地巧妙,我只恨不是我想出来的。该方法可以淘汰掉上面动态监听 unix socket 的方案。

我们可以在 Nginx 配置文件里面声明, listen unix:xxx.sock use_as_ipc_blah_blah。然后修改 Nginx,让它在看到 use_as_ipc_blah_blah 差不多这样的一个标记时,让特定的进程监听特定的 unix sock,比如 xxx_1.sockxxx_2.sock 等。

它跟动态监听 unix socket 方法比起来,实现更为简单,所以也更为可靠。当然要想保证在 reload 或者 binary upgrade 时投递消息到正确的 worker,记得用 PID 而不是 worker id 来作为区分后缀,并维护好两者间的映射。

这个方法是由 datavisor 的同行提出来的,估计最近会开源出来。

相关 [openresty 进程 通讯] 推荐:

在 OpenResty 里实现进程间通讯

- - SegmentFault 最新的文章
在 Nginx 里面,每个 worker 进程都是平等的. 但是有些时候,我们需要给它们分配不同的角色,这时候就需要实现进程间通讯的功能. 一种简单粗暴但却被普遍使用的方案,就是每个进程划分属于自己的 list 类型的 shdict key,每隔一段时间查看是否有新消息. 这种方式优点在于实现简单,缺点在于难以保证实时性.

[转]推荐OpenResty - Nginx全能插件版

- - 天空极速
官网: http://openresty.org/. 虽然是中国人做的,但没几个汉字……. 我用Nginx,是这样一个过程:. 1、系统rpm中的nginx,能让其跑起来. OpenResty,是淘宝一位大牛(agentzh)集成的包含N多好插件的Nginx捆绑源码包,这位仁兄自称Nginx最活跃的第三方模块开发人员哦.

在nginx中安装并调试OpenResty

- - holmofy
OpenResty是基于Lua即时编译器(LuaJIT)对Nginx进行扩展的模块——最核心的就是. lua-nginx-module这个模块. 其他的都是 OpenResty基于lua开发的相关模块,当然也可以基于lua开发自己的第三方模块. 所以要想使用OpenResty首先必须安装 lua-nginx-module.

openresty+lua实现WAF应用防火墙

- - C1G军火库
pcre没找到,编辑时加上–with-pcre=../pcre-8.30 \. 4.下载ngx_cache_purge清缓组件. 伪装openresty为xcdn. 4.下载和配置 ngx_lua_waf. nginx下常见的开源 waf 有 mod_security、naxsi、ngx_lua_waf 这三个,ngx_lua_waf 性能高和易用性强,基本上零配置,而且常见的攻击类型都能防御,是比较省心的选择.

Openresty+Lua+Redis灰度发布 - K‘e0llm - 博客园

- -
灰度发布,简单来说,就是根据各种条件,让一部分用户使用旧版本,另一部分用户使用新版本. 百度百科中解释:灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式. AB test就是一种灰度发布方式,让一部分用户继续用A,一部分用户开始用B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面 来.

Openresty流量复制/AB测试/协程_jinnianshilongnian的专栏-CSDN博客

- -
在实际开发中经常涉及到项目的升级,而该升级不能简单的上线就完事了,需要验证该升级是否兼容老的上线,因此可能需要并行运行两个项目一段时间进行数据比对和校验,待没问题后再进行上线. 这其实就需要进行流量复制,把流量复制到其他服务器上,一种方式是使用如. tcpcopy引流;另外我们还可以使用nginx的HttpLuaModule模块中的ngx.location.capture_multi进行并发执行来模拟复制.

电影通讯2011

- zun - 卫西谛,照常生活
【选了上半年看的一些新片做小小推荐,在微博上陆续写过(http://t.sina.com.cn/weixidi)】. 阿尔及利亚堤比邻修道院的七位法国修士,96年遭到恐怖分子绑架继而被杀害. 牺牲与殉道让人恍若看见最后的晚餐. 坚定者在死亡阴影里免除疑惧,重新回归日常生活,无论未来如何. 诵经歌声圣洁:除了爱,什么都不存在.

网络通讯协议图

- 李斌 - C++博客-首页原创精华区
阿π 2010-11-04 14:13 发表评论.

Android_ContentProvider_访问通讯录

- - CSDN博客推荐文章
本博文为子墨原创,转载请注明出处. 联系人提供者是一个很强很灵活的应用组件,用来管理联系人信息,可以方便的操作和查询联系人信息. 主要的3张表格,contact,raw contact,data,但是我们操作主要为raw contact,data两张表. /** * 使用批处理,插入联系人信息 * 插入姓名,email,家庭电话,工作电话,手机号码信息 * @param view */ public void insert() {.