在 OpenResty 里实现进程间通讯

在 Nginx 里面,每个 worker 进程都是平等的。但是有些时候,我们需要给它们分配不同的角色,这时候就需要实现进程间通讯的功能。

轮询

一种简单粗暴但却被普遍使用的方案,就是每个进程划分属于自己的 list 类型的 shdict key,每隔一段时间查看是否有新消息。这种方式优点在于实现简单,缺点在于难以保证实时性。当然对于绝大多数需要进程间通讯的场景,每 0.1 起一个 timer 来处理新增消息已经足够了。毕竟 0.1 秒的延迟不算长,每秒起 10 个 timer 开销也不大,应付一般的通信量绰绰有余。

redis外援

要是你觉得轮询很搓,或者在你的环境下,轮询确实很搓,也可以考虑下引入外部依赖来改善实时性。比如在本地起一个 redis,监听 unix socket,然后每个进程通过 Pub/Sub 或者 stream 类型发布/获取最新的消息。这种方案实现起来也简单,实时性和性能也足够好,只是需要引入个 redis 服务。

ngx_lua_ipc

如果你是个极简主义者,对引入外部依赖深恶痛绝,希望什么东西都能在 Nginx 里面实现的话,ngx_lua_ipc 是一个被广泛使用的选择。

ngx_lua_ipc 是一个第三方 Nginx C 模块,提供了一些 Lua API,可供在 OpenResty 代码里完成进程间通讯(IPC)的操作。

它会在 Nginx 的 init 阶段创建 worker process + helper process 对 pipe fd。每对 fd 有一个作为 read fd,负责接收数据,另一个作为 write fd,用于发送数据。当 Nginx 创建 worker 进程时,每个 worker 进程都会继承这些 pipe fd,于是就能通过它们来实现进程间通讯。感兴趣的读者可以 man 7 pipe 一下,了解基于 pipe 的进程间通讯是怎么实现的。

当然 ngx_lua_ipc 还需要把 pipe 的 read fd 通过 ngx_connection_t 接入到 Nginx 的事件循环机制中,具体实现位于 ipc_channel_setup_conn

c = ngx_get_connection(chan->pipe[conn_type == IPC_CONN_READ ? 0 : 1], cycle->log);
  c->data = data;

  if(conn_type == IPC_CONN_READ) {
    c->read->handler = event_handler;
    c->read->log = cycle->log;
    c->write->handler = NULL;
    ngx_add_event(c->read, NGX_READ_EVENT, 0);
    chan->read_conn=c;
  }
  else if(conn_type == IPC_CONN_WRITE) {
    c->read->handler = NULL;
    c->write->log = cycle->log;
    c->write->handler = ipc_write_handler;
    chan->write_conn=c;
  }
  else {
    return NGX_ERROR;
  }
  return NGX_OK;

write fd 是由 Lua 代码操作的,所以不需要加入到 Nginx 的事件循环机制中。

有一点有趣的细节,pipe fd 只有在写入数据小于 PIPE_BUF 时才会保证写操作的原子性。如果一条消息超过 PIPE_BUF(在 Linux 上大于 4K),那么它的写入就不是原子的,可能写入前面 PIPE_BUF 之后,有另一个 worker 也正巧给同一个进程写入消息。

为了避免不同 worker 进程的消息串在一起,ngx_lua_ipc 定义了一个 packet 概念。每个 packet 都不会大于 PIPE_BUF,同时有一个 header 来保证单个消息分割成多个 packet 之后能够被重新打包回来。

在接收端,为了能在收到消息之后执行对应的 Lua handler,ngx_lua_ipc 使用了 ngx.timer.at 来执行一个函数,这个函数会根据消息类型分发到对应的 handler 上。这样有个问题,就是消息是否能完成投递,取决于 ngx.timer.at 能否被执行。而 ngx.timer.at 是否被执行受限于两个因素:

  1. 如果 lua_max_pending_timer 不够大,ngx.timer.at 可能无法创建 timer
  2. 如果 lua_max_running_timer 不够大,或者没有足够的资源运行 timer,ngx.timer.at 创建的 timer 可能无法运行。

事实上,如果 timer 无法运行(消息无法投递),现阶段的 OpenResty 可能不会记录错误日志。我之前提过一个记录错误日志的 PR:https://github.com/openresty/...,不过一直没有合并。

所以严格意义上, ngx_lua_ipc 并不能保证消息能够被投递,也不能在消息投递失败时报错。不过这个锅得让 ngx.timer.at 来背。

ngx_lua_ipc 能不能不用 ngx.timer.at 那一套呢?这个就需要从 lua-nginx-module 里复制一大段代码,并偶尔同步一下。复制粘贴乃 Nginx C 模块开发的奥义。

动态监听 unix socket

上面的方法中,除了 Redis 外援法,如果不在应用代码里加日志,要想在外部查看消息投递的过程,只能依靠 gdb/systemtap/bcc 这些大招。如果走网络连接,就能使用平民技术,如 tcpdump,来追踪消息的流动。当然如果是 unix socket,还需要临时搞个 TCP proxy 整一下,不过操作难度较前面的大招们已经大大降低了。

那有没有办法让 IPC 走网络,但又不需要借助外部依赖呢?

回想起 Redis 外援法,之所以我们不能直接走 Nginx 的网络请求,是因为 Nginx 里面每个 worker 进程是平等的,你不知道你的请求会落到哪个进程上,而请求 Redis 就没这个问题。那我们能不能让不同的 worker 进程动态监听不同的 unix socket?

答案是肯定的。我们可以实现类似于这样的接口:

ln = ngx.socket.listen(...)
sock = ln.accept()
sock:read(...)

曾经有人提过类似的 PR:https://github.com/openresty/...,我自己也在公司项目里实现过差不多的东西。声明下,不要用这个方法做 IPC。上面的实现有个致命的问题,就是 ln 和后面创建的所有的 sock,都是在同一个 Nginx 请求里面的。

我曾经写过,在一个 Nginx 请求里做太多的事情,会有资源分配上的问题:https://segmentfault.com/a/11...
后面随着 IPC 的次数的增加,这种问题会越发明显。

要想解决这个问题,我们可以把每个 sock 放到独立的 fake request 里面跑,就像这样:

ln = ngx.socket.listen(...)
-- 类似于 ngx.timer.at 的处理风格
ln.register_handler(function(sock)
    sock:read(...)
end)

但是还有个问题。如果用 worker id 作为被监听的 unix socket 的 ID, 由于这个 unix socket 是在 worker 进程里动态监听的,而在 Nginx reload 或 binary upgrade 的情况下,多个 worker 进程会有同样的 worker id,尝试监听同样的 unix socket,导致地址被占用的错误。解决方法就是改用 PID 作为被监听的 unix socket 的 ID,然后在首次发送时初始化 PID 到 worker id 的映射。如果有支持在 reload 时正常发送消息的需求,还要记录新旧两组 worker,比如:

1111 => old worker ID 1
1123 => new worker ID 2

每个 worker 分配不同的 unix socket

还有一种更为巧妙的,借助不同 worker 不同 unix socket 来实现进程间通讯的方法。这种方法是如此地巧妙,我只恨不是我想出来的。该方法可以淘汰掉上面动态监听 unix socket 的方案。

我们可以在 Nginx 配置文件里面声明,listen unix:xxx.sock use_as_ipc_blah_blah。然后修改 Nginx,让它在看到 use_as_ipc_blah_blah 差不多这样的一个标记时,让特定的进程监听特定的 unix sock,比如 xxx_1.sockxxx_2.sock 等。

它跟动态监听 unix socket 方法比起来,实现更为简单,所以也更为可靠。当然要想保证在 reload 或者 binary upgrade 时投递消息到正确的 worker,记得用 PID 而不是 worker id 来作为区分后缀,并维护好两者间的映射。

这个方法是由蚂蚁金服的同行提出来的,估计最近会开源出来。

相关推荐