rabbitmq_commonにはgen_serverを魔改造したgen_server2というモジュールがある、その中のmcallを勉強してみた
description of mcall
%% 10) an mcall/1 function has been added for performing multiple
%% call/3 in parallel. Unlike multi_call, which sends the same request
%% to same-named processes residing on a supplied list of nodes, it
%% operates on name/request pairs, where name is anything accepted by
%% call/3, i.e. a pid, global name, local name, or local name on a
%% particular node.
原型のmulti_call
%%% Make a call to servers at several nodes.
%%% Returns: {[Replies],[BadNodes]}
%%% A Timeout can be given
%%%
%%% A middleman process is used in case late answers arrives after
%%% the timeout. If they would be allowed to glog the callers message
%%% queue, it would probably become confused. Late answers will
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Req)
when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Req, infinity).
multi_call(Nodes, Name, Req)
when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Req, infinity).
multi_call(Nodes, Name, Req, infinity) ->
do_multi_call(Nodes, Name, Req, infinity);
multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
まずNodeのリストを作成し、do_multi_callに渡します
do_multi_call(Nodes, Name, Req, infinity) ->
Tag = make_ref(),
Monitors = send_nodes(Nodes, Name, Tag, Req),
rec_nodes(Tag, Monitors, Name, undefined);
do_multi_call(Nodes, Name, Req, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver =
spawn(
fun () ->
%% Middleman process. Should be unsensitive to regular
%% exit signals. The sychronization is needed in case
%% the receiver would exit before the caller started
%% the monitor.
process_flag(trap_exit, true),
Mref = erlang:monitor(process, Caller),
receive
{Caller,Tag} ->
Monitors = send_nodes(Nodes, Name, Tag, Req),
TimerId = erlang:start_timer(Timeout, self(), ok),
Result = rec_nodes(Tag, Monitors, Name, TimerId),
exit({self(),Tag,Result});
{'DOWN',Mref,_,_,_} ->
%% Caller died before sending us the go-ahead.
%% Give up silently.
exit(normal)
end
end),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(),Tag},
receive
{'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
Result;
{'DOWN',Mref,_,_,Reason} ->
%% The middleman code failed. Or someone did
%% exit(_, kill) on the middleman process => Reason==killed
exit(Reason)
end.
do_multi_callに入り、ここではReceiver
という中間プロセス(the middleman process)を作り、
Receiver ! {self(),Tag},
messageを投げてドーンと起動する、あとは同じリクエストを各Dest(Node)に送って、レシーブして、結果をもらった中間プロセスは
exit({self(),Tag,Result})
で終了し、呼び出しプロセスがDown
メッセージ経由でResult
を回収する
中間プロセスを使う理由は呼び出しプロセスのメッセージキューを綺麗にして、multi_callで送ったメッセージエラーハンドリングに混乱されないためです (IMO)
生まれ変わったmcall
mcall(CallSpecs) ->
Tag = make_ref(),
{_, MRef} = spawn_monitor(
fun() ->
Refs = lists:foldl(
fun ({Dest, _Request}=S, Dict) ->
dict:store(do_mcall(S), Dest, Dict)
end, dict:new(), CallSpecs),
collect_replies(Tag, Refs, [], [])
end),
receive
{'DOWN', MRef, _, _, {Tag, Result}} -> Result;
{'DOWN', MRef, _, _, Reason} -> exit(Reason)
end.
do_mcall({{global,Name}=Dest, Request}) ->
%% whereis_name is simply an ets lookup, and is precisely what
%% global:send/2 does, yet we need a Ref to put in the call to the
%% server, so invoking whereis_name makes a lot more sense here.
case global:whereis_name(Name) of
Pid when is_pid(Pid) ->
MRef = erlang:monitor(process, Pid),
catch msend(Pid, MRef, Request),
MRef;
undefined ->
Ref = make_ref(),
self() ! {'DOWN', Ref, process, Dest, noproc},
Ref
end;
do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) ->
{_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6
catch msend(Dest, MRef, Request),
MRef;
do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) ->
MRef = erlang:monitor(process, Dest),
catch msend(Dest, MRef, Request),
MRef.
msend(Dest, MRef, Request) ->
erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]).
mcallの引数はCallSpecs(CallSpecのlist)、CallSpecの中身は{Dest, Request}
{送信先、メッセージ}
mcallのロジックフローはこうです
1. 中間プロセスを立て
Refs = lists:foldl(
fun ({Dest, _Request}=S, Dict) ->
dict:store(do_mcall(S), Dest, Dict)
end, dict:new(), CallSpecs),
RefsというDictsに送信先のモニターのRefと送信先(Pidなど)を入れる
2. 中間プロセスがcollect_repliesで返信を回収する
collect_replies(Tag, Refs, Replies, Errors) ->
case dict:size(Refs) of
0 -> exit({Tag, {Replies, Errors}});
_ -> receive
{MRef, Reply} ->
{Refs1, Replies1} = handle_call_result(MRef, Reply,
Refs, Replies),
collect_replies(Tag, Refs1, Replies1, Errors);
{'DOWN', MRef, _, _, Reason} ->
Reason1 = case Reason of
noconnection -> nodedown;
_ -> Reason
end,
{Refs1, Errors1} = handle_call_result(MRef, Reason1,
Refs, Errors),
collect_replies(Tag, Refs1, Replies, Errors1)
end
end.
ここでは直列的にRefsにあるRefたちをマッチングして、送信の結果またはエラーを{Replies, Errors}
に入れて、最終的にexitを使って呼び出しプロセスに送信する
まとめ
- multi_callでは同じメッセージを複数Nodeにある同名のプロセスに送信することができる
- mcallでは、CallSpecs次第に自由に送信できる(同じNodeの違うプロセス、違うNodeの違うプロセス)