今回は昨年2019年にどうにもうまく手懐けられなかった Registry による Pub/Sub をやってみます。一昨年2018年の fukuoka.ex Elixir/Phoenix Advent Calendar 2018 の記事 階段の上でも下でも電灯を点けたり消したりする ではじめて Registry を使って以来、何とかちゃんと理解してうまいこと使ってみたいと思いつつ1年以上が経過してしまいました。
Registry とは
Elixir や Erlang ではプロセスに名前をつけることができます。名前の付け方にはおおよそ以下の種類があります。
- 名前を付けない:
{:ok, pid} = ...
でプロセスを起動してpid
を持ち回る - 決め打ちで名前を付ける:プロセスの起動の際に
name: :my_process_name
などとやる - それなりに自由に名前を付ける:プロセスを登録するレジストリライブラリを用いる
最後のプロセスを登録するライブラリというのには沢山種類があります。「プログラミング Elixir」の15.2章「プロセスの名前付け」には Erlang の :global ライブラリを用いてプロセスに名前を付ける方法が載っています。Elixir には Registry というライブラリがあり、これをプロセスの名前付けに使えます。この他、Erlang の register 関数, Erlang ライブラリの gprop, pg2, syn, Elixir ライブラリの swarm … といくつも乱立してます。1
Elixir の Registry
プロセスの名前を付けるライブラリには、「名前を付けたプロセスに対して何かのアクションを行う」という関数があります。全部調べたわけではないですが、おそらくどのライブラリも持っているでしょう。この「名前を付けたプロセスに対して何かのアクションを行う」動作は、うまいこと使うと Pub/Sub の機構を実現できます。
Elixir の Registry にも Using as a Pub/Sub という説明があって簡単な例が載っています。しかしこれが分かりにくい。これはいきなりここを読むのではなくその前の節の Using as a Dispatcher から読むとまだ分かるのですが、それでもやっぱりチト辛い。
じゃあ日本語の解説はないのかと言うとそんなことはなくてちゃんと @niku さんが Elixir標準ライブラリRegistryを使ったPub/Sub という記事を書いてくれてます。これは Supervisor それに DyanamicSupervisor も使ったちゃんとした例が載っています。
なので今回の私の記事は二番煎じなんですが、プロセスツリーが複雑だと元の Pub/Sub がどのように実現されているか分かりにくくなるきらいがあるので、できるだけシンプルに纏めるのを試みるのが今回の記事です。
Registry での Pub/Sub の実現
Pub/Sub は情報交換のモデルの一つです。情報を発信する側と受信する側では共通のネタ (topic) を持っています。しかしながら、どの受信者がどのネタを欲しがってるのかは発信者はわからないものとします。
今回は(というか実現の方法は一択と思いますが) Subscriber ごとにプロセスを立てて、それを Registry に登録することをします。Publish は Registry に対して刺激を与え、Registry がネタに従って適切な Subscriber へメッセージを伝達するという振る舞いをします。
- publisher: 情報を提供する側:特定の topic に対する情報を出す
- ネタとそれに関するメッセージを発信する
- subscriber: 情報を貰う側:特定の topic に対する情報をもらう
- subscriber の名前と、興味のある topic(複数可、ゼロでも可)を持つ
最初に subscriber の登録をします。Registry プロセスがすでに起動されているものとして以下をsubscriber の数だけ行います。
- GenServer で subscriber 単位のプロセスを起動する
- topic ごとに
Registry.register/3
関数で登録をする
登録がある状態で publisher が情報を発信すると、Registry がよろしく subscriber に情報を渡します。
-
Registry.dispatch/3
関数で、当該ネタで登録している subscriber プロセスに対してコールバック関数を適用する - 今回は
GenServer.cast/2
関数でログ出力する - ここに具体的なアクションを書けば subscriber ごとの動作を記述できる
実際のプログラムは以下です。
defmodule PubSub do
require Logger
@behaviour GenServer
def subscribe(regname, subname, topics) do
GenServer.start_link(__MODULE__, {regname, subname, topics})
end
def init({regname, subname, topics}) do
Logger.debug("#{inspect(self())}: #{subname}")
for topic <- topics do
Registry.register(regname, topic, subname)
Logger.debug("register topic #{topic} for #{subname}")
end
{:ok, topics}
end
def publish(regname, topic, mes) do
Registry.dispatch(regname, topic,
fn entries ->
for {pid, subname} <- entries, do: GenServer.cast(pid, {subname, mes})
end)
end
def handle_cast({subname, mes}, topics) do
Logger.debug("#{inspect(self())}: #{subname} gets #{inspect(mes)}")
{:noreply, topics}
end
end
これを使って実際に Pub/Sub をしてみます。
- まず Registry を
Registry.start_link/0
関数で起動する(Registry も一つのプロセスです) - 1つのキーで複数のプロセスを登録するオプション
keys: :duplicate
を指定する - 以下を subscriber として登録
- A はネタ afo に関する情報が欲しい
- B はネタ bar に関する情報が欲しい
- C はネタ afo に関する情報と bar と両方が欲しい
- Z は特になんのネタに関する情報も欲しくない
- この状態で publish を実行
- ネタ afo に関する情報が欲しい相手に対して "hello afo" を送る
- ネタ bar に関する情報が欲しい相手に対して "hello bar" を送る
- ネタ foo に関する情報が欲しい相手に対して "hello foo" を送る
最後に 1000ms の遅延を入れてますが、これは平行にプロセスが走ってて :ok
がログ出力に挟まれたりして見苦しいので入れたものです。なくても動作には関係ないです。
defmodule Main do
require Logger
def start(regname) do
{:ok, _pub} = Registry.start_link(keys: :duplicate, name: regname)
end
def test(regname \\ :my_pubsub) do
start(regname)
PubSub.subscribe(regname, "subscriber_A", ["afo"])
PubSub.subscribe(regname, "subscriber_B", ["bar"])
PubSub.subscribe(regname, "subscriber_C", ["afo", "bar"])
PubSub.subscribe(regname, "subscriber_Z", [])
PubSub.publish(regname, "afo", "hello afo")
PubSub.publish(regname, "bar", "hello bar")
PubSub.publish(regname, "foo", "hello foo")
Process.sleep(1000)
end
end
なお、Registry の動作速度を改善するのに起動時に以下のように partitions:
オプションも使えます。この場合、プロセッサの数だけ並列に動きます。今回はしょぼい例なので体感では全く変化がありません。
{:ok, _pub} = Registry.start_link(keys: :duplicate, name: regname, partitions: System.schedulers_online())
実行してみる
iex で試験してみます。2つのファイルをコンパイルして Main.test/0
関数を実行するとこうなります。
$ iex
Erlang/OTP 22 [erts-10.6.2] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]
Interactive Elixir (1.10.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> c "pubsub.ex"
[PubSub]
iex(2)> c "main.ex"
[Main]
iex(3)> Main.test()
02:22:13.706 [debug] #PID<0.139.0>: subscriber_A
02:22:13.708 [debug] register topic afo for subscriber_A
02:22:13.708 [debug] #PID<0.140.0>: subscriber_B
02:22:13.708 [debug] register topic bar for subscriber_B
02:22:13.708 [debug] #PID<0.141.0>: subscriber_C
02:22:13.708 [debug] register topic afo for subscriber_C
02:22:13.708 [debug] register topic bar for subscriber_C
02:22:13.708 [debug] #PID<0.142.0>: subscriber_Z
02:22:13.710 [debug] #PID<0.139.0>: subscriber_A gets "hello afo"
02:22:13.710 [debug] #PID<0.141.0>: subscriber_C gets "hello afo"
02:22:13.710 [debug] #PID<0.141.0>: subscriber_C gets "hello bar"
02:22:13.710 [debug] #PID<0.140.0>: subscriber_B gets "hello bar"
:ok
iex(4)>
ログ出力の最初の8行が subscriber ごとに topic を登録しているところ、その後の4行が publisher のネタごとに subscriber がメッセージを受け取っているところを示しています。
Main.test/1
関数は上の様に引数なしの場合は Registry を :my_pubsub という名前で実行します。異なるレジストリ名を ATOM で引数に与えて起動すると異なる Registry ができます。これは独立したネタ広場ができることに相当します。
まとめ
可能な限り簡単な例で Registry ライブラリによる Pub/Sub の説明をしてみました。
Pub/Sub でいろいろなことができそうなので引き続き研究を続けたいと思います。(引き続き続けるって馬から落馬か)
- Publisher もプロセスにする(今だと Pub 側と Sub 側で非対称なのがどうも気になる)
- Subscriber(GenServer によるプロセス)が複数の Registry に登録できるようにする
- ネタの広場を広げた時に Subscriber が複数の広場を同時に聞けるようにする
- Pub/Sub を I/O の制御に利用する(1年前から言ってる…)
謝辞
私が Elixir を使おうと決めるときから サッポロビーム には大変お世話になっております。今回の内容は サッポロビーム#310 で実施して記事を起こしたものです。また、Elixir標準ライブラリRegistryを使ったPub/Sub で使ってる Supervisor.Spec が deprecated に分類されてしまったので、サッポロビーム#307 の懇親会で「書き直して」とお願いしたところ最新版に書き直してもらえました。
@niku さんをはじめとするサッポロビームのみなさんに感謝いたします。
参考文献
- Elixir Registry
- Erlang Registered Processes
- Erlang global
- Erlang gproc
- Erlang pg2
- Erlang syn(v2)
- Elixir Swarm
- Qiita: Elixir標準ライブラリRegistryを使ったPub/Sub
- Qiita: 階段の上でも下でも電灯を点けたり消したりする
- Qiita: Registry について
- Qiita: はじめてなElixir(0)
- Qiita: はじめてNerves(0) ElixirによるIoTフレームワークNervesがとにかく動くようになるためのリンク集
- クラウドと組込との接点を求めて ITRC RICC/PIoT 2019 講演スライド
-
いつか pros/cons を整理して記事にしてみたいと考えています。乞うご期待。 ↩