lib_chan 库说明
我不知道是这段代码太复杂,还是我的智商有问题。至少,我不太习惯erlang的这种方式。不过,我很喜欢这个lib_chan的思路和erlang程序的风格。这种方式非常的明确,直接。我不知道为什么erlang没有一个类似的成熟的库。这个lib_chan好像是joearmstrong为这本书写的,难道erlang根本不需要这样的东西?
最好结合书里的那个聊天程序来理解lib_chan。这确实是个很精巧的东西(lib_chan还谈不上精巧,那个chat程序才精巧)。
注意:我只解释重要的地方。
lib_chan是一个类似总外壳的东西,定义接口,连接起诸多细节,这里我去掉了一些琐碎的函数。
%% --- %% Excerpted from "Programming Erlang", %% published by The Pragmatic Bookshelf. %% Copyrights apply to this code. It may not be used to create training material, %% courses, books, articles, and the like. Contact us if you are in doubt. %% We make no guarantees that this code is fit for any purpose. %% Visit http://www.pragmaticprogrammer.com/titles/jaerlang for more book information. %%--- -module(lib_chan). -export([cast/2, start_server/0, start_server/1, connect/5, disconnect/1, rpc/2]). -import(lists, [map/2, member/2, foreach/2]). -import(lib_chan_mm, [send/2, close/1]). %%---------------------------------------------------------------------- %% Server code start_server1(ConfigData) -> register(lib_chan, spawn(fun() -> start_server2(ConfigData) end)). start_server2(ConfigData) -> [Port] = [ P || {port,P} <- ConfigData], start_port_server(Port, ConfigData). start_port_server(Port, ConfigData) -> %% lib_chan_cs:start_raw_server 函数会启动端口监听,并且,一旦接受连接, %% 也就是说,拿到一个Socket的时候(一个Channel),就给现在传进来的fun(Socket)。 %% %% 也就是说,得到Socket就是lib_chan_cs:start_raw_server的第一阶段的成果, %% 之后,它的任务就是把拿到的Socket给fun(Socket),就这么简单。 %% %% 而fun(Socket)做的事情就是,start_port_instance(Socket,ConfigData)。 %% 看见了吗?我们现在有了Socket(先不必考虑监听,accept之类的事),并且, %% 我们可以从ConfigData里拿到要运行的目标(一个函数,或者说是一个入口)。 %% 现在,你需要的都全了吧?!更细节的是下一步了。 lib_chan_cs:start_raw_server(Port, fun(Socket) -> start_port_instance(Socket, ConfigData) end, 100, 4). start_port_instance(Socket, ConfigData) -> %% 现在,我们已经有了Socket,我们还知道ConfigData,但是,Socket不是MM, %% 我们必须弄成中间人模式,所以,要用lib_chan_mm:loop(Socket,Controller)。 %% 谁运行lib_chan_mm:loop/2,谁就是MM。明白了吧? %% 那么,在运行lib_chan_mm:loop/2之前,必须的搞一个Controller,也就是说, %% MM必须的知道把自己收到的消息给谁,这个谁就是Controller。 %% Controller也是个process,所以要spawn。 S = self(), Controller = spawn_link(fun() -> start_erl_port_server(S, ConfigData) end), lib_chan_mm:loop(Socket, Controller). start_erl_port_server(MM, ConfigData) -> %% 这个函数是Controller要运行的,我知道MM会给我消息,我也知道我要运行的东西 %% 在ConfigData里。 receive {chan, MM, {startService, Mod, ArgC}} -> case get_service_definition(Mod, ConfigData) of {yes, Pwd, MFA} -> case Pwd of none -> send(MM, ack), really_start(MM, ArgC, MFA); _ -> do_authentication(Pwd, MM, ArgC, MFA) end; no -> io:format("sending bad service~n"), send(MM, badService), close(MM) end; Any -> io:format("*** ErL port server got:~p ~p~n",[MM, Any]), exit({protocolViolation, Any}) end. %% MM is the middle man %% Mod is the Module we want to execute ArgC and ArgS come from the client and %% server respectively really_start(MM, ArgC, {Mod, Func, ArgS}) -> %% 这就是start_erl_port_server/2的推进版,已经把ConfigData解析成mfa。 %% Controller最终与运行到这里,也就是说Controller会运行apply(Mod,Func,[MM,ArgC,ArgS])。 %% ok! case (catch apply(Mod,Func,[MM,ArgC,ArgS])) of {'EXIT', normal} -> true; {'EXIT', Why} -> io:format("server error:~p~n",[Why]); Why -> io:format("server error should die with exit(normal) was:~p~n", [Why]) end. %% get_service_definition(Name, ConfigData) get_service_definition(Mod, [{service, Mod, password, Pwd, mfa, M, F, A}|_]) -> {yes, Pwd, {M, F, A}}; get_service_definition(Name, [_|T]) -> get_service_definition(Name, T); get_service_definition(_, []) -> no.
下面的这个是lib_chan_cs,它构造了服务器端的结构和机制。
而lib_chan_mm则很简单,只是翻译了一下数据。
所以,关于lib_chan_mm我也不想解释什么,如果你读不懂,那么肯定是你自己的问题了。
%% --- %% Excerpted from "Programming Erlang", %% published by The Pragmatic Bookshelf. %% Copyrights apply to this code. It may not be used to create training material, %% courses, books, articles, and the like. Contact us if you are in doubt. %% We make no guarantees that this code is fit for any purpose. %% Visit http://www.pragmaticprogrammer.com/titles/jaerlang for more book information. %%--- -module(lib_chan_cs). %% cs stands for client_server -export([start_raw_server/4, start_raw_client/3]). -export([stop/1]). -export([children/1]). %% start_raw_server(Port, Fun, Max) %% This server accepts up to Max connections on Port %% The *first* time a connection is made to Port %% Then Fun(Socket) is called. %% Thereafter messages to the socket result in messages to the handler. %% tcp_is typically used as follows: %% To setup a listener %% start_agent(Port) -> %% process_flag(trap_exit, true), %% lib_chan_server:start_raw_server(Port, %% fun(Socket) -> input_handler(Socket) end, %% 15, %% 0). start_raw_client(Host, Port, PacketLength) -> gen_tcp:connect(Host, Port, [binary, {active, true}, {packet, PacketLength}]). %% Note when start_raw_server returns it should be ready to %% Immediately accept connections start_raw_server(Port, Fun, Max, PacketLength) -> %% 从这里启动服务器。 %% 注意,它并没有自己直接去启动,而是用一个进程去干这事。 %% %% 这个进程叫portServer8080,运行cold_start/5。 %% 注意,8080是我编的,如果端口是3333,那就叫portServer3333,这是port_name搞的。 %% %% 为啥叫cold_start呢?因为,cold_start只是用gen_server:listen注册了要监听端口 %% 和某些配置信息,并没有调用gen_server:accept来接受连接。 Name = port_name(Port), case whereis(Name) of undefined -> Self = self(), Pid = spawn_link(fun() -> cold_start(Self,Port,Fun,Max,PacketLength) end), receive {Pid, ok} -> register(Name, Pid), {ok, self()}; {Pid, Error} -> Error end; _Pid -> {error, already_started} end. stop(Port) when integer(Port) -> Name = port_name(Port), case whereis(Name) of undefined -> not_started; Pid -> exit(Pid, kill), (catch unregister(Name)), stopped end. children(Port) when integer(Port) -> port_name(Port) ! {children, self()}, receive {session_server, Reply} -> Reply end. port_name(Port) when integer(Port) -> list_to_atom("portServer" ++ integer_to_list(Port)). cold_start(Master, Port, Fun, Max, PacketLength) -> %% 现在,运行本函数的就是那个叫portServer8080的那个进程了。 %% 没错,它确实是一切事情的起点。但是,它除了注册监听信息, %% 所做的事情也只有两个,一个是start_accept/2,一个是启动socket_loop/5。 %% %% start_accept会创建一个进程,开始真正的接受连接,然后发消息给portServer8080, %% portServer8080它正在运行socket_loop。 %% %% 这里必须的说明一下,start_accept一旦接受一个连接后,就没人在接受连接了, %% (这是一段真空时间)所以,要通知portServer,自己已经在处理一个连接, %% portServer会决定是不是再新建一个start_accept再接受连接。portServer就是这样 %% 管理连接个数的。 process_flag(trap_exit, true), %% io:format("Starting a port server on ~p...~n",[Port]), case gen_tcp:listen(Port, [binary, %% {dontroute, true}, {nodelay,true}, {packet, PacketLength}, {reuseaddr, true}, {active, true}]) of {ok, Listen} -> %% io:format("Listening to:~p~n",[Listen]), Master ! {self(), ok}, New = start_accept(Listen, Fun), %% Now we're ready to run socket_loop(Listen, New, [], Fun, Max); Error -> Master ! {self(), Error} end. socket_loop(Listen, New, Active, Fun, Max) -> %% 一旦start_accept进程(也就是运行start_child的进程),接受到连接, %% 会发一个消息{istarted,MyPid},给portServer,portServer运行的是Socket_loop, %% 也就是到现在这个地方。 %% %% possibly_start_another的任务是看看现在连接的个数是否达到最大,如果到了极限, %% 就不运行start_accept,直接运行socket_loop(Listen, false, Active, Fun, Max)。 %% 一旦有某些连接没了,会收到{'EXIT', Pid, _Why}这样的消息,再看看连接个数, %% 以决定是否start_accept去监听。 receive {istarted, New} -> Active1 = [New|Active], possibly_start_another(false,Listen,Active1,Fun,Max); {'EXIT', New, _Why} -> %% io:format("Child exit=~p~n",[Why]), possibly_start_another(false,Listen,Active,Fun,Max); {'EXIT', Pid, _Why} -> %% io:format("Child exit=~p~n",[Why]), Active1 = lists:delete(Pid, Active), possibly_start_another(New,Listen,Active1,Fun,Max); {children, From} -> From ! {session_server, Active}, socket_loop(Listen,New,Active,Fun,Max); _Other -> socket_loop(Listen,New,Active,Fun,Max) end. possibly_start_another(New, Listen, Active, Fun, Max) %% 这个函数和socket_loop纠缠在一起,看看socket_loop的注释吧! when pid(New) -> socket_loop(Listen, New, Active, Fun, Max); possibly_start_another(false, Listen, Active, Fun, Max) -> case length(Active) of N when N < Max -> New = start_accept(Listen, Fun), socket_loop(Listen, New, Active, Fun,Max); _ -> socket_loop(Listen, false, Active, Fun, Max) end. start_accept(Listen, Fun) -> S = self(), spawn_link(fun() -> start_child(S, Listen, Fun) end). start_child(Parent, Listen, Fun) -> %% 注意这个child是个进程,因为一旦它拿到连接,本身还要变成MM呢! %% 在Fun(Socket)的时候,还记得那个start_port_instance吗? %% 还记得start_port_instance中的lib_chan_mm:loop(Socket,Controller)吗? %% %% 我再说一遍,在接到Socket后,会运行 %% fun(Socket) ->start_port_instance(Socket,ConfigData) end %% 这个东西。 case gen_tcp:accept(Listen) of {ok, Socket} -> Parent ! {istarted,self()}, % tell the controller inet:setopts(Socket, [{packet,4}, binary, {nodelay,true}, {active, true}]), %% before we activate socket %% io:format("running the child:~p Fun=~p~n", [Socket, Fun]), process_flag(trap_exit, true), case (catch Fun(Socket)) of {'EXIT', normal} -> true; {'EXIT', Why} -> io:format("Port process dies with exit:~p~n",[Why]), true; _ -> %% not an exit so everything's ok true end end.
最后,我在说些我遇到的问题。
现在我的问题是某些进程在系统关闭后,没有被关闭掉。
也就是说一旦运行lib_chan后,关闭的时候,清理不干净。
比如,portServer就没关闭掉,还有两三个进程没被关闭掉。
顺便再提一下otp。
我曾经很迷惑对otp的使用。最近又看了一遍后,我好像明白一点了。
我注意到otp中并没有socket,accept,gen_tcp,之类的通信程序。
也就是说,otp是一个应用程序框架,不是分布式,也不是网络。
不过,它是并发的,有很多的进程在合作着,而且,erlang进程收消息,本身就是支持并发(不管多少个进程同时给它发消息,都会排队进自己的邮箱)。
所以,一旦我想用otp写一个网络程序,可行的方法就是把otp程序作为一个稳定的后端,分布式和通信用单独的程序来做,然后调用用otp写的模块。正如书中那个web后端的例子。