23
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Elixir標準ライブラリRegistryを使ったPub/Sub

Last updated at Posted at 2017-03-09

Elixir は標準ライブラリに Registry が含まれている。どんなことができるかは Registryについて を読むとわかる。2020-02-09時点の最新安定版Elixir1.10にて動作確認した。

Registry の公式ドキュメントにも Pub/Sub に使えるよと書いてあるのだけど、どんな感じになるのかサンプルだけだとわかりにくかったので、Pub に Registry、Sub に GenServer を使う実装を試した。

pub_sub.ex
defmodule My.Sub do
  use GenServer, restart: :transient

  @pub My.Pub

  def start_link(topics) do
    GenServer.start_link(__MODULE__, topics)
  end

  def init(topics) do
    IO.puts("#{inspect(self())}: init")
    # TODO Registry.register/3 がエラーになったらどうしよう?
    for topic <- topics, do: Registry.register(@pub, topic, [])
    {:ok, topics}
  end

  def handle_cast(payload, topics) do
    IO.puts("#{inspect(self())}: handle_cast, payload = #{inspect(payload)}")
    # ここに受けとったときの処理を書く
    {:noreply, topics}
  end

  def terminate(reason, _topics) do
    IO.puts("#{inspect(self())}: terminate, reason = #{inspect(reason)}")
  end
end

# Pub
{:ok, _pub_supevisor} =
  Supervisor.start_link(
    [{Registry, keys: :duplicate, name: My.Pub, partitions: System.schedulers_online()}],
    strategy: :one_for_one,
    name: My.PubSupervisor
  )

# Sub
{:ok, sub_dynamic_supervisor} =
  Supervisor.start_link(
    [
      {DynamicSupervisor, strategy: :one_for_one, name: My.SubDynamicSupervisor}
    ],
    strategy: :one_for_one,
    name: My.SubSupervisor
  )

{:ok, sub1} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, ["foo"]})
IO.puts("#{inspect(sub1)} subscribes foo.")

{:ok, sub2} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, ["foo", "bar"]})
IO.puts("#{inspect(sub2)} subscribes foo and bar.")

{:ok, sub3} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, ["bar"]})
IO.puts("#{inspect(sub3)} subscribes bar.")

表にするとこのような状態だ。o が購読しているトピック、x は購読していないトピックになる。

foo bar baz
sub1 o x x
sub2 o o x
sub3 x o x

動作チェック

Registry.dispatch/3 を使って publish する。今回は entry の 2 引数目には興味がなかったので無視 (_ignored) した。

Registry.dispatch("<Registry名>", "<トピック>", fn entries ->
  for {pid, _ignored} <- entries, do: GenServer.cast(pid, "<Publishしたいメッセージ>")
end)

それぞれのトピックについての動作を試そう。

 iex
Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Interactive Elixir (1.10.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import_file("pub_sub.exs")
#PID<0.125.0>: init
#PID<0.125.0> subscribes foo.
#PID<0.126.0>: init
#PID<0.126.0> subscribes foo and bar.
#PID<0.127.0>: init
#PID<0.127.0> subscribes bar.
:ok

foo に興味があるクライアントにメッセージが投げられる

iex(2)> IO.puts("foo に興味があるクライアント #{inspect(sub1)}#{inspect(sub2)} にメッセージが投げられる")
foo に興味があるクライアント #PID<0.125.0> と #PID<0.126.0> にメッセージが投げられる
:ok
iex(3)> Registry.dispatch(My.Pub, "foo", fn entries ->
...(3)>   for {pid, _} <- entries,
...(3)>       do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic foo")
...(3)> end)
:ok
#PID<0.126.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic foo"
#PID<0.125.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic foo"

bar に興味があるクライアントにメッセージが投げられる

iex(4)> IO.puts("bar に興味があるクライアント #{inspect(sub2)}#{inspect(sub3)} にメッセージが投げられる")
bar に興味があるクライアント #PID<0.126.0> と #PID<0.127.0> にメッセージが投げられる
:ok
iex(5)> Registry.dispatch(My.Pub, "bar", fn entries ->
...(5)>   for {pid, _} <- entries,
...(5)>       do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic bar")
...(5)> end)
:ok
#PID<0.126.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic bar"
#PID<0.127.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic bar"

baz に興味があるクライアントにメッセージが投げられる

今回興味があるクライアントはいないので誰も反応しない。

iex(6)> IO.puts("baz に興味があるクライアントにメッセージが投げられるが、今回興味があるクライアントはいない") 
baz に興味があるクライアントにメッセージが投げられるが、今回興味があるクライアントはいない
:ok
iex(7)> Registry.dispatch(My.Pub, "baz", fn entries ->
...(7)>   for {pid, _} <- entries,
...(7)>       do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic baz")
...(7)> end)
:ok

期待通りに動いている。

トピック毎に Subscriber を配列で管理するのとは違うのか?

例えばトピック毎に Subscriber を配列で管理するのとは異なるのだろうか?
あまりわかっていないので、知見/意見があれば気軽にコメント欄に書いてほしい。

ただし、少なくとも一つ便利なところがある。

Registrations に書いてあるとおり

if a process crashes, its keys are automatically removed from the registry

Subscriber がクラッシュ/終了したら検知して自動的に取り除いてくれる。試そう。

iex(8)> IO.puts("#{inspect(sub2)} を終わらせると,そのクライアントを取り除いてメッセージを投げてくれる")
#PID<0.126.0> を終わらせると,そのクライアントを取り除いてメッセージを投げてくれる
:ok
iex(9)> IO.puts("""
...(9)> #{inspect(sub2)} を終わらせる前: #{inspect(Registry.lookup(My.Pub, "foo"))}
...(9>> """)
#PID<0.126.0> を終わらせる前: [{#PID<0.125.0>, []}, {#PID<0.126.0>, []}]
iex(10)> :ok = GenServer.stop(sub2)
#PID<0.126.0>: terminate, reason = :normal
:ok
iex(11)> IO.puts("""
...(11)> #{inspect(sub2)} を終わらせた後: #{inspect(Registry.lookup(My.Pub, "foo"))}
...(11)> foo に興味があるクライアントは #{inspect(sub1)} のみとなった
...(11)> """)
#PID<0.126.0> を終わらせた後: [{#PID<0.125.0>, []}]
foo に興味があるクライアントは #PID<0.125.0> のみとなった

:ok
iex(12)> Registry.dispatch(My.Pub, "foo", fn entries ->
...(12)>   for {pid, _} <- entries,
...(12)>       do: GenServer.cast(pid, "broadcast to subscriber who subscribes topic foo")
...(12)> end)
#PID<0.125.0>: handle_cast, payload = "broadcast to subscriber who subscribes topic foo"
:ok

#PID<0.126.0> を終了する前は [{#PID<0.125.0>, []}, {#PID<0.126.0>, []}] だった
#PID<0.126.0> を終了した後は [{#PID<0.125.0>, []}] へと変化した。

期待通りに終了した Subscriber を取り除いてくれている。
有効な Subscriber かどうかの状態を管理しなくてよいのは助かる。

ドキュメントによると

but the change may not propagate immediately

とあり、実際にはすぐに取り除かれるとは限らないようだが、多くの標準ライブラリは存在しないプロセスに向かってメッセージを送っても問題のないの作りになっている。

Registry を Publisher に使おうとしてハマったところ

Registry.register/3 は コードが動いているところのプロセスを登録する

これは Publisher で使うときのみならず、プロセスを Registry へと登録するとき全般に言える。

Registry.register/3 のドキュメントに

Registers the current process

と書いてあった。

「A というプロセスから、B のプロセスを Registry へと登録する」といったことは行えず、
「A というプロセスから、自身のプロセスを Registry へと登録する」ことしかできない。

GenServer の via を使った Publisher への登録はできない

これは Publisher として Registry を使う ( プロセスの重複を許す :duplicate) ときにのみ起こる。

GenServer の作成時に :via を使って名前を Registry へ登録することができる。
https://hexdocs.pm/elixir/1.10/GenServer.html#module-name-registration

当初はこの機能を使えるかなと考えていたのだが、この機能は Registry.start_link/1 の第一引数がプロセスの重複を許さない ( :unique ) な場合のみ利用可能だった。

via.ex
# https://hexdocs.pm/elixir/1.10/Registry.html#module-using-in
defmodule Dummy do
  use GenServer
end
{:ok, _} = Registry.start_link(keys: :duplicate, name: MyRegistry)
{:ok, _} = GenServer.start_link(Dummy, [], name: {:via, Registry, {MyRegistry, "foo_topic"}})
# ** (ArgumentError) :via is not supported for duplicate registries
#     (elixir 1.10.0) lib/registry.ex:240: Registry.whereis_name/2
#     (stdlib 3.11.2) gen.erl:76: :gen.start/6

パフォーマンス

1000 個のクライアントが 1000 個のトピックを購読しているとき、
1 つのトピックに Publish したら 1000 個のクライアントに届くまでにどのくらいかかるか手元の MacbookPro で測ってみた。

perfomance.ex
defmodule My.Sub do
  use GenServer, restart: :transient

  @pub My.Pub

  def start_link(topics) do
    GenServer.start_link(__MODULE__, topics)
  end

  def init(topics) do
    for topic <- topics, do: Registry.register(@pub, topic, [])
    {:ok, topics}
  end
end

# Pub
{:ok, _pub_supevisor} =
  Supervisor.start_link(
    [{Registry, keys: :duplicate, name: My.Pub, partitions: System.schedulers_online()}],
    strategy: :one_for_one,
    name: My.PubSupervisor
  )

# Sub
{:ok, _sub_dynamic_supervisor} =
  Supervisor.start_link(
    [
      {DynamicSupervisor, strategy: :one_for_one, name: My.SubDynamicSupervisor}
    ],
    strategy: :one_for_one,
    name: My.SubSupervisor
  )

topics = 0..999
IO.puts("topics count: #{Enum.count(topics)}")
clients = 0..999
IO.puts("clients count: #{Enum.count(clients)}")

for _client <- clients do
  {:ok, _pid} = DynamicSupervisor.start_child(My.SubDynamicSupervisor, {My.Sub, topics})
end

clients_per_topic = Registry.lookup(My.Pub, 0)
# 1 トピックあたり 1000 個のクライアント
IO.puts("clients count per topic: #{Enum.count(clients_per_topic)}")

{execution_time, _answer} =
  :timer.tc(Registry, :dispatch, [
    My.Pub,
    0,
    fn entries ->
      for {pid, _ignored} <- entries, do: GenServer.cast(pid, "hi")
    end
  ])

# あるトピックを 1000 クライアントに届ける時間
IO.puts("execution time is #{execution_time} micro seconds")

# topics count: 1000
# clients count: 1000
# clients count per topic: 1000
# execution time is 1798 micro seconds

結果 2000 μs = 2 ms 程度なので、人によっては遅く感じるかもしれないが、実用的な速度だと思う。

まとめ

Elixir は標準ライブラリだけで実用的な速度の Pub/Sub を行える。

23
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?