Elixir は標準ライブラリに Registry が含まれている。どんなことができるかは Registryについて を読むとわかる。2020-02-09時点の最新安定版Elixir1.10にて動作確認した。
Registry の公式ドキュメントにも Pub/Sub に使えるよと書いてあるのだけど、どんな感じになるのかサンプルだけだとわかりにくかったので、Pub に Registry、Sub に GenServer を使う実装を試した。
defmodule My.Sub do
use GenServer, restart: :transient
@pub My.Pub
def start_link(topics) do
GenServer.start_link(__MODULE__, topics)
end
def init(topics) do
IO.puts("#{inspect(self())}: init")
# TODO Registry.register/3 がエラーになったらどうしよう?
for topic <- topics, do: Registry.register(@pub, topic, [])
{:ok, topics}
end
def handle_cast(payload, topics) do
IO.puts("#{inspect(self())}: handle_cast, payload = #{inspect(payload)}")
# ここに受けとったときの処理を書く
{:noreply, topics}
end
def terminate(reason, _topics) do
IO.puts("#{inspect(self())}: terminate, reason = #{inspect(reason)}")
end
end
# Pub
{:ok, _pub_supevisor} =
Supervisor.start_link(
[{Registry, keys: :duplicate, name: My.Pub, partitions: System.schedulers_online()}],
strategy: :one_for_one,
name: My.PubSupervisor
)
# Sub
{:ok, sub_dynamic_supervisor} =
Supervisor.start_link(
[
{DynamicSupervisor, strategy: :one_for_one, name: My.SubDynamicSupervisor}
],
strategy: :one_for_one,
name: My.SubSupervisor
)
{:ok, sub1} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, ["foo"]})
IO.puts("#{inspect(sub1)} subscribes foo.")
{:ok, sub2} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, ["foo", "bar"]})
IO.puts("#{inspect(sub2)} subscribes foo and bar.")
{:ok, sub3} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, ["bar"]})
IO.puts("#{inspect(sub3)} subscribes bar.")
表にするとこのような状態だ。o
が購読しているトピック、x
は購読していないトピックになる。
foo | bar | baz | |
---|---|---|---|
sub1 | o | x | x |
sub2 | o | o | x |
sub3 | x | o | x |
動作チェック
Registry.dispatch/3 を使って publish する。今回は entry の 2 引数目には興味がなかったので無視 (_ignored
) した。
Registry.dispatch("<Registry名>", "<トピック>", fn entries ->
for {pid, _ignored} <- entries, do: GenServer.cast(pid, "<Publishしたいメッセージ>")
end)
それぞれのトピックについての動作を試そう。
❯ iex
Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]
Interactive Elixir (1.10.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import_file("pub_sub.exs")
#PID<0.125.0>: init
#PID<0.125.0> subscribes foo.
#PID<0.126.0>: init
#PID<0.126.0> subscribes foo and bar.
#PID<0.127.0>: init
#PID<0.127.0> subscribes bar.
:ok
foo に興味があるクライアントにメッセージが投げられる
iex(2)> IO.puts("foo に興味があるクライアント #{inspect(sub1)} と #{inspect(sub2)} にメッセージが投げられる")
foo に興味があるクライアント #PID<0.125.0> と #PID<0.126.0> にメッセージが投げられる
:ok
iex(3)> Registry.dispatch(My.Pub, "foo", fn entries ->
...(3)> for {pid, _} <- entries,
...(3)> do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic foo")
...(3)> end)
:ok
#PID<0.126.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic foo"
#PID<0.125.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic foo"
bar に興味があるクライアントにメッセージが投げられる
iex(4)> IO.puts("bar に興味があるクライアント #{inspect(sub2)} と #{inspect(sub3)} にメッセージが投げられる")
bar に興味があるクライアント #PID<0.126.0> と #PID<0.127.0> にメッセージが投げられる
:ok
iex(5)> Registry.dispatch(My.Pub, "bar", fn entries ->
...(5)> for {pid, _} <- entries,
...(5)> do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic bar")
...(5)> end)
:ok
#PID<0.126.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic bar"
#PID<0.127.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic bar"
baz に興味があるクライアントにメッセージが投げられる
今回興味があるクライアントはいないので誰も反応しない。
iex(6)> IO.puts("baz に興味があるクライアントにメッセージが投げられるが、今回興味があるクライアントはいない")
baz に興味があるクライアントにメッセージが投げられるが、今回興味があるクライアントはいない
:ok
iex(7)> Registry.dispatch(My.Pub, "baz", fn entries ->
...(7)> for {pid, _} <- entries,
...(7)> do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic baz")
...(7)> end)
:ok
期待通りに動いている。
トピック毎に Subscriber を配列で管理するのとは違うのか?
例えばトピック毎に Subscriber を配列で管理するのとは異なるのだろうか?
あまりわかっていないので、知見/意見があれば気軽にコメント欄に書いてほしい。
ただし、少なくとも一つ便利なところがある。
Registrations に書いてあるとおり
if a process crashes, its keys are automatically removed from the registry
Subscriber がクラッシュ/終了したら検知して自動的に取り除いてくれる。試そう。
iex(8)> IO.puts("#{inspect(sub2)} を終わらせると,そのクライアントを取り除いてメッセージを投げてくれる")
#PID<0.126.0> を終わらせると,そのクライアントを取り除いてメッセージを投げてくれる
:ok
iex(9)> IO.puts("""
...(9)> #{inspect(sub2)} を終わらせる前: #{inspect(Registry.lookup(My.Pub, "foo"))}
...(9>> """)
#PID<0.126.0> を終わらせる前: [{#PID<0.125.0>, []}, {#PID<0.126.0>, []}]
iex(10)> :ok = GenServer.stop(sub2)
#PID<0.126.0>: terminate, reason = :normal
:ok
iex(11)> IO.puts("""
...(11)> #{inspect(sub2)} を終わらせた後: #{inspect(Registry.lookup(My.Pub, "foo"))}
...(11)> foo に興味があるクライアントは #{inspect(sub1)} のみとなった
...(11)> """)
#PID<0.126.0> を終わらせた後: [{#PID<0.125.0>, []}]
foo に興味があるクライアントは #PID<0.125.0> のみとなった
:ok
iex(12)> Registry.dispatch(My.Pub, "foo", fn entries ->
...(12)> for {pid, _} <- entries,
...(12)> do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic foo")
...(12)> end)
#PID<0.125.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic foo"
:ok
#PID<0.126.0>
を終了する前は [{#PID<0.125.0>, []}, {#PID<0.126.0>, []}]
だった
#PID<0.126.0>
を終了した後は [{#PID<0.125.0>, []}]
へと変化した。
期待通りに終了した Subscriber を取り除いてくれている。
有効な Subscriber かどうかの状態を管理しなくてよいのは助かる。
ドキュメントによると
but the change may not propagate immediately
とあり、実際にはすぐに取り除かれるとは限らないようだが、多くの標準ライブラリは存在しないプロセスに向かってメッセージを送っても問題のないの作りになっている。
Registry を Publisher に使おうとしてハマったところ
Registry.register/3 は コードが動いているところのプロセスを登録する
これは Publisher で使うときのみならず、プロセスを Registry へと登録するとき全般に言える。
Registry.register/3 のドキュメントに
Registers the current process
と書いてあった。
「A というプロセスから、B のプロセスを Registry へと登録する」といったことは行えず、
「A というプロセスから、自身のプロセスを Registry へと登録する」ことしかできない。
GenServer の via
を使った Publisher への登録はできない
これは Publisher として Registry を使う ( プロセスの重複を許す :duplicate
) ときにのみ起こる。
GenServer の作成時に :via
を使って名前を Registry へ登録することができる。
https://hexdocs.pm/elixir/1.10/GenServer.html#module-name-registration
当初はこの機能を使えるかなと考えていたのだが、この機能は Registry.start_link/1 の第一引数がプロセスの重複を許さない ( :unique
) な場合のみ利用可能だった。
# https://hexdocs.pm/elixir/1.10/Registry.html#module-using-in
defmodule Dummy do
use GenServer
end
{:ok, _} = Registry.start_link(keys: :duplicate, name: MyRegistry)
{:ok, _} = GenServer.start_link(Dummy, [], name: {:via, Registry, {MyRegistry, "foo_topic"}})
# ** (ArgumentError) :via is not supported for duplicate registries
# (elixir 1.10.0) lib/registry.ex:240: Registry.whereis_name/2
# (stdlib 3.11.2) gen.erl:76: :gen.start/6
パフォーマンス
1000 個のクライアントが 1000 個のトピックを購読しているとき、
1 つのトピックに Publish したら 1000 個のクライアントに届くまでにどのくらいかかるか手元の MacbookPro で測ってみた。
defmodule My.Sub do
use GenServer, restart: :transient
@pub My.Pub
def start_link(topics) do
GenServer.start_link(__MODULE__, topics)
end
def init(topics) do
for topic <- topics, do: Registry.register(@pub, topic, [])
{:ok, topics}
end
end
# Pub
{:ok, _pub_supevisor} =
Supervisor.start_link(
[{Registry, keys: :duplicate, name: My.Pub, partitions: System.schedulers_online()}],
strategy: :one_for_one,
name: My.PubSupervisor
)
# Sub
{:ok, _sub_dynamic_supervisor} =
Supervisor.start_link(
[
{DynamicSupervisor, strategy: :one_for_one, name: My.SubDynamicSupervisor}
],
strategy: :one_for_one,
name: My.SubSupervisor
)
topics = 0..999
IO.puts("topics count: #{Enum.count(topics)}")
clients = 0..999
IO.puts("clients count: #{Enum.count(clients)}")
for _client <- clients do
{:ok, _pid} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, topics})
end
clients_per_topic = Registry.lookup(My.Pub, 0)
# 1 トピックあたり 1000 個のクライアント
IO.puts("clients count per topic: #{Enum.count(clients_per_topic)}")
{execution_time, _answer} =
:timer.tc(Registry, :dispatch, [
My.Pub,
0,
fn entries ->
for {pid, _ignored} <- entries, do: GenServer.cast(pid, "hi")
end
])
# あるトピックを 1000 クライアントに届ける時間
IO.puts("execution time is #{execution_time} micro seconds")
# topics count: 1000
# clients count: 1000
# clients count per topic: 1000
# execution time is 1798 micro seconds
結果 2000 μs = 2 ms 程度なので、人によっては遅く感じるかもしれないが、実用的な速度だと思う。
まとめ
Elixir は標準ライブラリだけで実用的な速度の Pub/Sub を行える。