映えある第 0b100000 回も Pub/Sub です。
前回に引き続き、今回は Erlang の syn モジュールで Pub/Sub してみます。
はじめに
Swarm のドキュメント には「なぜ既存のレジストリを使わずに Swarm を開発したのか」が書いてあって、それによると「global(とおそらく pg2 との組合せ)や gproc はアレだし、syn は mnesia を使ってるので云々…」というくだりがありました。私はこれ読んで Swarm にチャレンジした次第です。
ところが Syn の ver 2 に至るドキュメントを読むと 2.0 まで mnesia を使っていたのが 2.1 から ETS にしたようなことが書いてあります。となると Swarm の作者が懸念してたことが解消されてるのかもしれません。Syn のドキュメント は大変良く書けているので使ってみたくもなります。
そこで前回の Swarm で書いた Pub/Sub を Syn を使って書き直してみました… というのが今回の記事です。大変簡単でした。関数の構造が良く似ているので、ちょっとマクロを書いたら同じプログラムでも実行できそうなぐらいでした。
Syn で Pub/Sub する
Syn のインストールをするには、以下を mix.exs
に追加して mix deps.get
してください。バージョンは必ず 2.1 以上で。
defp deps do
[
{:syn, "~> 2.1"},
]
以下では はじめてな Elixir(31) Swarm で Pub/Sub する をなぞって実装していきます。主な説明はこれを参照してください。
Subscriber がプロセスで Publish を関数で行う
Swarm での場合同様のモジュールです。APIは一緒です。
defmodule PubSub do
require Logger
use GenServer
def subscriber(name) do
GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
end
@impl GenServer
def init(name) do
Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
{:ok, name}
end
def register_subscription_topics(name, topics) do
Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
pid = :syn.whereis(name)
Enum.map(topics, &(:syn.join(&1, pid)))
end
def put_topic_with_message(topic, msg) do
:syn.publish(topic, msg)
end
@impl GenServer
def handle_info(msg, name) do
Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect(msg)}")
{:noreply, name}
end
end
テストの実行は以下でどうぞ。
- subscriber は sub_A sub_B sub_C がいる
- sub_A は topic afo を購読、sub_B は topic bar を購読、sub_C は afo と bar を購読
- topic afo に hello と、topic bar に world と、topic foo に good bye を発信
なお Syn のログメッセージがうるさい場合は Logger.configure(level: :warn)
を実行してください。
PubSub.subscriber("sub_A")
PubSub.subscriber("sub_B")
PubSub.subscriber("sub_C")
PubSub.register_subscription_topics("sub_A", ["afo"])
PubSub.register_subscription_topics("sub_B", ["bar"])
PubSub.register_subscription_topics("sub_C", ["afo", "bar"])
PubSub.put_topic_with_message("afo", "hello")
PubSub.put_topic_with_message("bar", "world")
PubSub.put_topic_with_message("foo", "good bye")
subscriber だけでなく publisher もプロセスにする
これも Swarm の場合 同様の API です。
defmodule PubSub do
require Logger
use GenServer
def subscriber(name) do
GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
end
def publisher(name) do
GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
end
@impl GenServer
def init(name) do
Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
{:ok, name}
end
def register_subscription_topics(name, topics) do
Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
pid = :syn.whereis(name)
Enum.each(topics, &(:syn.join(&1, pid)))
end
def publish(publisher, topic, msg) do
GenServer.cast(:syn.whereis(publisher), {:publish, topic, msg})
end
@impl GenServer
def handle_cast({:publish, topic, msg}, name) do
:syn.publish(topic, {name, msg})
{:noreply, name}
end
@impl GenServer
def handle_info(msg, name) do
Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect(msg)}")
{:noreply, name}
end
end
これは以下のプログラムなどでテストしてみてください。
- subscriber は sub_A sub_B sub_C がいる
- publisher は pub_X pub_Y pub_Z がいる
- sub_A は topic afo を購読、sub_B は topic bar を購読、sub_C は afo と bar を購読
- pub_X は topic afo に hello と、pub_Y は topic bar に world と、pub_Z は topic foo に good bye を発信
PubSub.subscriber("sub_A")
PubSub.subscriber("sub_B")
PubSub.subscriber("sub_C")
PubSub.register_subscription_topics("sub_A", ["afo"])
PubSub.register_subscription_topics("sub_B", ["bar"])
PubSub.register_subscription_topics("sub_C", ["afo", "bar"])
PubSub.publisher("pub_X")
PubSub.publisher("pub_Y")
PubSub.publisher("pub_Z")
PubSub.publish("pub_X", "afo", "hello")
PubSub.publish("pub_Y", "bar", "world")
PubSub.publish("pub_Z", "foo", "good bye")
subscriber が publisher にもなれる Pub/Sub
これも Swarm での実装 同様の API です。
defmodule PubSub do
require Logger
use GenServer
def pubsuber(name) do
GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
end
@impl GenServer
def init(name) do
Logger.warn("#{__MODULE__}.init: start for #{name}")
{:ok, {name, []}}
end
def register_subscription_topics(name, topics) do
Logger.warn("#{__MODULE__} register: #{name} subscribes #{topics}")
pid = :syn.whereis(name)
Enum.each(topics, &(:syn.join(&1, pid)))
end
def register_publication_topics(name, topics) do
GenServer.cast(:syn.whereis(name), {:pubtopics, topics})
end
def publish(publisher, msg) do
send(:syn.whereis(publisher), {:publish, {:kickpub, msg}})
end
@impl GenServer
def handle_cast({:pubtopics, topics}, {name, _pub_topics}) do
{:noreply, {name, topics}}
end
@impl GenServer
def handle_info({:publish, {publisher, msg}}, {name, []}) do
Logger.warn("#{__MODULE__} handle_info: #{name} gets #{inspect({publisher, msg})}")
{:noreply, {name, []}}
end
@impl GenServer
def handle_info({:publish, {publisher, msg}}, {name, pub_topics}) do
Logger.warn("#{__MODULE__} handle_info: #{name} gets #{inspect({publisher, msg})}")
Enum.each(pub_topics, &(:syn.publish(&1, {:publish, {name, msg}})))
{:noreply, {name, pub_topics}}
end
end
これは以下のテストを試してみてください。
- subscriber は sub_A int_L がいる
- publisher は pub_X int_L がいる
- sub_A は topic afo を購読
- pub_X は topic dog に発信
- int_L は topic dog を購読して topic afo に発信
- pub_X はメッセージ hello を発信
PubSub.pubsuber("pub_X")
PubSub.register_publication_topics("pub_X", ["dog"])
PubSub.pubsuber("int_L")
PubSub.register_subscription_topics("int_L", ["dog"])
PubSub.register_publication_topics("int_L", ["afo"])
PubSub.pubsuber("sub_A")
PubSub.register_subscription_topics("sub_A", ["afo"])
PubSub.publish("pub_X", "hello")
より複雑なテストはこちらで。
PubSub.pubsuber("sub_A")
PubSub.pubsuber("sub_B")
PubSub.pubsuber("sub_C")
PubSub.pubsuber("pub_X")
PubSub.pubsuber("pub_Y")
PubSub.pubsuber("pub_Z")
PubSub.pubsuber("int_L")
PubSub.pubsuber("int_M")
PubSub.pubsuber("int_N")
PubSub.register_subscription_topics("sub_A", ["afo"])
PubSub.register_subscription_topics("sub_B", ["bar"])
PubSub.register_subscription_topics("sub_B", ["afo", "bar"])
PubSub.register_publication_topics("int_L", ["afo"])
PubSub.register_publication_topics("int_M", ["bar"])
PubSub.register_publication_topics("int_N", ["foo"])
PubSub.register_subscription_topics("int_L", ["dog"])
PubSub.register_subscription_topics("int_M", ["dog", "cat"])
PubSub.register_subscription_topics("int_N", ["fox"])
PubSub.register_publication_topics("pub_X", ["dog"])
PubSub.register_publication_topics("pub_Y", ["cat"])
PubSub.register_publication_topics("pub_Z", ["foo"])
PubSub.publish("pub_X", "hello")
PubSub.publish("pub_Y", "world")
PubSub.publish("pub_Z", "good bye")
まとめ
Swarm 同様の Pub/Sub が Syn v2.1 でもできました。
今後 Swarm を使うか Syn2.1 を使うかは迷うところです。いずれ比較調査をしてみたいです。