20
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめてな Elixir(33) Syn (v3.3) で Pub/Sub する

Last updated at Posted at 2022-12-10

これは #Elixir Advent Calendar 2022 の5日目です。昨日は alicesky 2127 さんの WEB+DB PRESS vol131 特集『はじめてのElixir』をほんとの初心者がやってみた- 第1章 でした。

はじめに

Elixir をはじめてからはじめてなElixir(0)シリーズをポツポツと書き溜めてました。これが はじめてな Elixir(32) Syn (v2.1) で Pub/Sub する まで書いたところで止まってて、久しぶりの追加です。

久しぶりというと、この記事は Moxy Vienna airport というホテルのオープンロビーといういうべき場所で書いてます。もう4年も前、まだ GenServer の入門したてだったころ、はじめてな Elixir(19) GenServer で定期的なお仕事をする(まだ終わってない編) という記事をここで完成させました。電源もネットもあって、内装に工夫の凝らされた余裕のある大きな空間に小気味よい音楽が流れててハックするにはご機嫌な環境です。天井には Elixir のシンボルカラーのラインも入っていて我々を歓迎しているかのようです。

IMG_6794.jpg

さて、出版/購読型 (Pub/Sub) によるシステムの記述はIoTやFAで重要な通信モデルです。Elixir/Erlang ではプロセス登録のメカニズムで実現することができ、これまでいくつかのパターンを試してきました。Syn というライブラリでも実施することができて、これが ver. 2 から ver. 3 になっていたので、今回はそちらを試してみることにします。この記事を書いている時点での syn の最新バージョンは 3.3 です。

なぜ syn なのか

Syn はもともと Pub/Sub 用に開発されたライブラリではありません。Registry や Swarm 同様にプロセスレジストリとして作られています。ベースは Erlang 用のライブラリとして作られており、ドキュメントには Elixir から使う方法も併記してあります。1

これまで Registry, Swarm, Syn, GenStage で Pub/Sub を試して来てます。Registry がホストマシンをまたがった管理ができないこと、GenStage は通常の Pub/Sub とは異なる使い方になることより、私の中では Swarm v.s. Syn という構図になってました。久しぶりに両者を見てみると swarm はメンテが止まっているように見えます。これに対して syn が継続してメンテされていてメジャーバージョンも上がっています。ということで、新しい syn を試す次第です。

syn の新しい機能の利点

Syn のドキュメントの先頭に能書きが書いてあります。特徴的なのは Cluster の概念が入ったことです。これは Pub/Sub の空間を指し、複数個作ることができます。それぞれの Cluster には scope と呼ばれる名前をつけて区別します。これにより一つの syn で複数の独立した Pub/Sub 空間を構成することが可能になりました。

今回は簡単のため scope は唯一つの :universe で試してます。ですので、新しい機能の嬉しさはこの記事ではわかりません。あくまで Syn 3.3 で Pub/Sub ができることを試してます。あとこの新しい機能は、残念なデメリットも持ち込んでます。それは、この記事の最後に述べます。

準備

以下では最初に Pub/Sub を使いたいと思う元になるモジュールを仮定します。そのうえで、簡単な Pub/Sub をやってみて、その後で若干複雑にした Pub/Sub をやってみます。

Syn をインストールする

Syn のインストールをするには、以下を mix.exs に追加して mix deps.get してください。バージョンは 3.3 以上で。

mix.exs
  defp deps do
    [
      {:syn, "~> 3.3"},
    ]

Pub/Sub させたいモジュール

まず、もともとこういうモジュールがあったと仮定します。ここではまだ syn は登場してません。

defmodule Syn3PubSub do
  require Logger
  use GenServer

  def start_link(pname) do
      GenServer.start_link(__MODULE__, pname)
  end

  @impl GenServer
  def init(pname) do
      {:ok, [pname]} # 状態としてプロセス名のリストが初期状態とする。
  end

おそらく以下のような感じでアクションがあったときの処理が書いてることでしょう。

  @impl GenServer
  def handle_call(arg, _from, state) do
    # GenServer.call の動作の中身
    {:reply, return_val, new_state)
  end

  @impl GenServer
  def handle_cast(arg, state) do
    # GenServer.cast の動作の中身
    {:noreply, new_state}
  end

  @impl GenSesrver
  def handle_info(message, state) do
    # プロセスにメッセージが来たときの動作の中身
    {:noreply, new_state}
  end

というようなモジュールがあったものとして以下を続けます。

簡単な Pub/Sub を作ってみる

上述のモジュールに Pub/Sub の機構を取り込んでみましょう。定義に以下を追加します。

defmodule Syn3PubSub do
  require Logger
  use GenServer

  @scope :universe # 簡単のため scope は :universe しかないものとする。

  def start_link(pname, pub_topics_list, sub_topics_list) do
      # プロセス名、Pub のトピックのリスト、Sub のトピックのリスト
      Logger.info("#{__MODULE__}.start_link: #{pname}, #{inspect(pub_topics_list)}, #{inspect(sub_topics_list)}")
      GenServer.start_link(__MODULE__, {pname, pub_topics_list, sub_topics_list})
  end

  @impl GenServer
  def init({pname, pub_topics_list, sub_topics_list}) do
      Logger.info("#{__MODULE__}.init: #{pname}")
      pid = self()
      :syn.register(@scope, pname, pid, pub_topics_list)
      # プロセスを登録する。その際にメタ情報として Pub 用のトピックリストを入れておく。
      Enum.map(sub_topics_list, &(:syn.join(@scope, &1, pid)))
      # Sub 用のトピックリストからトピックを取り出し、プロセスグループとして登録しておく。
      {:ok, [pname]}
  end

  def publish(pname, message) do # Publish する関数
      {_pid, pub_topics_list} = :syn.lookup(@scope, pname) # Pub 用のトピックリストを得る
      Enum.map(pub_topics_list, &(:syn.publish(@scope, &1, {:pubsub, message}))) # メッセージをトピック名のプロセスグループに Publish する。
      Logger.info("#{__MODULE__}, #{pname}, pub: #{inspect(pub_topics_list)} #{message}")
  end

  @impl GenServer
  def handle_info({:pubsub, message}, [pname]) do # Pub されたメッセージを受け取る
      Logger.info("#{__MODULE__}, #{pname}, sub: #{inspect(message)}")
      # ここに Pub したときの動作を記述する。ここでは Logger でメッセージを出すのみ。
      publish(pname, message) # Subscribe したメッセージをさらに Publish する。
      {:noreply, [pname]}
  end
end

ここで定義したのは syn で Pub/Sub をさせるための関数です。

元々あった関数の Pub/Sub 対応

元のモジュールにあった以下の関数はそのまま入れておきます。ただし、状態の引き回しを state で持ち回るところは pname として入れてあるプロセス名を持ち回るように書いておいて下さい。Pub/Sub として必要なのはこのプロセス名だけで、それ以外はプロセスの動作に必要な状態をもたせておくようにします。

  @impl GenServer
  def handle_call(arg, _from, state) do
    # GenServer.call の動作の中身をそのまま書いておく
    {:reply, return_val, new_state)
  end

  @impl GenServer
  def handle_cast(arg, state) do
    # GenServer.cast の動作の中身をそのまま書いておく
    {:noreply, new_state}
  end

  @impl GenSesrver
  def handle_info(message, state) do
    # プロセスにメッセージが来たときの動作の中身をそのまま書いておく
    {:noreply, new_state}
  end

なお、今回の syn による Pub/Sub の動作を観てみるだけなら、以上の関数群は特になくても試せます。

実際にいごかしてみる

まず最初に scope を登録します。この記事では簡単のため全部 :universe にしてあります。

  :syn.add_node_to_scopes([:universe])

:ok が返って来ます。

次に publisher プロセスを起動します。引数はプロセス名と publish するときのトピックのリストです。ここでは subscribe のトピックリストを空にしているので、これらのプロセスは Publish しかしないようになってます。ちなみにここではプロセス名もトピックも文字列にしていますが、アトムでも他のデータ型の値でも構いません。これは Syn がキーワードとして任意の型を使えることに依存しています。

    Syn3PubSub.start_link("pub_a", ["afo"], [])
    Syn3PubSub.start_link("pub_b", ["bar"], [])
    Syn3PubSub.start_link("pub_c", ["bar", "foo"], [])

次に subscriber プロセスを起動します。引数はプロセス名と Subscribe するときのトピックのリストです。こんどは Publish のトピックリストを空にしているので、これらのプロセスは Subscribe しかしないようになります。

    Syn3PubSub.start_link("sub_x", [], ["afo"])
    Syn3PubSub.start_link("sub_y", [], ["afo", "foo"])
    Syn3PubSub.start_link("sub_z", [], ["zugan"])

さらに Publish も Subscribe もするプロセスも記述可能です。以下の例では、トピック "foo" でメッセージを受け取ると、同じメッセージをトピック "zugan" で Publish するようなプロセスになります。

    Syn3PubSub.start_link("relay", ["zugan"], ["foo"])

以上でプロセスを走らせた上で、Publisher からメッセージを Publish させてみてください。

    Syn3PubSub.publish("pub_a", "hello")
    Syn3PubSub.publish("pub_b", "world")
    Syn3PubSub.publish("pub_c", "everyone")

よりシンプルにしてみる

以上では説明のためにごちゃごちゃと書いてます。これあgどれぐらい簡単に Pub/Sub を記述できるかをわかってもらうために、以下に上述のモジュールをひとまとめにして、ログ出力も極力へらしたプログラムに書き換えてみます。

defmodule Syn3PubSub do
  require Logger
  use GenServer

  @scope :universe

  def start_link(pname, pub_topics_list, sub_topics_list) do
      GenServer.start_link(__MODULE__, {pname, pub_topics_list, sub_topics_list})
  end

  @impl GenServer
  def init({pname, pub_topics_list, sub_topics_list}) do
      pid = self()
      :syn.register(@scope, pname, pid, pub_topics_list)
      Enum.map(sub_topics_list, &(:syn.join(@scope, &1, pid)))
      {:ok, [pname]}
  end

  def publish(pname, message) do
      {_pid, pub_topics_list} = :syn.lookup(@scope, pname)
      Enum.map(pub_topics_list, &(:syn.publish(@scope, &1, {:pubsub, message})))
  end

  @impl GenServer
  def handle_info({:pubsub, message}, [pname]) do
      Logger.info("#{__MODULE__}, #{pname}, sub: #{inspect(message)}")
      publish(pname, message)
      {:noreply, [pname]}
  end

  def test do
    :ok = :syn.add_node_to_scopes([@scope])
    Syn3PubSub.start_link("pub_a", ["afo"], [])
    Syn3PubSub.start_link("pub_b", ["bar"], [])
    Syn3PubSub.start_link("pub_c", ["bar", "foo"], [])
    Syn3PubSub.start_link("sub_x", [], ["afo"])
    Syn3PubSub.start_link("sub_y", [], ["afo", "foo"])
    Syn3PubSub.start_link("sub_z", [], ["zugan"])
    Syn3PubSub.start_link("relay", ["zugan"], ["foo"])

    Syn3PubSub.publish("pub_a", "hello")
    Process.sleep(100)
    Syn3PubSub.publish("pub_b", "world")
    Process.sleep(100)
    Syn3PubSub.publish("pub_c", "everyone")
  end
end

じっさいにいごかしてみる

上のプログラムを実行してみましょう。

❯❯❯ iex -S mix
Erlang/OTP 25 [erts-13.0.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]

Compiling 1 file (.ex)
Interactive Elixir (1.14.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Syn3PubSub.test                     

03:10:12.648 [notice] SYN[nonode@nohost] Adding node to scope <universe>
 
03:10:12.658 [notice] SYN[nonode@nohost] Creating tables for scope <universe>
 
03:10:12.661 [notice] SYN[nonode@nohost|registry<universe>] Discovering the cluster
 
03:10:12.663 [notice] SYN[nonode@nohost|pg<universe>] Discovering the cluster
 
03:10:12.665 [info] Elixir.Syn3PubSub, sub_x, sub: "hello"
 
03:10:12.665 [info] Elixir.Syn3PubSub, sub_y, sub: "hello"
 
03:10:12.865 [info] Elixir.Syn3PubSub, sub_y, sub: "everyone"
 
03:10:12.865 [info] Elixir.Syn3PubSub, relay, sub: "everyone"

03:10:12.865 [info] Elixir.Syn3PubSub, sub_z, sub: "everyone"
[ok: 0, ok: 2]
iex(2)> 

もう一つの実行例

以下は前回 Syn v2.1 の最初の例を今回のバージョンに移植した場合のコードです。

defmodule PubSub do
    require Logger
    use GenServer

    @scope :universe

    def subscriber(name) do
        GenServer.start_link(__MODULE__, name)
    end

    @impl GenServer
    def init(name) do
        Logger.info("#{__MODULE__}.init: start for #{inspect(name)}")
        :syn.register(@scope, name, self())
        {:ok, name}
    end

    @impl GenServer
    def handle_info(message, name) do
        Logger.info("#{__MODULE__} #{inspect(name)} gets #{inspect(message)}")
        {:noreply, name}
    end

    def register_topics(subscriber, topics_list) do
        {pid, _meta} = :syn.lookup(@scope, subscriber)
        Enum.map(topics_list, &(:syn.join(@scope, &1, pid)))
    end

    def publish(topic, message) do
        :syn.publish(@scope, topic, message)
    end

    def test do
        :syn.add_node_to_scopes([@scope])
        subscriber("sub_a")
        subscriber("sub_b")
        subscriber("sub_c")

        Process.sleep(1000)

        PubSub.register_topics("sub_a", ["afo"])
        PubSub.register_topics("sub_b", ["bar"])
        PubSub.register_topics("sub_c", ["afo", "bar"])

        Process.sleep(1000)
        PubSub.publish("afo", "hello")
        PubSub.publish("bar", "world")
        PubSub.publish("foo", "good bye")
    end
end

これを実行してみます。

iex(1)> PubSub.test

17:01:23.536 [notice] SYN[nonode@nohost] Adding node to scope <universe>
17:01:23.563 [notice] SYN[nonode@nohost] Creating tables for scope <universe>
17:01:23.567 [notice] SYN[nonode@nohost|registry<universe>] Discovering the cluster
17:01:23.569 [notice] SYN[nonode@nohost|pg<universe>] Discovering the cluster
17:01:23.573 [info] Elixir.PubSub.init: start for "sub_a"
17:01:23.573 [info] Elixir.PubSub.init: start for "sub_b"
17:01:23.573 [info] Elixir.PubSub.init: start for "sub_c"
17:01:25.575 [info] Elixir.PubSub "sub_a" gets "hello"
17:01:25.575 [info] Elixir.PubSub "sub_c" gets "hello"
17:01:25.575 [info] Elixir.PubSub "sub_c" gets "world"
17:01:25.575 [info] Elixir.PubSub "sub_b" gets "world"
{:ok, 0}
iex(2)> 

Subscriber 側で Publisher を知りたい

上の例では Subscriber がどの Publisher からのメッセージなのかを知る方法がありません。それをやるためには、Publilsher 自身がメッセージに「送り手が誰か」を入れておくようなプログラミングが必要になります。これはちょっと不便なので Pub/Sub の機構自体が送り手の情報を伝えられるようにしてみます。

メッセージとは別に送り手の情報も入れる

送り手が誰かを伝えるためには publish/2handle_info に少しの手を加えることで実現できます。

defmodule Syn3PubSub do
  require Logger
  use GenServer

  @scope :universe

  def start_link(pname, pub_topics_list, sub_topics_list) do
      # Logger.info("#{__MODULE__}.start_link: #{pname}, #{inspect(pub_topics_list)}, #{inspect(sub_topics_list)}")
      GenServer.start_link(__MODULE__, {pname, pub_topics_list, sub_topics_list})
  end

  @impl GenServer
  def init({pname, pub_topics_list, sub_topics_list}) do
      # Logger.info("#{__MODULE__}.init: #{pname}")
      pid = self()
      :syn.register(@scope, pname, pid, pub_topics_list)
      Enum.map(sub_topics_list, &(:syn.join(@scope, &1, pid)))
      {:ok, [pname]}
  end

  def publish(pname, message) do
      {_pid, pub_topics_list} = :syn.lookup(@scope, pname)
      Enum.map(pub_topics_list, &(:syn.publish(@scope, &1, {:pubsub, pname, message})))
      # Logger.info("#{__MODULE__}, #{pname}, pub: #{inspect(pub_topics_list)} #{message}")
  end

  @impl GenServer
  def handle_info({:pubsub, publisher, message}, [pname]) do
      Logger.info("#{__MODULE__}, #{pname} gets #{inspect(message)} from #{publisher}")
      publish(pname, message)
      {:noreply, [pname]}
  end

  def test do
    :ok = :syn.add_node_to_scopes([@scope])
    Syn3PubSub.start_link("pub_a", ["afo"], [])
    Syn3PubSub.start_link("pub_b", ["bar"], [])
    Syn3PubSub.start_link("pub_c", ["bar", "foo"], [])
    Syn3PubSub.start_link("sub_x", [], ["afo"])
    Syn3PubSub.start_link("sub_y", [], ["afo", "foo"])
    Syn3PubSub.start_link("sub_z", [], ["zugan"])
    Syn3PubSub.start_link("relay", ["zugan"], ["foo"])

    Syn3PubSub.publish("pub_a", "hello")
    Process.sleep(100)
    Syn3PubSub.publish("pub_b", "world")
    Process.sleep(100)
    Syn3PubSub.publish("pub_c", "everyone")
  end
end

実際にいごかしてみる。

同じ例で動かすと、メッセージ発信元の publisher がだれかを subscriber が分かるようになってるのが分かります。

iex(1)> Syn3PubSub.test

09:09:00.817 [notice] SYN[nonode@nohost] Adding node to scope <universe>
09:09:00.832 [notice] SYN[nonode@nohost] Creating tables for scope <universe>
09:09:00.836 [notice] SYN[nonode@nohost|registry<universe>] Discovering the cluster
09:09:00.843 [notice] SYN[nonode@nohost|pg<universe>] Discovering the cluster
09:09:00.845 [info] Elixir.Syn3PubSub, sub_x gets "hello" from pub_a
09:09:00.845 [info] Elixir.Syn3PubSub, sub_y gets "hello" from pub_a
09:09:01.045 [info] Elixir.Syn3PubSub, sub_y gets "everyone" from pub_c
09:09:01.045 [info] Elixir.Syn3PubSub, relay gets "everyone" from pub_c
09:09:01.045 [info] Elixir.Syn3PubSub, sub_z gets "everyone" from relay
[ok: 0, ok: 2]
iex(2)> 

実際の Pub/Sub を用いたプログラミングにおいて、メッセージを受け取ったときに publisher ごとに制御を分けるのであれば、一旦 handle_info で subscribe してから publisher ごとで分岐することもできます。ただ handle_infopublisher 仮引数に「受け取りたい相手の publisher 名」を記述することで、パターンマッチで動作を記述することもできます。後者の方がより Elixir らしい記法ですね。

syn の新しい機能の欠点

今回は試しませんでしたが、新しい syn の機能として scope の概念が持ち込まれました。これを使えば、互いに干渉しない Pub/Sub 空間を難なく導入することができます。

ただしこれは残念なデメリットも持ち込んでます。GenServer はプロセス登録に外部ライブラリを使えるようになっています。これを利用して syn ver. 2 では、プロセスの起動時に以下のようなシンプルな記述方法が可能でした。こう書くだけでプロセス登録のメカニズムの syn にプロセス名を登録できます。

GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})

ところがこの記法はなんと ver. 3 では使えません。使うと直ちにエラーします。この外部ライブラリを使う場合のドキュメントGenServer: Name registrationには以下の記載があります。

  • The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.

これらの関数が ver. 2 にはあったのですが、ver. 3 ではなくなってます。これと近い関数が register/3, unregister/2, lookup/2, publish/3 としてあるのですが、全部引数が一つ多いです。これ scope を引数に取るためです。このため syn ver. 3 を使ったプロセスの登録は GenServer.start_link ではやることができず init 関数内で :syn.register 関数を呼んでやるしかなくなります。

これパッと見た感じで、scope を使う必要がない場合に引数を省略できるように register/2, unregister/1, lookup/1, publish/2 を作ってやって、それを register_name/2, unregister_name/1, whereis_name/1, send/2 としてやればそのままうまく GenServer.start_link のオプションとしていごきそうな気がするのですが、まだ試してみてません。機会があったらやってみたいと思ってます。

まとめ

Syn の新しいバージョン 3.3 での Pub/Sub をやってみました。これまで同様に Pub/Sub の機能が使えることの確認をしました。

さて、明日の #Elixir Advent Calendar 2022 の記事は 山口 大輝 さんの Stable Diffusion(画像生成AI)をローカルインストールしてみた です。お楽しみに!

参考文献

  1. 要は毎度 Erlang の関数を呼び出す形になります。個人的にはあまり好きではないスタイルです。

20
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?