LoginSignup
0
0

More than 5 years have passed since last update.

Reading mcall function in Rabbimq gen_server2

Last updated at Posted at 2018-11-13

rabbitmq_commonにはgen_serverを魔改造したgen_server2というモジュールがある、その中のmcallを勉強してみた

description of mcall

%% 10) an mcall/1 function has been added for performing multiple
%% call/3 in parallel. Unlike multi_call, which sends the same request
%% to same-named processes residing on a supplied list of nodes, it
%% operates on name/request pairs, where name is anything accepted by
%% call/3, i.e. a pid, global name, local name, or local name on a
%% particular node.

 原型のmulti_call

%%% Make a call to servers at several nodes.
%%% Returns: {[Replies],[BadNodes]}
%%% A Timeout can be given
%%%
%%% A middleman process is used in case late answers arrives after
%%% the timeout. If they would be allowed to glog the callers message
%%% queue, it would probably become confused. Late answers will
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Req)
  when is_atom(Name) ->
    do_multi_call([node() | nodes()], Name, Req, infinity).

multi_call(Nodes, Name, Req)
  when is_list(Nodes), is_atom(Name) ->
    do_multi_call(Nodes, Name, Req, infinity).

multi_call(Nodes, Name, Req, infinity) ->
    do_multi_call(Nodes, Name, Req, infinity);
multi_call(Nodes, Name, Req, Timeout)
  when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
    do_multi_call(Nodes, Name, Req, Timeout).

まずNodeのリストを作成し、do_multi_callに渡します

do_multi_call(Nodes, Name, Req, infinity) ->
    Tag = make_ref(),
    Monitors = send_nodes(Nodes, Name, Tag, Req),
    rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Req, Timeout) ->
    Tag = make_ref(),
    Caller = self(),
    Receiver =
        spawn(
          fun () ->
                  %% Middleman process. Should be unsensitive to regular
                  %% exit signals. The sychronization is needed in case
                  %% the receiver would exit before the caller started
                  %% the monitor.
                  process_flag(trap_exit, true),
                  Mref = erlang:monitor(process, Caller),
                  receive
                      {Caller,Tag} ->
                          Monitors = send_nodes(Nodes, Name, Tag, Req),
                          TimerId = erlang:start_timer(Timeout, self(), ok),
                          Result = rec_nodes(Tag, Monitors, Name, TimerId),
                          exit({self(),Tag,Result});
                      {'DOWN',Mref,_,_,_} ->
                          %% Caller died before sending us the go-ahead.
                          %% Give up silently.
                          exit(normal)
                  end
          end),
    Mref = erlang:monitor(process, Receiver),
    Receiver ! {self(),Tag},
    receive
        {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
            Result;
        {'DOWN',Mref,_,_,Reason} ->
            %% The middleman code failed. Or someone did
            %% exit(_, kill) on the middleman process => Reason==killed
            exit(Reason)
    end.

do_multi_callに入り、ここではReceiverという中間プロセス(the middleman process)を作り、

Receiver ! {self(),Tag},

messageを投げてドーンと起動する、あとは同じリクエストを各Dest(Node)に送って、レシーブして、結果をもらった中間プロセスは

 exit({self(),Tag,Result})

で終了し、呼び出しプロセスがDownメッセージ経由でResultを回収する
中間プロセスを使う理由は呼び出しプロセスのメッセージキューを綺麗にして、multi_callで送ったメッセージエラーハンドリングに混乱されないためです (IMO)

生まれ変わったmcall

mcall(CallSpecs) ->
    Tag = make_ref(),
    {_, MRef} = spawn_monitor(
                  fun() ->
                          Refs = lists:foldl(
                                   fun ({Dest, _Request}=S, Dict) ->
                                           dict:store(do_mcall(S), Dest, Dict)
                                   end, dict:new(), CallSpecs),
                          collect_replies(Tag, Refs, [], [])
                  end),
    receive
        {'DOWN', MRef, _, _, {Tag, Result}} -> Result;
        {'DOWN', MRef, _, _, Reason}        -> exit(Reason)
    end.

do_mcall({{global,Name}=Dest, Request}) ->
    %% whereis_name is simply an ets lookup, and is precisely what
    %% global:send/2 does, yet we need a Ref to put in the call to the
    %% server, so invoking whereis_name makes a lot more sense here.
    case global:whereis_name(Name) of
        Pid when is_pid(Pid) ->
            MRef = erlang:monitor(process, Pid),
            catch msend(Pid, MRef, Request),
            MRef;
        undefined ->
            Ref = make_ref(),
            self() ! {'DOWN', Ref, process, Dest, noproc},
            Ref
    end;
do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) ->
    {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6
    catch msend(Dest, MRef, Request),
    MRef;
do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) ->
    MRef = erlang:monitor(process, Dest),
    catch msend(Dest, MRef, Request),
    MRef.

msend(Dest, MRef, Request) ->
    erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]).

mcallの引数はCallSpecs(CallSpecのlist)、CallSpecの中身は{Dest, Request}{送信先、メッセージ}
mcallのロジックフローはこうです
1. 中間プロセスを立て

                          Refs = lists:foldl(
                                   fun ({Dest, _Request}=S, Dict) ->
                                           dict:store(do_mcall(S), Dest, Dict)
                                   end, dict:new(), CallSpecs),

RefsというDictsに送信先のモニターのRefと送信先(Pidなど)を入れる
2. 中間プロセスがcollect_repliesで返信を回収する

collect_replies(Tag, Refs, Replies, Errors) ->
    case dict:size(Refs) of
        0 -> exit({Tag, {Replies, Errors}});
        _ -> receive
                 {MRef, Reply} ->
                     {Refs1, Replies1} = handle_call_result(MRef, Reply,
                                                            Refs, Replies),
                     collect_replies(Tag, Refs1, Replies1, Errors);
                 {'DOWN', MRef, _, _, Reason} ->
                     Reason1 = case Reason of
                                   noconnection -> nodedown;
                                   _            -> Reason
                               end,
                     {Refs1, Errors1} = handle_call_result(MRef, Reason1,
                                                           Refs, Errors),
                     collect_replies(Tag, Refs1, Replies, Errors1)
             end
    end.

ここでは直列的にRefsにあるRefたちをマッチングして、送信の結果またはエラーを{Replies, Errors}に入れて、最終的にexitを使って呼び出しプロセスに送信する

まとめ

  • multi_callでは同じメッセージを複数Nodeにある同名のプロセスに送信することができる
  • mcallでは、CallSpecs次第に自由に送信できる(同じNodeの違うプロセス、違うNodeの違うプロセス)
0
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0