lib_chan 库说明

在《programming erlang》中,lib_chan是比较复杂的代码,至少我是读了好多遍才算明白。

我不知道是这段代码太复杂,还是我的智商有问题。至少,我不太习惯erlang的这种方式。不过,我很喜欢这个lib_chan的思路和erlang程序的风格。这种方式非常的明确,直接。我不知道为什么erlang没有一个类似的成熟的库。这个lib_chan好像是joearmstrong为这本书写的,难道erlang根本不需要这样的东西?

最好结合书里的那个聊天程序来理解lib_chan。这确实是个很精巧的东西(lib_chan还谈不上精巧,那个chat程序才精巧)。

注意:我只解释重要的地方。

lib_chan是一个类似总外壳的东西,定义接口,连接起诸多细节,这里我去掉了一些琐碎的函数。

%% ---
%%  Excerpted from "Programming Erlang",
%%  published by The Pragmatic Bookshelf.
%%  Copyrights apply to this code. It may not be used to create training material, 
%%  courses, books, articles, and the like. Contact us if you are in doubt.
%%  We make no guarantees that this code is fit for any purpose. 
%%  Visit http://www.pragmaticprogrammer.com/titles/jaerlang for more book information.
%%---
-module(lib_chan).
-export([cast/2, start_server/0, start_server/1,
	 connect/5, disconnect/1, rpc/2]).
-import(lists, [map/2, member/2, foreach/2]).
-import(lib_chan_mm, [send/2, close/1]).

%%----------------------------------------------------------------------
%% Server code
start_server1(ConfigData) ->
    register(lib_chan, spawn(fun() -> start_server2(ConfigData) end)).

start_server2(ConfigData) ->
    [Port] = [ P || {port,P} <- ConfigData],
    start_port_server(Port, ConfigData).

start_port_server(Port, ConfigData) ->
	%% lib_chan_cs:start_raw_server 函数会启动端口监听,并且,一旦接受连接,
	%% 也就是说,拿到一个Socket的时候(一个Channel),就给现在传进来的fun(Socket)。
	%% 
	%% 也就是说,得到Socket就是lib_chan_cs:start_raw_server的第一阶段的成果,
	%% 之后,它的任务就是把拿到的Socket给fun(Socket),就这么简单。
	%%	
	%% 	而fun(Socket)做的事情就是,start_port_instance(Socket,ConfigData)。
	%%	看见了吗?我们现在有了Socket(先不必考虑监听,accept之类的事),并且,
	%%	我们可以从ConfigData里拿到要运行的目标(一个函数,或者说是一个入口)。
	%%	现在,你需要的都全了吧?!更细节的是下一步了。
    lib_chan_cs:start_raw_server(Port, 
				fun(Socket) -> 
					start_port_instance(Socket, 
							    ConfigData) end,
				100,
				4).

start_port_instance(Socket, ConfigData) ->
	%%	现在,我们已经有了Socket,我们还知道ConfigData,但是,Socket不是MM,
	%% 	我们必须弄成中间人模式,所以,要用lib_chan_mm:loop(Socket,Controller)。
	%%	谁运行lib_chan_mm:loop/2,谁就是MM。明白了吧?
	%%	那么,在运行lib_chan_mm:loop/2之前,必须的搞一个Controller,也就是说,
	%%	MM必须的知道把自己收到的消息给谁,这个谁就是Controller。
	%%	Controller也是个process,所以要spawn。
    S = self(),
    Controller = spawn_link(fun() -> start_erl_port_server(S, ConfigData) end),
    lib_chan_mm:loop(Socket, Controller).

start_erl_port_server(MM, ConfigData) ->
	%%	这个函数是Controller要运行的,我知道MM会给我消息,我也知道我要运行的东西
	%%	在ConfigData里。
    receive
	{chan, MM, {startService, Mod, ArgC}} ->
	    case get_service_definition(Mod, ConfigData) of
		{yes, Pwd, MFA} ->
		    case Pwd of
			none ->
			    send(MM, ack),
			    really_start(MM, ArgC, MFA);
			_ ->
			    do_authentication(Pwd, MM, ArgC, MFA)
		    end;
		no ->
		    io:format("sending bad service~n"),
		    send(MM, badService),
		    close(MM)
	    end;
	Any ->
	    io:format("*** ErL port server got:~p ~p~n",[MM, Any]),
	    exit({protocolViolation, Any})
    end.

%% MM is the middle man
%% Mod is the Module we want to execute ArgC and ArgS come from the client and
%% server respectively

really_start(MM, ArgC, {Mod, Func, ArgS}) ->
	%%	这就是start_erl_port_server/2的推进版,已经把ConfigData解析成mfa。
	%%	Controller最终与运行到这里,也就是说Controller会运行apply(Mod,Func,[MM,ArgC,ArgS])。
	%%	ok!
    case (catch apply(Mod,Func,[MM,ArgC,ArgS])) of
	{'EXIT', normal} ->
	    true;
	{'EXIT', Why} ->
	    io:format("server error:~p~n",[Why]);
	Why ->
	    io:format("server error should die with exit(normal) was:~p~n",
		      [Why])
    end.

%% get_service_definition(Name, ConfigData)

get_service_definition(Mod, [{service, Mod, password, Pwd, mfa, M, F, A}|_]) ->
    {yes, Pwd, {M, F, A}};
get_service_definition(Name, [_|T]) ->
    get_service_definition(Name, T);
get_service_definition(_, []) ->
    no.

下面的这个是lib_chan_cs,它构造了服务器端的结构和机制。

而lib_chan_mm则很简单,只是翻译了一下数据。

所以,关于lib_chan_mm我也不想解释什么,如果你读不懂,那么肯定是你自己的问题了。

%% ---
%%  Excerpted from "Programming Erlang",
%%  published by The Pragmatic Bookshelf.
%%  Copyrights apply to this code. It may not be used to create training material, 
%%  courses, books, articles, and the like. Contact us if you are in doubt.
%%  We make no guarantees that this code is fit for any purpose. 
%%  Visit http://www.pragmaticprogrammer.com/titles/jaerlang for more book information.
%%---
-module(lib_chan_cs).
%% cs stands for client_server

-export([start_raw_server/4, start_raw_client/3]).
-export([stop/1]).
-export([children/1]).


%% start_raw_server(Port, Fun, Max)
%%   This server accepts up to Max connections on Port
%%   The *first* time a connection is made to Port
%%   Then Fun(Socket) is called. 
%%   Thereafter messages to the socket result in messages to the handler.

%% tcp_is typically used as follows:
%% To setup a listener
%%   start_agent(Port) ->    
%%     process_flag(trap_exit, true),
%%     lib_chan_server:start_raw_server(Port, 
%% 		         	       fun(Socket) -> input_handler(Socket) end, 
%% 				       15,
%%                                     0).

start_raw_client(Host, Port, PacketLength) ->
    gen_tcp:connect(Host, Port,
		    [binary, {active, true}, {packet, PacketLength}]).

%% Note when start_raw_server returns it should be ready to
%% Immediately accept connections

start_raw_server(Port, Fun, Max, PacketLength) ->
	%%	从这里启动服务器。
	%%	注意,它并没有自己直接去启动,而是用一个进程去干这事。
	%%
	%%	这个进程叫portServer8080,运行cold_start/5。
	%%	注意,8080是我编的,如果端口是3333,那就叫portServer3333,这是port_name搞的。
	%% 	
	%%	为啥叫cold_start呢?因为,cold_start只是用gen_server:listen注册了要监听端口
	%%	和某些配置信息,并没有调用gen_server:accept来接受连接。
    Name = port_name(Port),
    case whereis(Name) of
	undefined ->
	    Self = self(),
	    Pid = spawn_link(fun() ->
				 cold_start(Self,Port,Fun,Max,PacketLength)
			     end),
	    receive
		{Pid, ok} ->
		    register(Name, Pid),
		    {ok, self()};
		{Pid, Error} ->
		    Error
	    end;
	_Pid ->
	    {error, already_started}
    end.

stop(Port) when integer(Port) ->
    Name = port_name(Port),
    case whereis(Name) of
	undefined ->
	    not_started;
	Pid ->
	    exit(Pid, kill),
	    (catch unregister(Name)),
	    stopped
    end.
children(Port) when integer(Port) ->
    port_name(Port) ! {children, self()},
    receive
	{session_server, Reply} -> Reply
    end.


port_name(Port) when integer(Port) ->
    list_to_atom("portServer" ++ integer_to_list(Port)).


cold_start(Master, Port, Fun, Max, PacketLength) ->
	%%	现在,运行本函数的就是那个叫portServer8080的那个进程了。
	%%	没错,它确实是一切事情的起点。但是,它除了注册监听信息,
	%%	所做的事情也只有两个,一个是start_accept/2,一个是启动socket_loop/5。
	%%	
	%%	start_accept会创建一个进程,开始真正的接受连接,然后发消息给portServer8080,
	%%	portServer8080它正在运行socket_loop。
	%%	
	%%	这里必须的说明一下,start_accept一旦接受一个连接后,就没人在接受连接了,
	%%	(这是一段真空时间)所以,要通知portServer,自己已经在处理一个连接,
	%%	portServer会决定是不是再新建一个start_accept再接受连接。portServer就是这样
	%%	管理连接个数的。
    process_flag(trap_exit, true),
    %% io:format("Starting a port server on ~p...~n",[Port]),
    case gen_tcp:listen(Port, [binary,
			       %% {dontroute, true},
			       {nodelay,true},
			       {packet, PacketLength},
			       {reuseaddr, true}, 
			       {active, true}]) of
	{ok, Listen} ->
	    %% io:format("Listening to:~p~n",[Listen]),
	    Master ! {self(), ok},
	    New = start_accept(Listen, Fun),
	    %% Now we're ready to run
	    socket_loop(Listen, New, [], Fun, Max);
	Error ->
	    Master ! {self(), Error}
    end.


socket_loop(Listen, New, Active, Fun, Max) ->
	%%	一旦start_accept进程(也就是运行start_child的进程),接受到连接,
	%%	会发一个消息{istarted,MyPid},给portServer,portServer运行的是Socket_loop,
	%%	也就是到现在这个地方。
	%%	
	%%	possibly_start_another的任务是看看现在连接的个数是否达到最大,如果到了极限,
	%%	就不运行start_accept,直接运行socket_loop(Listen, false, Active, Fun, Max)。
	%%	一旦有某些连接没了,会收到{'EXIT', Pid, _Why}这样的消息,再看看连接个数,
	%%	以决定是否start_accept去监听。
    receive
	{istarted, New} ->
	    Active1 = [New|Active],
	    possibly_start_another(false,Listen,Active1,Fun,Max);
	{'EXIT', New, _Why} ->
	    %% io:format("Child exit=~p~n",[Why]),
	    possibly_start_another(false,Listen,Active,Fun,Max);
	{'EXIT', Pid, _Why} ->
	    %% io:format("Child exit=~p~n",[Why]),
	    Active1 = lists:delete(Pid, Active),
	    possibly_start_another(New,Listen,Active1,Fun,Max);
	{children, From} ->
	    From ! {session_server, Active},
	    socket_loop(Listen,New,Active,Fun,Max);
	_Other ->
	    socket_loop(Listen,New,Active,Fun,Max)
    end.


possibly_start_another(New, Listen, Active, Fun, Max) 
	%% 这个函数和socket_loop纠缠在一起,看看socket_loop的注释吧!
  when pid(New) ->
    socket_loop(Listen, New, Active, Fun, Max);
possibly_start_another(false, Listen, Active, Fun, Max) ->
    case length(Active) of
	N when N < Max ->
	    New = start_accept(Listen, Fun),
	    socket_loop(Listen, New, Active, Fun,Max);
	_ ->
	    socket_loop(Listen, false, Active, Fun, Max)
    end.

start_accept(Listen, Fun) ->
    S = self(),
    spawn_link(fun() -> start_child(S, Listen, Fun) end).

start_child(Parent, Listen, Fun) ->
	%%	注意这个child是个进程,因为一旦它拿到连接,本身还要变成MM呢!
	%%	在Fun(Socket)的时候,还记得那个start_port_instance吗?
	%%	还记得start_port_instance中的lib_chan_mm:loop(Socket,Controller)吗?
	%%	
	%%	我再说一遍,在接到Socket后,会运行
	%%	fun(Socket) ->start_port_instance(Socket,ConfigData) end
	%%	这个东西。
    case gen_tcp:accept(Listen) of
	{ok, Socket} ->
	    Parent ! {istarted,self()},		    % tell the controller
	    inet:setopts(Socket, [{packet,4},
				  binary,
				  {nodelay,true},
				  {active, true}]), 
	    %% before we activate socket
	    %% io:format("running the child:~p Fun=~p~n", [Socket, Fun]),
	    process_flag(trap_exit, true),
	    case (catch Fun(Socket)) of
		{'EXIT', normal} ->
		    true;
		{'EXIT', Why} ->
		    io:format("Port process dies with exit:~p~n",[Why]),
		    true;
		_ ->
		    %% not an exit so everything's ok
		    true
	    end
    end.

最后,我在说些我遇到的问题。

现在我的问题是某些进程在系统关闭后,没有被关闭掉。

也就是说一旦运行lib_chan后,关闭的时候,清理不干净。

比如,portServer就没关闭掉,还有两三个进程没被关闭掉。

顺便再提一下otp。

我曾经很迷惑对otp的使用。最近又看了一遍后,我好像明白一点了。

我注意到otp中并没有socket,accept,gen_tcp,之类的通信程序。

也就是说,otp是一个应用程序框架,不是分布式,也不是网络。

不过,它是并发的,有很多的进程在合作着,而且,erlang进程收消息,本身就是支持并发(不管多少个进程同时给它发消息,都会排队进自己的邮箱)。

所以,一旦我想用otp写一个网络程序,可行的方法就是把otp程序作为一个稳定的后端,分布式和通信用单独的程序来做,然后调用用otp写的模块。正如书中那个web后端的例子。

相关推荐