これは #Elixir Advent Calendar 2022 の5日目です。昨日は alicesky 2127 さんの WEB+DB PRESS vol131 特集『はじめてのElixir』をほんとの初心者がやってみた- 第1章 でした。
はじめに
Elixir をはじめてからはじめてなElixir(0)シリーズをポツポツと書き溜めてました。これが はじめてな Elixir(32) Syn (v2.1) で Pub/Sub する まで書いたところで止まってて、久しぶりの追加です。
久しぶりというと、この記事は Moxy Vienna airport というホテルのオープンロビーといういうべき場所で書いてます。もう4年も前、まだ GenServer の入門したてだったころ、はじめてな Elixir(19) GenServer で定期的なお仕事をする(まだ終わってない編) という記事をここで完成させました。電源もネットもあって、内装に工夫の凝らされた余裕のある大きな空間に小気味よい音楽が流れててハックするにはご機嫌な環境です。天井には Elixir のシンボルカラーのラインも入っていて我々を歓迎しているかのようです。
さて、出版/購読型 (Pub/Sub) によるシステムの記述はIoTやFAで重要な通信モデルです。Elixir/Erlang ではプロセス登録のメカニズムで実現することができ、これまでいくつかのパターンを試してきました。Syn というライブラリでも実施することができて、これが ver. 2 から ver. 3 になっていたので、今回はそちらを試してみることにします。この記事を書いている時点での syn の最新バージョンは 3.3 です。
なぜ syn なのか
Syn はもともと Pub/Sub 用に開発されたライブラリではありません。Registry や Swarm 同様にプロセスレジストリとして作られています。ベースは Erlang 用のライブラリとして作られており、ドキュメントには Elixir から使う方法も併記してあります。1
これまで Registry, Swarm, Syn, GenStage で Pub/Sub を試して来てます。Registry がホストマシンをまたがった管理ができないこと、GenStage は通常の Pub/Sub とは異なる使い方になることより、私の中では Swarm v.s. Syn という構図になってました。久しぶりに両者を見てみると swarm はメンテが止まっているように見えます。これに対して syn が継続してメンテされていてメジャーバージョンも上がっています。ということで、新しい syn を試す次第です。
syn の新しい機能の利点
Syn のドキュメントの先頭に能書きが書いてあります。特徴的なのは Cluster
の概念が入ったことです。これは Pub/Sub の空間を指し、複数個作ることができます。それぞれの Cluster には scope
と呼ばれる名前をつけて区別します。これにより一つの syn で複数の独立した Pub/Sub 空間を構成することが可能になりました。
今回は簡単のため scope
は唯一つの :universe
で試してます。ですので、新しい機能の嬉しさはこの記事ではわかりません。あくまで Syn 3.3 で Pub/Sub ができることを試してます。あとこの新しい機能は、残念なデメリットも持ち込んでます。それは、この記事の最後に述べます。
準備
以下では最初に Pub/Sub を使いたいと思う元になるモジュールを仮定します。そのうえで、簡単な Pub/Sub をやってみて、その後で若干複雑にした Pub/Sub をやってみます。
Syn をインストールする
Syn のインストールをするには、以下を mix.exs
に追加して mix deps.get
してください。バージョンは 3.3 以上で。
defp deps do
[
{:syn, "~> 3.3"},
]
Pub/Sub させたいモジュール
まず、もともとこういうモジュールがあったと仮定します。ここではまだ syn は登場してません。
defmodule Syn3PubSub do
require Logger
use GenServer
def start_link(pname) do
GenServer.start_link(__MODULE__, pname)
end
@impl GenServer
def init(pname) do
{:ok, [pname]} # 状態としてプロセス名のリストが初期状態とする。
end
おそらく以下のような感じでアクションがあったときの処理が書いてることでしょう。
@impl GenServer
def handle_call(arg, _from, state) do
# GenServer.call の動作の中身
{:reply, return_val, new_state)
end
@impl GenServer
def handle_cast(arg, state) do
# GenServer.cast の動作の中身
{:noreply, new_state}
end
@impl GenSesrver
def handle_info(message, state) do
# プロセスにメッセージが来たときの動作の中身
{:noreply, new_state}
end
というようなモジュールがあったものとして以下を続けます。
簡単な Pub/Sub を作ってみる
上述のモジュールに Pub/Sub の機構を取り込んでみましょう。定義に以下を追加します。
defmodule Syn3PubSub do
require Logger
use GenServer
@scope :universe # 簡単のため scope は :universe しかないものとする。
def start_link(pname, pub_topics_list, sub_topics_list) do
# プロセス名、Pub のトピックのリスト、Sub のトピックのリスト
Logger.info("#{__MODULE__}.start_link: #{pname}, #{inspect(pub_topics_list)}, #{inspect(sub_topics_list)}")
GenServer.start_link(__MODULE__, {pname, pub_topics_list, sub_topics_list})
end
@impl GenServer
def init({pname, pub_topics_list, sub_topics_list}) do
Logger.info("#{__MODULE__}.init: #{pname}")
pid = self()
:syn.register(@scope, pname, pid, pub_topics_list)
# プロセスを登録する。その際にメタ情報として Pub 用のトピックリストを入れておく。
Enum.map(sub_topics_list, &(:syn.join(@scope, &1, pid)))
# Sub 用のトピックリストからトピックを取り出し、プロセスグループとして登録しておく。
{:ok, [pname]}
end
def publish(pname, message) do # Publish する関数
{_pid, pub_topics_list} = :syn.lookup(@scope, pname) # Pub 用のトピックリストを得る
Enum.map(pub_topics_list, &(:syn.publish(@scope, &1, {:pubsub, message}))) # メッセージをトピック名のプロセスグループに Publish する。
Logger.info("#{__MODULE__}, #{pname}, pub: #{inspect(pub_topics_list)} #{message}")
end
@impl GenServer
def handle_info({:pubsub, message}, [pname]) do # Pub されたメッセージを受け取る
Logger.info("#{__MODULE__}, #{pname}, sub: #{inspect(message)}")
# ここに Pub したときの動作を記述する。ここでは Logger でメッセージを出すのみ。
publish(pname, message) # Subscribe したメッセージをさらに Publish する。
{:noreply, [pname]}
end
end
ここで定義したのは syn で Pub/Sub をさせるための関数です。
元々あった関数の Pub/Sub 対応
元のモジュールにあった以下の関数はそのまま入れておきます。ただし、状態の引き回しを state で持ち回るところは pname
として入れてあるプロセス名を持ち回るように書いておいて下さい。Pub/Sub として必要なのはこのプロセス名だけで、それ以外はプロセスの動作に必要な状態をもたせておくようにします。
@impl GenServer
def handle_call(arg, _from, state) do
# GenServer.call の動作の中身をそのまま書いておく
{:reply, return_val, new_state)
end
@impl GenServer
def handle_cast(arg, state) do
# GenServer.cast の動作の中身をそのまま書いておく
{:noreply, new_state}
end
@impl GenSesrver
def handle_info(message, state) do
# プロセスにメッセージが来たときの動作の中身をそのまま書いておく
{:noreply, new_state}
end
なお、今回の syn による Pub/Sub の動作を観てみるだけなら、以上の関数群は特になくても試せます。
実際にいごかしてみる
まず最初に scope を登録します。この記事では簡単のため全部 :universe
にしてあります。
:syn.add_node_to_scopes([:universe])
:ok
が返って来ます。
次に publisher プロセスを起動します。引数はプロセス名と publish するときのトピックのリストです。ここでは subscribe のトピックリストを空にしているので、これらのプロセスは Publish しかしないようになってます。ちなみにここではプロセス名もトピックも文字列にしていますが、アトムでも他のデータ型の値でも構いません。これは Syn がキーワードとして任意の型を使えることに依存しています。
Syn3PubSub.start_link("pub_a", ["afo"], [])
Syn3PubSub.start_link("pub_b", ["bar"], [])
Syn3PubSub.start_link("pub_c", ["bar", "foo"], [])
次に subscriber プロセスを起動します。引数はプロセス名と Subscribe するときのトピックのリストです。こんどは Publish のトピックリストを空にしているので、これらのプロセスは Subscribe しかしないようになります。
Syn3PubSub.start_link("sub_x", [], ["afo"])
Syn3PubSub.start_link("sub_y", [], ["afo", "foo"])
Syn3PubSub.start_link("sub_z", [], ["zugan"])
さらに Publish も Subscribe もするプロセスも記述可能です。以下の例では、トピック "foo"
でメッセージを受け取ると、同じメッセージをトピック "zugan"
で Publish するようなプロセスになります。
Syn3PubSub.start_link("relay", ["zugan"], ["foo"])
以上でプロセスを走らせた上で、Publisher からメッセージを Publish させてみてください。
Syn3PubSub.publish("pub_a", "hello")
Syn3PubSub.publish("pub_b", "world")
Syn3PubSub.publish("pub_c", "everyone")
よりシンプルにしてみる
以上では説明のためにごちゃごちゃと書いてます。これあgどれぐらい簡単に Pub/Sub を記述できるかをわかってもらうために、以下に上述のモジュールをひとまとめにして、ログ出力も極力へらしたプログラムに書き換えてみます。
defmodule Syn3PubSub do
require Logger
use GenServer
@scope :universe
def start_link(pname, pub_topics_list, sub_topics_list) do
GenServer.start_link(__MODULE__, {pname, pub_topics_list, sub_topics_list})
end
@impl GenServer
def init({pname, pub_topics_list, sub_topics_list}) do
pid = self()
:syn.register(@scope, pname, pid, pub_topics_list)
Enum.map(sub_topics_list, &(:syn.join(@scope, &1, pid)))
{:ok, [pname]}
end
def publish(pname, message) do
{_pid, pub_topics_list} = :syn.lookup(@scope, pname)
Enum.map(pub_topics_list, &(:syn.publish(@scope, &1, {:pubsub, message})))
end
@impl GenServer
def handle_info({:pubsub, message}, [pname]) do
Logger.info("#{__MODULE__}, #{pname}, sub: #{inspect(message)}")
publish(pname, message)
{:noreply, [pname]}
end
def test do
:ok = :syn.add_node_to_scopes([@scope])
Syn3PubSub.start_link("pub_a", ["afo"], [])
Syn3PubSub.start_link("pub_b", ["bar"], [])
Syn3PubSub.start_link("pub_c", ["bar", "foo"], [])
Syn3PubSub.start_link("sub_x", [], ["afo"])
Syn3PubSub.start_link("sub_y", [], ["afo", "foo"])
Syn3PubSub.start_link("sub_z", [], ["zugan"])
Syn3PubSub.start_link("relay", ["zugan"], ["foo"])
Syn3PubSub.publish("pub_a", "hello")
Process.sleep(100)
Syn3PubSub.publish("pub_b", "world")
Process.sleep(100)
Syn3PubSub.publish("pub_c", "everyone")
end
end
じっさいにいごかしてみる
上のプログラムを実行してみましょう。
❯❯❯ iex -S mix
Erlang/OTP 25 [erts-13.0.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Compiling 1 file (.ex)
Interactive Elixir (1.14.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Syn3PubSub.test
03:10:12.648 [notice] SYN[nonode@nohost] Adding node to scope <universe>
03:10:12.658 [notice] SYN[nonode@nohost] Creating tables for scope <universe>
03:10:12.661 [notice] SYN[nonode@nohost|registry<universe>] Discovering the cluster
03:10:12.663 [notice] SYN[nonode@nohost|pg<universe>] Discovering the cluster
03:10:12.665 [info] Elixir.Syn3PubSub, sub_x, sub: "hello"
03:10:12.665 [info] Elixir.Syn3PubSub, sub_y, sub: "hello"
03:10:12.865 [info] Elixir.Syn3PubSub, sub_y, sub: "everyone"
03:10:12.865 [info] Elixir.Syn3PubSub, relay, sub: "everyone"
03:10:12.865 [info] Elixir.Syn3PubSub, sub_z, sub: "everyone"
[ok: 0, ok: 2]
iex(2)>
もう一つの実行例
以下は前回 Syn v2.1 の最初の例を今回のバージョンに移植した場合のコードです。
defmodule PubSub do
require Logger
use GenServer
@scope :universe
def subscriber(name) do
GenServer.start_link(__MODULE__, name)
end
@impl GenServer
def init(name) do
Logger.info("#{__MODULE__}.init: start for #{inspect(name)}")
:syn.register(@scope, name, self())
{:ok, name}
end
@impl GenServer
def handle_info(message, name) do
Logger.info("#{__MODULE__} #{inspect(name)} gets #{inspect(message)}")
{:noreply, name}
end
def register_topics(subscriber, topics_list) do
{pid, _meta} = :syn.lookup(@scope, subscriber)
Enum.map(topics_list, &(:syn.join(@scope, &1, pid)))
end
def publish(topic, message) do
:syn.publish(@scope, topic, message)
end
def test do
:syn.add_node_to_scopes([@scope])
subscriber("sub_a")
subscriber("sub_b")
subscriber("sub_c")
Process.sleep(1000)
PubSub.register_topics("sub_a", ["afo"])
PubSub.register_topics("sub_b", ["bar"])
PubSub.register_topics("sub_c", ["afo", "bar"])
Process.sleep(1000)
PubSub.publish("afo", "hello")
PubSub.publish("bar", "world")
PubSub.publish("foo", "good bye")
end
end
これを実行してみます。
iex(1)> PubSub.test
17:01:23.536 [notice] SYN[nonode@nohost] Adding node to scope <universe>
17:01:23.563 [notice] SYN[nonode@nohost] Creating tables for scope <universe>
17:01:23.567 [notice] SYN[nonode@nohost|registry<universe>] Discovering the cluster
17:01:23.569 [notice] SYN[nonode@nohost|pg<universe>] Discovering the cluster
17:01:23.573 [info] Elixir.PubSub.init: start for "sub_a"
17:01:23.573 [info] Elixir.PubSub.init: start for "sub_b"
17:01:23.573 [info] Elixir.PubSub.init: start for "sub_c"
17:01:25.575 [info] Elixir.PubSub "sub_a" gets "hello"
17:01:25.575 [info] Elixir.PubSub "sub_c" gets "hello"
17:01:25.575 [info] Elixir.PubSub "sub_c" gets "world"
17:01:25.575 [info] Elixir.PubSub "sub_b" gets "world"
{:ok, 0}
iex(2)>
Subscriber 側で Publisher を知りたい
上の例では Subscriber がどの Publisher からのメッセージなのかを知る方法がありません。それをやるためには、Publilsher 自身がメッセージに「送り手が誰か」を入れておくようなプログラミングが必要になります。これはちょっと不便なので Pub/Sub の機構自体が送り手の情報を伝えられるようにしてみます。
メッセージとは別に送り手の情報も入れる
送り手が誰かを伝えるためには publish/2
と handle_info
に少しの手を加えることで実現できます。
defmodule Syn3PubSub do
require Logger
use GenServer
@scope :universe
def start_link(pname, pub_topics_list, sub_topics_list) do
# Logger.info("#{__MODULE__}.start_link: #{pname}, #{inspect(pub_topics_list)}, #{inspect(sub_topics_list)}")
GenServer.start_link(__MODULE__, {pname, pub_topics_list, sub_topics_list})
end
@impl GenServer
def init({pname, pub_topics_list, sub_topics_list}) do
# Logger.info("#{__MODULE__}.init: #{pname}")
pid = self()
:syn.register(@scope, pname, pid, pub_topics_list)
Enum.map(sub_topics_list, &(:syn.join(@scope, &1, pid)))
{:ok, [pname]}
end
def publish(pname, message) do
{_pid, pub_topics_list} = :syn.lookup(@scope, pname)
Enum.map(pub_topics_list, &(:syn.publish(@scope, &1, {:pubsub, pname, message})))
# Logger.info("#{__MODULE__}, #{pname}, pub: #{inspect(pub_topics_list)} #{message}")
end
@impl GenServer
def handle_info({:pubsub, publisher, message}, [pname]) do
Logger.info("#{__MODULE__}, #{pname} gets #{inspect(message)} from #{publisher}")
publish(pname, message)
{:noreply, [pname]}
end
def test do
:ok = :syn.add_node_to_scopes([@scope])
Syn3PubSub.start_link("pub_a", ["afo"], [])
Syn3PubSub.start_link("pub_b", ["bar"], [])
Syn3PubSub.start_link("pub_c", ["bar", "foo"], [])
Syn3PubSub.start_link("sub_x", [], ["afo"])
Syn3PubSub.start_link("sub_y", [], ["afo", "foo"])
Syn3PubSub.start_link("sub_z", [], ["zugan"])
Syn3PubSub.start_link("relay", ["zugan"], ["foo"])
Syn3PubSub.publish("pub_a", "hello")
Process.sleep(100)
Syn3PubSub.publish("pub_b", "world")
Process.sleep(100)
Syn3PubSub.publish("pub_c", "everyone")
end
end
実際にいごかしてみる。
同じ例で動かすと、メッセージ発信元の publisher がだれかを subscriber が分かるようになってるのが分かります。
iex(1)> Syn3PubSub.test
09:09:00.817 [notice] SYN[nonode@nohost] Adding node to scope <universe>
09:09:00.832 [notice] SYN[nonode@nohost] Creating tables for scope <universe>
09:09:00.836 [notice] SYN[nonode@nohost|registry<universe>] Discovering the cluster
09:09:00.843 [notice] SYN[nonode@nohost|pg<universe>] Discovering the cluster
09:09:00.845 [info] Elixir.Syn3PubSub, sub_x gets "hello" from pub_a
09:09:00.845 [info] Elixir.Syn3PubSub, sub_y gets "hello" from pub_a
09:09:01.045 [info] Elixir.Syn3PubSub, sub_y gets "everyone" from pub_c
09:09:01.045 [info] Elixir.Syn3PubSub, relay gets "everyone" from pub_c
09:09:01.045 [info] Elixir.Syn3PubSub, sub_z gets "everyone" from relay
[ok: 0, ok: 2]
iex(2)>
実際の Pub/Sub を用いたプログラミングにおいて、メッセージを受け取ったときに publisher ごとに制御を分けるのであれば、一旦 handle_info
で subscribe してから publisher ごとで分岐することもできます。ただ handle_info
の publisher
仮引数に「受け取りたい相手の publisher 名」を記述することで、パターンマッチで動作を記述することもできます。後者の方がより Elixir らしい記法ですね。
syn の新しい機能の欠点
今回は試しませんでしたが、新しい syn の機能として scope
の概念が持ち込まれました。これを使えば、互いに干渉しない Pub/Sub 空間を難なく導入することができます。
ただしこれは残念なデメリットも持ち込んでます。GenServer
はプロセス登録に外部ライブラリを使えるようになっています。これを利用して syn ver. 2 では、プロセスの起動時に以下のようなシンプルな記述方法が可能でした。こう書くだけでプロセス登録のメカニズムの syn にプロセス名を登録できます。
GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
ところがこの記法はなんと ver. 3 では使えません。使うと直ちにエラーします。この外部ライブラリを使う場合のドキュメントGenServer: Name registrationには以下の記載があります。
- The
:via
option expects a module that exportsregister_name/2
,unregister_name/1
,whereis_name/1
andsend/2
.
これらの関数が ver. 2 にはあったのですが、ver. 3 ではなくなってます。これと近い関数が register/3
, unregister/2
, lookup/2
, publish/3
としてあるのですが、全部引数が一つ多いです。これ scope を引数に取るためです。このため syn ver. 3 を使ったプロセスの登録は GenServer.start_link
ではやることができず init
関数内で :syn.register
関数を呼んでやるしかなくなります。
これパッと見た感じで、scope を使う必要がない場合に引数を省略できるように register/2
, unregister/1
, lookup/1
, publish/2
を作ってやって、それを register_name/2
, unregister_name/1
, whereis_name/1
, send/2
としてやればそのままうまく GenServer.start_link
のオプションとしていごきそうな気がするのですが、まだ試してみてません。機会があったらやってみたいと思ってます。
まとめ
Syn の新しいバージョン 3.3 での Pub/Sub をやってみました。これまで同様に Pub/Sub の機能が使えることの確認をしました。
さて、明日の #Elixir Advent Calendar 2022 の記事は 山口 大輝 さんの Stable Diffusion(画像生成AI)をローカルインストールしてみた です。お楽しみに!
参考文献
- Syn (HexDocs), Syn (github)
- はじめてなElixir(0)
- はじめてNerves(0) ElixirによるIoTフレームワークNervesがとにかく動くようになるためのリンク集
- GenServer: Name registration
-
要は毎度 Erlang の関数を呼び出す形になります。個人的にはあまり好きではないスタイルです。 ↩