inside gen_server call
动机
前段时间的两个工作.
一个是entity集群库, 可以通过entity_id调用任意节点上的entity.
一个是名字服务, 可以为一系列pid注册名字, 并可以以这些名字调用对应的pid.
都会遇到同一些问题: 当我们使用GenServer.call/2时, 发生了什么, 会有什么异常情况发生? 哪些异常应该捕获? 以什么样的方式处理这些异常/错误?
当call的pid所在的node崩溃时, 会有什么异常? 在调用开始前/中崩溃, 有什么不同.
当call的pid所在的node网络突然中断呢? 会有什么表现?
当call的pid崩溃时呢?
是否应该捕获timeout?
这些问题在文档中并没有答案. 所以, 探索一下.
深挖实现
源码版本
erlang: OTP-21.0.9
inside gen_server.erl call
gen_server.erl:203
%% ----------------------------------------------------------------- %% Make a call to a generic server. %% If the server is located at another node, that node will %% be monitored. %% If the client is trapping exits and is linked server termination %% is handled here (? Shall we do that here (or rely on timeouts) ?). %% ----------------------------------------------------------------- call(Name, Request) -> case catch gen:call(Name, '$gen_call', Request) of {ok,Res} -> Res; {'EXIT',Reason} -> exit({Reason, {?MODULE, call, [Name, Request]}}) end. call(Name, Request, Timeout) -> case catch gen:call(Name, '$gen_call', Request, Timeout) of {ok,Res} -> Res; {'EXIT',Reason} -> exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end.
gen.erl:160
do_call(Process, Label, Request, Timeout) when is_atom(Process) =:= false -> Mref = erlang:monitor(process, Process), %% OTP-21: %% Auto-connect is asynchronous. But we still use 'noconnect' to make sure %% we send on the monitored connection, and not trigger a new auto-connect. %% erlang:send(Process, {Label, {self(), Mref}, Request}, [noconnect]), receive {Mref, Reply} -> erlang:demonitor(Mref, [flush]), {ok, Reply}; {'DOWN', Mref, _, _, noconnection} -> Node = get_node(Process), exit({nodedown, Node}); {'DOWN', Mref, _, _, Reason} -> exit(Reason) after Timeout -> erlang:demonitor(Mref, [flush]), exit(timeout) end.
可以看到, call一个process的过程:
- monitor process
- send_msg to process and receive for reply.
可能的情况有
- 正常返回, demonitor
- noconnection
- pid down for any reason
- timeout
那么, 前面的各种异常, 会对应到哪些情况呢? 有没有意外?
先看看monitor一个process时到底做了什么.
inside monitor
erlang.erl:1291
-type registered_name() :: atom(). -type registered_process_identifier() :: registered_name() | {registered_name(), node()}. -type monitor_process_identifier() :: pid() | registered_process_identifier(). -type monitor_port_identifier() :: port() | registered_name(). %% monitor/2 -spec monitor (process, monitor_process_identifier()) -> MonitorRef when MonitorRef :: reference(); (port, monitor_port_identifier()) -> MonitorRef when MonitorRef :: reference(); (time_offset, clock_service) -> MonitorRef when MonitorRef :: reference(). monitor(_Type, _Item) -> erlang:nif_error(undefined).
在monitor process时, 可以是一个pid, name, name node tuple. 但这里没有具体实现, 找一下nif.
调试发现, 入口在bif.c:monitor_2
Thread 5 "1_scheduler" hit Breakpoint 9, erts_monitor_create (type=type@entry=0, ref=ref@entry=140046391205050, orgn=3092376454563, trgt=trgt@entry=2645699855571, name=name@entry=18446744073709551611) at beam/erl_monitor_link.c:759 759 { (gdb) bt #0 erts_monitor_create (type=type@entry=0, ref=ref@entry=140046391205050, orgn=3092376454563, trgt=trgt@entry=2645699855571, name=name@entry=18446744073709551611) at beam/erl_monitor_link.c:759 #1 0x00000000004d5029 in monitor_2 (A__p=0x7f5f36e803e0, BIF__ARGS=0x7f5f379c0100, A__I=<optimized out>) at beam/bif.c:514 #2 0x000000000044042e in process_main () at x86_64-unknown-linux-gnu/opt/smp/beam_cold.h:59 #3 0x000000000043a0c6 in sched_thread_func (vesdp=0x7f5f35e44dc0) at beam/erl_process.c:8332 #4 0x00000000006467c9 in thr_wrapper (vtwd=0x7ffdb57b4180) at pthread/ethread.c:118 #5 0x00007f5f78d5c6ba in start_thread (arg=0x7f5f34e7f700) at pthread_create.c:333 #6 0x00007f5f7888a41d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
先看本地pid的注册, 当target为atom时, 只是取一下pid, goto local_proccess, 逻辑上只有name是否有值的区别.
// 若是本地pid, 名字为空 if (is_internal_pid(target)) { name = NIL; id = target; local_process: // make ref in call process ref = erts_make_ref(BIF_P); // 判断monitor的进程非调用进程 if (id != BIF_P->common.id) { // create monitor data mdp = erts_monitor_create(ERTS_MON_TYPE_PROC, ref, BIF_P->common.id, id, name); // insert to process monitor tree erts_monitor_tree_insert(&ERTS_P_MONITORS(BIF_P), &mdp->origin); // 从这里开始, 远程pid的monitor有逻辑上的不同 // 给调用进程mdp->target发sig, monitor target id. if (!erts_proc_sig_send_monitor(&mdp->target, id)) erts_proc_sig_send_monitor_down(&mdp->target, am_noproc); } BIF_RET(ref); }
再看远程pid的注册
if (is_external_pid(target)) { ErtsDSigData dsd; int code; dep = external_pid_dist_entry(target); if (dep == erts_this_dist_entry) goto noproc; id = target; name = NIL; byname = 0; remote_process: ref = erts_make_ref(BIF_P); mdp = erts_monitor_create(ERTS_MON_TYPE_DIST_PROC, ref, BIF_P->common.id, id, name); erts_monitor_tree_insert(&ERTS_P_MONITORS(BIF_P), &mdp->origin); // 这里开始和local pid有所不同 code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_RLOCK, 0, 1); switch (code) { // 如果调用时, 链接就没建立, 或远端节点NOT_ALIVE, 马上会抛出 noconnection 异常. case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: erts_monitor_set_dead_dist(&mdp->target, dep->sysname); erts_proc_sig_send_monitor_down(&mdp->target, am_noconnection); code = ERTS_DSIG_SEND_OK; break; // 链接中或已建链 case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: { #ifdef DEBUG int inserted = #endif // monitor a process (named or unnamed) on another node erts_monitor_dist_insert(&mdp->target, dep->mld); ASSERT(inserted); erts_de_runlock(dep); code = erts_dsig_send_monitor(&dsd, BIF_P->common.id, target, ref); break; } default: ERTS_ASSERT(! "Invalid dsig prepare result"); code = ERTS_DSIG_SEND_OK; break; } if (byname) erts_deref_dist_entry(dep); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, ref); BIF_RET(ref); }
若调用中pid退出, 本地pid, 会直接调用erl_proc_sig_queue.c:erts_proc_sig_send_monitor_down.
远端pid, 会收到DOP_MONITOR_P_EXIT消息. dist.c:1665
case DOP_MONITOR_P_EXIT: { /* We are monitoring a process on the remote node which dies, we get {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ if (tuple_arity != 5) { goto invalid_message; } watched = tuple[2]; /* remote proc or name which died */ watcher = tuple[3]; ref = tuple[4]; reason = tuple[5]; if (is_not_ref(ref)) goto invalid_message; if (is_not_external_pid(watched) && is_not_atom(watched)) goto invalid_message; if (is_not_internal_pid(watcher)) { if (!is_external_pid(watcher)) goto invalid_message; if (erts_this_dist_entry == external_pid_dist_entry(watcher)) break; goto invalid_message; } erts_proc_sig_send_dist_monitor_down(dep, ref, watched, watcher, reason); break; }
最终和本地pid down一样, 都会通过send_gen_exit_signal, 通知monitor的进程.
node down
dist.c:erts_do_net_exits
dist.c:schedule_con_monitor_link_cleanup
dist.c:con_monitor_link_cleanup
过程, 检测到网络断开后, 对本node的monitor的pid, 发送reason信息. todo 调试拿一下callstack.
验证
先将EchoService的代码贴出.
defmodule Service.Echo do @moduledoc """ """ use GenServer require Logger require Record Record.defrecordp :state, [ ] def start_link(params) do GenServer.start_link(__MODULE__, params, name: __MODULE__) end def init(_) do {:ok, state()} end def handle_info(_what, state) do {:noreply, state} end def handle_cast(_what, state) do {:noreply, state} end def handle_call({:sleep, timeout}=what, _from, state) do :timer.sleep(timeout) {:reply, what, state} end def handle_call({:raise_after, timeout, raise_msg}=what, _from, state) do :timer.sleep(timeout) raise raise_msg {:reply, what, state} end def handle_call(what, _from, state) do {:reply, what, state} end end
调用开始前, Node崩溃/未链接, exit no connection
iex(xxxxxx@xxxxxx.)47> GenServer.call({Service.Echo, :"xxxxx@xxxxxx."}, :hello) ** (exit) exited in: GenServer.call({Service.Echo, :"xxxxx@xxxxxx."}, :hello, 5000) ** (EXIT) no connection to xxxxx@xxxxxx. (elixir) lib/gen_server.ex:924: GenServer.call/3
调用开始后, shutdown Node, exit shutdown
iex(xxxxx@xxxxx.)47> xxxxx.call(:echo_service, {:sleep, 50000}, 100000) ** (exit) exited in: GenServer.call(#PID<45205.2967.0>, {:sleep, 50000}, 100000) ** (EXIT) shutdown (elixir) lib/gen_server.ex:924: GenServer.call/3 (xxxxxxxxx) lib/xxxxxxx/xxxxxxxxx.ex:35: xxxxxxx/4
调用开始后, kill -9 {node_pid}, exit by no connection
iex(xxxxxx@xxxxxx.)54> GenServer.call({Service.Echo, :"xxxxxx@xxxxxx."}, {:sleep, 1000000}, 50000000) ** (exit) exited in: GenServer.call({Service.Echo, :"xxxxxx@xxxxxx."}, {:sleep, 1000000}, 50000000) ** (EXIT) no connection to xxxxxx@xxxxxx. (elixir) lib/gen_server.ex:924: GenServer.call/3
调用开始后, pid崩溃, exit by reason
iex(xxxxx@xxxxx.)48> GenServer.call({Service.Echo, :"xxxxxxx@xxxxxxx."}, {:raise_after, 3000, "hello"}) ** (exit) exited in: GenServer.call({Service.Echo, :"xxxxxxx@xxxxxx."}, {:raise_after, 3000, "hello"}, 5000) ** (EXIT) an exception was raised: ** (RuntimeError) hello lib/echo_service.ex:99: Service.Echo.handle_call/3 (stdlib) gen_server.erl:661: :gen_server.try_handle_call/4 (stdlib) gen_server.erl:690: :gen_server.handle_msg/6 (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3 (elixir) lib/gen_server.ex:924: GenServer.call/3
调用开始后, 网络中断
用docker network disconnect multi-host-network xxxxx_echo_service模拟拔网线的效果
大约30-60S后, exit by no connection
iex(xxxxx@xxxxx.)48> GenServer.call({Service.Echo, :"xxxxxx@xxxxxxx."}, {:sleep, 1000000}, 50000000) ** (exit) exited in: GenServer.call({Service.Echo, :"xxxxxx@xxxxxxx."}, {:sleep, 1000000}, 50000000) ** (EXIT) no connection to xxxxxx@xxxxxxx. (elixir) lib/gen_server.ex:924: GenServer.call/3