9
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

はじめてな Elixir(29) Registry で Pub/Sub する

Last updated at Posted at 2020-02-14

今回は昨年2019年にどうにもうまく手懐けられなかった Registry による Pub/Sub をやってみます。一昨年2018年の fukuoka.ex Elixir/Phoenix Advent Calendar 2018 の記事 階段の上でも下でも電灯を点けたり消したりする ではじめて Registry を使って以来、何とかちゃんと理解してうまいこと使ってみたいと思いつつ1年以上が経過してしまいました。

Registry とは

Elixir や Erlang ではプロセスに名前をつけることができます。名前の付け方にはおおよそ以下の種類があります。

  • 名前を付けない: {:ok, pid} = ... でプロセスを起動して pid を持ち回る
  • 決め打ちで名前を付ける:プロセスの起動の際に name: :my_process_name などとやる
  • それなりに自由に名前を付ける:プロセスを登録するレジストリライブラリを用いる

最後のプロセスを登録するライブラリというのには沢山種類があります。「プログラミング Elixir」の15.2章「プロセスの名前付け」には Erlang の :global ライブラリを用いてプロセスに名前を付ける方法が載っています。Elixir には Registry というライブラリがあり、これをプロセスの名前付けに使えます。この他、Erlang の register 関数, Erlang ライブラリの gprop, pg2, syn, Elixir ライブラリの swarm … といくつも乱立してます。1

Elixir の Registry

プロセスの名前を付けるライブラリには、「名前を付けたプロセスに対して何かのアクションを行う」という関数があります。全部調べたわけではないですが、おそらくどのライブラリも持っているでしょう。この「名前を付けたプロセスに対して何かのアクションを行う」動作は、うまいこと使うと Pub/Sub の機構を実現できます。

Elixir の Registry にも Using as a Pub/Sub という説明があって簡単な例が載っています。しかしこれが分かりにくい。これはいきなりここを読むのではなくその前の節の Using as a Dispatcher から読むとまだ分かるのですが、それでもやっぱりチト辛い。

じゃあ日本語の解説はないのかと言うとそんなことはなくてちゃんと @niku さんが Elixir標準ライブラリRegistryを使ったPub/Sub という記事を書いてくれてます。これは Supervisor それに DyanamicSupervisor も使ったちゃんとした例が載っています。

なので今回の私の記事は二番煎じなんですが、プロセスツリーが複雑だと元の Pub/Sub がどのように実現されているか分かりにくくなるきらいがあるので、できるだけシンプルに纏めるのを試みるのが今回の記事です。

Registry での Pub/Sub の実現

Pub/Sub は情報交換のモデルの一つです。情報を発信する側と受信する側では共通のネタ (topic) を持っています。しかしながら、どの受信者がどのネタを欲しがってるのかは発信者はわからないものとします。

今回は(というか実現の方法は一択と思いますが) Subscriber ごとにプロセスを立てて、それを Registry に登録することをします。Publish は Registry に対して刺激を与え、Registry がネタに従って適切な Subscriber へメッセージを伝達するという振る舞いをします。

  • publisher: 情報を提供する側:特定の topic に対する情報を出す
  • ネタとそれに関するメッセージを発信する
  • subscriber: 情報を貰う側:特定の topic に対する情報をもらう
  • subscriber の名前と、興味のある topic(複数可、ゼロでも可)を持つ

最初に subscriber の登録をします。Registry プロセスがすでに起動されているものとして以下をsubscriber の数だけ行います。

  • GenServer で subscriber 単位のプロセスを起動する
  • topic ごとに Registry.register/3 関数で登録をする

登録がある状態で publisher が情報を発信すると、Registry がよろしく subscriber に情報を渡します。

  • Registry.dispatch/3 関数で、当該ネタで登録している subscriber プロセスに対してコールバック関数を適用する
  • 今回は GenServer.cast/2 関数でログ出力する
  • ここに具体的なアクションを書けば subscriber ごとの動作を記述できる

実際のプログラムは以下です。

pubsub.ex
defmodule PubSub do
  require Logger
  @behaviour GenServer

  def subscribe(regname, subname, topics) do
    GenServer.start_link(__MODULE__, {regname, subname, topics})
  end

  def init({regname, subname, topics}) do
    Logger.debug("#{inspect(self())}: #{subname}")
    for topic <- topics do
      Registry.register(regname, topic, subname)
      Logger.debug("register topic #{topic} for #{subname}")
    end
    {:ok, topics}
  end

  def publish(regname, topic, mes) do
    Registry.dispatch(regname, topic,
      fn entries ->
        for {pid, subname} <- entries, do: GenServer.cast(pid, {subname, mes})
      end)
  end

  def handle_cast({subname, mes}, topics) do
    Logger.debug("#{inspect(self())}: #{subname} gets #{inspect(mes)}")
    {:noreply, topics}
  end
end

これを使って実際に Pub/Sub をしてみます。

  • まず Registry を Registry.start_link/0 関数で起動する(Registry も一つのプロセスです)
  • 1つのキーで複数のプロセスを登録するオプション keys: :duplicate を指定する
  • 以下を subscriber として登録
  • A はネタ afo に関する情報が欲しい
  • B はネタ bar に関する情報が欲しい
  • C はネタ afo に関する情報と bar と両方が欲しい
  • Z は特になんのネタに関する情報も欲しくない
  • この状態で publish を実行
  • ネタ afo に関する情報が欲しい相手に対して "hello afo" を送る
  • ネタ bar に関する情報が欲しい相手に対して "hello bar" を送る
  • ネタ foo に関する情報が欲しい相手に対して "hello foo" を送る

最後に 1000ms の遅延を入れてますが、これは平行にプロセスが走ってて :ok がログ出力に挟まれたりして見苦しいので入れたものです。なくても動作には関係ないです。

main.ex
defmodule Main do
  require Logger
  
  def start(regname) do
    {:ok, _pub} = Registry.start_link(keys: :duplicate, name: regname)
  end

  def test(regname \\ :my_pubsub) do
    start(regname)
    PubSub.subscribe(regname, "subscriber_A", ["afo"])
    PubSub.subscribe(regname, "subscriber_B", ["bar"])
    PubSub.subscribe(regname, "subscriber_C", ["afo", "bar"])
    PubSub.subscribe(regname, "subscriber_Z", [])
    PubSub.publish(regname, "afo", "hello afo")
    PubSub.publish(regname, "bar", "hello bar")
    PubSub.publish(regname, "foo", "hello foo")
    Process.sleep(1000)
  end
end

なお、Registry の動作速度を改善するのに起動時に以下のように partitions: オプションも使えます。この場合、プロセッサの数だけ並列に動きます。今回はしょぼい例なので体感では全く変化がありません。

{:ok, _pub} = Registry.start_link(keys: :duplicate, name: regname, partitions: System.schedulers_online())

実行してみる

iex で試験してみます。2つのファイルをコンパイルして Main.test/0 関数を実行するとこうなります。

$ iex
Erlang/OTP 22 [erts-10.6.2] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]

Interactive Elixir (1.10.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> c "pubsub.ex"
[PubSub]
iex(2)> c "main.ex"
[Main]
iex(3)> Main.test()

02:22:13.706 [debug] #PID<0.139.0>: subscriber_A
 
02:22:13.708 [debug] register topic afo for subscriber_A
 
02:22:13.708 [debug] #PID<0.140.0>: subscriber_B
 
02:22:13.708 [debug] register topic bar for subscriber_B
 
02:22:13.708 [debug] #PID<0.141.0>: subscriber_C
 
02:22:13.708 [debug] register topic afo for subscriber_C

02:22:13.708 [debug] register topic bar for subscriber_C
 
02:22:13.708 [debug] #PID<0.142.0>: subscriber_Z
 
02:22:13.710 [debug] #PID<0.139.0>: subscriber_A gets "hello afo"
 
02:22:13.710 [debug] #PID<0.141.0>: subscriber_C gets "hello afo"

02:22:13.710 [debug] #PID<0.141.0>: subscriber_C gets "hello bar"
 
02:22:13.710 [debug] #PID<0.140.0>: subscriber_B gets "hello bar"
:ok
iex(4)> 

ログ出力の最初の8行が subscriber ごとに topic を登録しているところ、その後の4行が publisher のネタごとに subscriber がメッセージを受け取っているところを示しています。

Main.test/1 関数は上の様に引数なしの場合は Registry を :my_pubsub という名前で実行します。異なるレジストリ名を ATOM で引数に与えて起動すると異なる Registry ができます。これは独立したネタ広場ができることに相当します。

まとめ

可能な限り簡単な例で Registry ライブラリによる Pub/Sub の説明をしてみました。

Pub/Sub でいろいろなことができそうなので引き続き研究を続けたいと思います。(引き続き続けるって馬から落馬か)

  • Publisher もプロセスにする(今だと Pub 側と Sub 側で非対称なのがどうも気になる)
  • Subscriber(GenServer によるプロセス)が複数の Registry に登録できるようにする
  • ネタの広場を広げた時に Subscriber が複数の広場を同時に聞けるようにする
  • Pub/Sub を I/O の制御に利用する(1年前から言ってる…)

謝辞

私が Elixir を使おうと決めるときから サッポロビーム には大変お世話になっております。今回の内容は サッポロビーム#310 で実施して記事を起こしたものです。また、Elixir標準ライブラリRegistryを使ったPub/Sub で使ってる Supervisor.Spec が deprecated に分類されてしまったので、サッポロビーム#307 の懇親会で「書き直して」とお願いしたところ最新版に書き直してもらえました。

@niku さんをはじめとするサッポロビームのみなさんに感謝いたします。

参考文献

  1. いつか pros/cons を整理して記事にしてみたいと考えています。乞うご期待。

9
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?