6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

はじめてな Elixir(32) Syn (v2.1) で Pub/Sub する

Last updated at Posted at 2020-05-01

映えある第 0b100000 回も Pub/Sub です。
前回に引き続き、今回は Erlang の syn モジュールで Pub/Sub してみます。

はじめに

Swarm のドキュメント には「なぜ既存のレジストリを使わずに Swarm を開発したのか」が書いてあって、それによると「global(とおそらく pg2 との組合せ)や gproc はアレだし、synmnesia を使ってるので云々…」というくだりがありました。私はこれ読んで Swarm にチャレンジした次第です。

ところが Syn の ver 2 に至るドキュメントを読むと 2.0 まで mnesia を使っていたのが 2.1 から ETS にしたようなことが書いてあります。となると Swarm の作者が懸念してたことが解消されてるのかもしれません。Syn のドキュメント は大変良く書けているので使ってみたくもなります。

そこで前回の Swarm で書いた Pub/Sub を Syn を使って書き直してみました… というのが今回の記事です。大変簡単でした。関数の構造が良く似ているので、ちょっとマクロを書いたら同じプログラムでも実行できそうなぐらいでした。

Syn で Pub/Sub する

Syn のインストールをするには、以下を mix.exs に追加して mix deps.get してください。バージョンは必ず 2.1 以上で。

mix.exs
  defp deps do
    [
      {:syn, "~> 2.1"},
    ]

以下では はじめてな Elixir(31) Swarm で Pub/Sub する をなぞって実装していきます。主な説明はこれを参照してください。

Subscriber がプロセスで Publish を関数で行う

Swarm での場合同様のモジュールです。APIは一緒です。

defmodule PubSub do
  require Logger
  use GenServer

  def subscriber(name) do
    GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
  end

  @impl GenServer
  def init(name) do
    Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
    {:ok, name}
  end

  def register_subscription_topics(name, topics) do
    Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
    pid = :syn.whereis(name)
    Enum.map(topics, &(:syn.join(&1, pid)))
  end

  def put_topic_with_message(topic, msg) do
    :syn.publish(topic, msg)
  end

  @impl GenServer
  def handle_info(msg, name) do
    Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect(msg)}")
    {:noreply, name}
  end
end

テストの実行は以下でどうぞ。

  • subscriber は sub_A sub_B sub_C がいる
  • sub_A は topic afo を購読、sub_B は topic bar を購読、sub_C は afo と bar を購読
  • topic afo に hello と、topic bar に world と、topic foo に good bye を発信

なお Syn のログメッセージがうるさい場合は Logger.configure(level: :warn) を実行してください。

    PubSub.subscriber("sub_A")
    PubSub.subscriber("sub_B")
    PubSub.subscriber("sub_C")

    PubSub.register_subscription_topics("sub_A", ["afo"])
    PubSub.register_subscription_topics("sub_B", ["bar"])
    PubSub.register_subscription_topics("sub_C", ["afo", "bar"])

    PubSub.put_topic_with_message("afo", "hello")
    PubSub.put_topic_with_message("bar", "world")
    PubSub.put_topic_with_message("foo", "good bye")

subscriber だけでなく publisher もプロセスにする

これも Swarm の場合 同様の API です。

defmodule PubSub do
  require Logger
  use GenServer

  def subscriber(name) do
    GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
  end

  def publisher(name) do
    GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
  end

  @impl GenServer
  def init(name) do
    Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
    {:ok, name}
  end

  def register_subscription_topics(name, topics) do
    Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
    pid = :syn.whereis(name)
    Enum.each(topics, &(:syn.join(&1, pid)))
  end

  def publish(publisher, topic, msg) do
    GenServer.cast(:syn.whereis(publisher), {:publish, topic, msg})
  end

  @impl GenServer
  def handle_cast({:publish, topic, msg}, name) do
    :syn.publish(topic, {name, msg})
    {:noreply, name}
  end

  @impl GenServer
  def handle_info(msg, name) do
    Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect(msg)}")
    {:noreply, name}
  end
end

これは以下のプログラムなどでテストしてみてください。

  • subscriber は sub_A sub_B sub_C がいる
  • publisher は pub_X pub_Y pub_Z がいる
  • sub_A は topic afo を購読、sub_B は topic bar を購読、sub_C は afo と bar を購読
  • pub_X は topic afo に hello と、pub_Y は topic bar に world と、pub_Z は topic foo に good bye を発信
    PubSub.subscriber("sub_A")
    PubSub.subscriber("sub_B")
    PubSub.subscriber("sub_C")
    PubSub.register_subscription_topics("sub_A", ["afo"])
    PubSub.register_subscription_topics("sub_B", ["bar"])
    PubSub.register_subscription_topics("sub_C", ["afo", "bar"])

    PubSub.publisher("pub_X")
    PubSub.publisher("pub_Y")
    PubSub.publisher("pub_Z")

    PubSub.publish("pub_X", "afo", "hello")
    PubSub.publish("pub_Y", "bar", "world")
    PubSub.publish("pub_Z", "foo", "good bye")

subscriber が publisher にもなれる Pub/Sub

これも Swarm での実装 同様の API です。

defmodule PubSub do
  require Logger
  use GenServer

  def pubsuber(name) do
    GenServer.start_link(__MODULE__, name, name: {:via, :syn, name})
  end

  @impl GenServer
  def init(name) do
    Logger.warn("#{__MODULE__}.init: start for #{name}")
    {:ok, {name, []}}
  end

  def register_subscription_topics(name, topics) do
    Logger.warn("#{__MODULE__} register: #{name} subscribes #{topics}")
    pid = :syn.whereis(name)
    Enum.each(topics, &(:syn.join(&1, pid)))
  end

  def register_publication_topics(name, topics) do
    GenServer.cast(:syn.whereis(name), {:pubtopics, topics})
  end

  def publish(publisher, msg) do
    send(:syn.whereis(publisher), {:publish, {:kickpub, msg}})
  end

  @impl GenServer
  def handle_cast({:pubtopics, topics}, {name, _pub_topics}) do
    {:noreply, {name, topics}}
  end

  @impl GenServer
  def handle_info({:publish, {publisher, msg}}, {name, []}) do
    Logger.warn("#{__MODULE__} handle_info: #{name} gets #{inspect({publisher, msg})}")
    {:noreply, {name, []}}
  end

  @impl GenServer
  def handle_info({:publish, {publisher, msg}}, {name, pub_topics}) do
    Logger.warn("#{__MODULE__} handle_info: #{name} gets #{inspect({publisher, msg})}")
    Enum.each(pub_topics, &(:syn.publish(&1, {:publish, {name, msg}})))
    {:noreply, {name, pub_topics}}
  end
end

これは以下のテストを試してみてください。

  • subscriber は sub_A int_L がいる
  • publisher は pub_X int_L がいる
  • sub_A は topic afo を購読
  • pub_X は topic dog に発信
  • int_L は topic dog を購読して topic afo に発信
  • pub_X はメッセージ hello を発信
    PubSub.pubsuber("pub_X")
    PubSub.register_publication_topics("pub_X", ["dog"])
    PubSub.pubsuber("int_L")
    PubSub.register_subscription_topics("int_L", ["dog"])
    PubSub.register_publication_topics("int_L", ["afo"])
    PubSub.pubsuber("sub_A")
    PubSub.register_subscription_topics("sub_A", ["afo"])
    PubSub.publish("pub_X", "hello")

より複雑なテストはこちらで。

    PubSub.pubsuber("sub_A")
    PubSub.pubsuber("sub_B")
    PubSub.pubsuber("sub_C")
    PubSub.pubsuber("pub_X")
    PubSub.pubsuber("pub_Y")
    PubSub.pubsuber("pub_Z")
    PubSub.pubsuber("int_L")
    PubSub.pubsuber("int_M")
    PubSub.pubsuber("int_N")

    PubSub.register_subscription_topics("sub_A", ["afo"])
    PubSub.register_subscription_topics("sub_B", ["bar"])
    PubSub.register_subscription_topics("sub_B", ["afo", "bar"])
    PubSub.register_publication_topics("int_L", ["afo"])
    PubSub.register_publication_topics("int_M", ["bar"])
    PubSub.register_publication_topics("int_N", ["foo"])

    PubSub.register_subscription_topics("int_L", ["dog"])
    PubSub.register_subscription_topics("int_M", ["dog", "cat"])
    PubSub.register_subscription_topics("int_N", ["fox"])

    PubSub.register_publication_topics("pub_X", ["dog"])
    PubSub.register_publication_topics("pub_Y", ["cat"])
    PubSub.register_publication_topics("pub_Z", ["foo"])

    PubSub.publish("pub_X", "hello")
    PubSub.publish("pub_Y", "world")
    PubSub.publish("pub_Z", "good bye")

まとめ

Swarm 同様の Pub/Sub が Syn v2.1 でもできました。

今後 Swarm を使うか Syn2.1 を使うかは迷うところです。いずれ比較調査をしてみたいです。

参考文献

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?