LoginSignup
6
2

More than 3 years have passed since last update.

はじめてな Elixir(31) Swarm で Pub/Sub する

Last updated at Posted at 2020-04-30

「はじめてな Elixir シリーズ」も映えある第0b11111回となりました。
前回の記事で Elixir Registry で Pub/Sub をしてみました。今回は Swarm というライブラリを使ってやってみます。

Swarm とは

プロセスに名前をつけるのに Erlang や Elixir では公式非公式にいくつかの手法が提供されています。Swarm はそのひとつです。Elixir 用に(Erlang 用のを流用するのではなく)提供されていて、かつ分散環境で使えるのが探した限りではこれだけなようなので選びました。
通常 Swarm は kubernetes とか docker とかのコンテナで「プロセス走らせて、何かあったら別のコンテナで再実行」とかの分散処理に使われてるようです。GitHub で見ると Swarm 自体は2年近くメンテされてないのですが、これを用いてコンテナを使うのは流行っているようなので、それなりに安定しているものと解釈して使ってみます。

Swarm の概念

Swarm は Hex のドキュメントが恐ろしく不親切です。関数の説明はぶっきらぼうで、チュートリアルもなければ使用例もありません。本気で使うならこっちでドキュメントを書いてプルリクでも出そうかと思うほど悲惨です1Swarm のコンセプトや設計についてのドキュメント もありますが、これを読んでも使えるようになるわけではありません。ここでは簡単に Swarm の考え方と使い方を押さえておきます。

Swarm は他のプロセス登録メカニズム同様に、プロセスに対して名前をつけて プロセス名とプロセスID のペアを管理する機能を提供します。

  • プロセス名は Atom でなくても良い
  • プロセスグループの概念があり、プロセス名空間とは別にグループ名空間がある
  • プロセスは(0以上の複数の)グループに所属することができる
  • プロセスに対してメッセージを送ることができる
    • グループに所属するプロセス全てに対してもメッセージを送ることができる

この最後の機能が Pub/Sub を実現するキモになります。Pub/Sub で言うところの topic は Swarm を用いる場合にはこのグループにその機能を担わせます。

Elixir Registry との違い

Elixir の Registry ライブラリはプロセスに対して key value のペアを登録します。プロセスの名前は key であり、value は付帯的な値です。複数のプロセスが同一の key で登録できるので、この key を Pub/Sub の topic として使えます。これが Swarm で topic を表現するのはグループになることに注意してください。Swarm では同一プロセス名に対して複数のプロセスは登録できません。グループには複数プロセスを登録できるので、topic にはグループを使います。
Swarm ではプロセスが必ず単一の名前を持つため Pub/Sub をする場合でも GenServer.start_link 関数を呼び出す場合に name: {:via, :swarm, process_name} のオプションを付けて命名ができます。Registry の場合には Pub/Sub として用いる場合にはこれができませんでした。ちょっとしたことですがプロセスの見通しがずっと良くなります。

あと、Elixir Registry ライブラリに慣れてるとびっくりするのが「プログラム中では明示的に実行しなくても勝手に Supervisor 下で動き出す」ということです。なので iex -S mix とやってボーッっとしてると Swarm が起動する際のログメッセージがドンドン出てきて「おやおやおや…」となります。
これ deps/swarm/mix.exs の applications に記述してあるからのようです。さらにこのため「用途に応じて複数の Swarm プロセスを Supervisor にぶら下げる」ことができなさそうです。Elixir Registry のときはプログラマが明示的に Supervisor にぶら下げるので、そのときに name: オプションで名前をつけて複数の Registry プロセスを動かすことができましたが、Swarm ではこのようなことができません。

Swarm での Pub/Sub

まずは Swarm のインストールを行います。
mix new して当該ディレクトリに降りて mix.exs に以下を追加して mix deps.get すれば準備オケです。

mix.exs
  defp deps do
    [
      {:swarm, "~> 3.4"},
    ]

Publish を関数で Subscribe をプロセス間通信で行う Pub/Sub

まず前回の記事同様の機能を作ってみます。Subscriber はプロセスとして立ち上げて、それらが購読する topic を指定します。Publisher という概念はまだなくて、publish 操作を関数で実行することで topic を購読している subscriber にメッセージを投げ込みます。

pubsub.ex
defmodule PubSub do
  require Logger
  use GenServer

  def subscriber(name) do
    GenServer.start_link(__MODULE__, name, name: {:via, :swarm, name}) # Swarm で名前登録
  end

  @impl GenServer
  def init(name) do
    Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
    {:ok, name}
  end

  def register_subscription_topics(name, topics) do
    Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
    pid = Swarm.whereis_name(name)
    Enum.each(topics, &(Swarm.join(&1, pid))) # 「トピックの購読」を「グループへのプロセス登録」で行う
  end

  def put_topic_with_message(topic, msg) do
    Swarm.publish(topic, msg)   # 「トピックへの発信」を「グループ所属グループへのメッセージ発送」で行う
  end

  @impl GenServer
  def handle_info(msg, name) do
    Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect(msg)}")
    # ここに subscriber がトピックへのメッセージを受け取ったときの動作を記述する
    {:noreply, name}
  end
end

コンソール出力には Logger.warn を使います。これ Swarm が Logger.debugLogger.info を使うので、簡単に峻別するために warn にしましたが、別に Warning 的な意味で使ってるわけではありません。

これを使ってテストしてみます。

  • subscriber は sub_A sub_B sub_C がいる
  • sub_A は topic afo を購読、sub_B は topic bar を購読、sub_C は afo と bar を購読
  • topic afo に hello と、topic bar に world と、topic foo に good bye を発信

以下は理解しやすくなるように邪魔な出力を排除したものです。Swarm のログメッセージを見たくない場合は Logger.configure(level: :warn) を実行してください。

$ iex -S mix
Erlang/OTP 22 [erts-10.7] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]
Interactive Elixir (1.10.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> PubSub.subscriber("sub_A") # subscriber の登録

02:04:03.523 [warn]  Elixir.PubSub.init: start for sub_A
{:ok, #PID<0.221.0>}
iex(2)> PubSub.subscriber("sub_B") # subscriber の登録

02:04:10.887 [warn]  Elixir.PubSub.init: start for sub_B
{:ok, #PID<0.223.0>}
iex(3)> PubSub.subscriber("sub_C") # subscriber の登録

02:04:21.934 [warn]  Elixir.PubSub.init: start for sub_C
{:ok, #PID<0.225.0>}
iex(4)> PubSub.register_subscription_topics("sub_A", ["afo"]) # topic の登録

02:04:30.840 [warn]  Elixir.PubSub register: sub_A subscribes afo
:ok
iex(5)> PubSub.register_subscription_topics("sub_B", ["bar"]) # topic の登録

02:04:39.405 [warn]  Elixir.PubSub register: sub_B subscribes bar
:ok
iex(6)> PubSub.register_subscription_topics("sub_C", ["afo", "bar"]) # topic の登録

02:04:46.109 [warn]  Elixir.PubSub register: sub_C subscribes afobar
:ok
iex(7)> PubSub.put_topic_with_message("afo", "hello") # topic "afo" でメッセージを発信

02:04:56.954 [warn]  Elixir.PubSub handle_info: sub_A gets "hello"

02:04:56.954 [warn]  Elixir.PubSub handle_info: sub_C gets "hello"
:ok
iex(8)> PubSub.put_topic_with_message("bar", "world") # topic "bar" でメッセージを発信

02:05:03.679 [warn]  Elixir.PubSub handle_info: sub_C gets "world"

02:05:03.679 [warn]  Elixir.PubSub handle_info: sub_B gets "world"
:ok
iex(9)> PubSub.put_topic_with_message("foo", "good bye") # topic "foo" でメッセージを発信
:ok

このように subscriber を登録して、それが特定の topic を購読するようにして、それに対してメッセージの発信ができます。

なお、以下が実際の動作での出力全部です。Swarm のログがかなりうるさく出てきます。

$ iex -S mix
Erlang/OTP 22 [erts-10.7] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]

Compiling 1 file (.ex)

02:03:53.374 [info]  [swarm on nonode@nohost] [tracker:init] started
Interactive Elixir (1.10.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 
02:03:58.380 [info]  [swarm on nonode@nohost] [tracker:cluster_wait] joining cluster..

02:03:58.380 [info]  [swarm on nonode@nohost] [tracker:cluster_wait] no connected nodes, proceeding without sync
PubSub.subscriber("sub_A")

02:04:03.523 [debug] [swarm on nonode@nohost] [tracker:handle_call] registering #PID<0.221.0> as "sub_A", with metadata %{}

02:04:03.523 [warn]  Elixir.PubSub.init: start for sub_A
{:ok, #PID<0.221.0>}
iex(2)> PubSub.subscriber("sub_B")

02:04:10.886 [debug] [swarm on nonode@nohost] [tracker:handle_call] registering #PID<0.223.0> as "sub_B", with metadata %{}

02:04:10.887 [warn]  Elixir.PubSub.init: start for sub_B
{:ok, #PID<0.223.0>}
iex(3)> PubSub.subscriber("sub_C")

02:04:21.934 [debug] [swarm on nonode@nohost] [tracker:handle_call] registering #PID<0.225.0> as "sub_C", with metadata %{}

02:04:21.934 [warn]  Elixir.PubSub.init: start for sub_C
{:ok, #PID<0.225.0>}
iex(4)> PubSub.register_subscription_topics("sub_A", ["afo"])

02:04:30.840 [warn]  Elixir.PubSub register: sub_A subscribes afo

02:04:30.840 [debug] [swarm on nonode@nohost] [tracker:handle_call] add_meta {"afo", true} to #PID<0.221.0>
:ok
iex(5)> PubSub.register_subscription_topics("sub_B", ["bar"])

02:04:39.405 [warn]  Elixir.PubSub register: sub_B subscribes bar

02:04:39.405 [debug] [swarm on nonode@nohost] [tracker:handle_call] add_meta {"bar", true} to #PID<0.223.0>
:ok
iex(6)> PubSub.register_subscription_topics("sub_C", ["afo", "bar"])

02:04:46.109 [warn]  Elixir.PubSub register: sub_C subscribes afobar

02:04:46.109 [debug] [swarm on nonode@nohost] [tracker:handle_call] add_meta {"afo", true} to #PID<0.225.0>

02:04:46.109 [debug] [swarm on nonode@nohost] [tracker:handle_call] add_meta {"bar", true} to #PID<0.225.0>
:ok
iex(7)> PubSub.put_topic_with_message("afo", "hello")

02:04:56.954 [warn]  Elixir.PubSub handle_info: sub_A gets "hello"

02:04:56.954 [warn]  Elixir.PubSub handle_info: sub_C gets "hello"
:ok
iex(8)> PubSub.put_topic_with_message("bar", "world")

02:05:03.679 [warn]  Elixir.PubSub handle_info: sub_C gets "world"

02:05:03.679 [warn]  Elixir.PubSub handle_info: sub_B gets "world"
:ok
iex(9)> PubSub.put_topic_with_message("foo", "good bye")
:ok

Publish もプロセス間通信で行う Pub/Sub

上の例では Publish 操作を関数を叩くことで実現していました。このため publisher という概念がありませんでした。つぎにここでは publisher もプロセスとして動いているような場合を想定してみます。くわえて、上のプログラムではメッセージを受け取りはしますが、どの publisher が発信したメッセージなのかは subscriber には秘匿してしまっていました。以下ではメッセージには publisher 名を付帯させるようにします。

pubsub.ex
defmodule PubSub do
  require Logger
  use GenServer

  def subscriber(name) do # subscriber プロセスの登録
    GenServer.start_link(__MODULE__, name, name: {:via, :swarm, name})
  end

  def publisher(name) do # publisher プロセスの登録
    GenServer.start_link(__MODULE__, name, name: {:via, :swarm, name})
  end

  @impl GenServer
  def init(name) do
    Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
    {:ok, name}
  end

  def register_subscription_topics(name, topics) do # subscriber の topic 登録
    Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
    pid = Swarm.whereis_name(name)
    Enum.each(topics, &(Swarm.join(&1, pid)))
  end

  def publish(publisher, topic, msg) do # publisher からの topic に対する msg メッセージの発信
    GenServer.cast(Swarm.whereis_name(publisher), {:publish, topic, msg})
  end

  @impl GenServer
  def handle_cast({:publish, topic, msg}, name) do
    Swarm.publish(topic, {name, msg})
    {:noreply, name}
  end

  @impl GenServer
  def handle_info(msg, name) do
    Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect(msg)}")
    {:noreply, name}
  end
end

これを動かしてみます。

  • subscriber は sub_A sub_B sub_C がいる
  • publisher は pub_X pub_Y pub_Z がいる
  • sub_A は topic afo を購読、sub_B は topic bar を購読、sub_C は afo と bar を購読
  • pub_X は topic afo に hello と、pub_Y は topic bar に world と、pub_Z は topic foo に good bye を発信

以下も邪魔な Swarm からのメッセージを排除した実行例です。

$ iex -S mix
Erlang/OTP 22 [erts-10.7] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]


iex(1)> PubSub.subscriber("sub_A")

02:11:07.370 [warn]  Elixir.PubSub.init: start for sub_A
{:ok, #PID<0.211.0>}
iex(2)> PubSub.subscriber("sub_B")

02:11:10.377 [warn]  Elixir.PubSub.init: start for sub_B
{:ok, #PID<0.213.0>}
iex(3)> PubSub.subscriber("sub_C")

02:11:13.386 [warn]  Elixir.PubSub.init: start for sub_C
{:ok, #PID<0.215.0>}
iex(4)> PubSub.register_subscription_topics("sub_A", ["afo"])

02:11:19.744 [warn]  Elixir.PubSub register: sub_A subscribes afo
[:ok]
iex(5)> PubSub.register_subscription_topics("sub_B", ["bar"])

02:11:26.692 [warn]  Elixir.PubSub register: sub_B subscribes bar
[:ok]
iex(6)> PubSub.register_subscription_topics("sub_C", ["afo", "bar"])

02:11:33.701 [warn]  Elixir.PubSub register: sub_C subscribes afo
02:11:33.701 [warn]  Elixir.PubSub register: sub_C subscribes bar
[:ok, :ok]
iex(7)> PubSub.publisher("pub_X")

02:11:41.337 [warn]  Elixir.PubSub.init: start for pub_X
{:ok, #PID<0.220.0>}
iex(8)> PubSub.publisher("pub_Y")

02:11:44.649 [warn]  Elixir.PubSub.init: start for pub_Y
{:ok, #PID<0.222.0>}
iex(9)> PubSub.publisher("pub_Z")

02:11:47.908 [warn]  Elixir.PubSub.init: start for pub_Z
{:ok, #PID<0.224.0>}
iex(10)> PubSub.publish("pub_X", "afo", "hello")
:ok

02:11:54.054 [warn]  Elixir.PubSub handle_info: sub_C gets {"pub_X", "hello"}

02:11:54.054 [warn]  Elixir.PubSub handle_info: sub_A gets {"pub_X", "hello"}
iex(11)> PubSub.publish("pub_Y", "bar", "world")
:ok

02:12:01.399 [warn]  Elixir.PubSub handle_info: sub_C gets {"pub_Y", "world"}

02:12:01.399 [warn]  Elixir.PubSub handle_info: sub_B gets {"pub_Y", "world"}
iex(12)> PubSub.publish("pub_Z", "foo", "good bye")
:ok

このように publisher をプロセスにして同様のことができました。

Subscriber が Publisher にもなれる Pub/Sub

ではさらに機能を追加してみます。メッセージを受け取った subscriber が publisher となってメッセージを送信するということを可能にします。

まず、上のプログラムの例を見てもらうと subscriber と publisher の定義に違いがないことが分かります。ですので、これらを統一してしまって subscriber と publisher の区別をなくしてしまいます。つぎに publisher に対してもどの topic に発信するのかを事前に登録するようにします。subscriber の場合は登録は Swarm のグループへのプロセスの登録でしたが、publisher の場合は該当する概念が Swarm にはないので、プロセスの状態として topic のリストを持ち回るようにします。

pubsub.ex
defmodule PubSub do
  require Logger
  use GenServer

  def pubsuber(name) do
    GenServer.start_link(__MODULE__, name, name: {:via, :swarm, name})
  end

  @impl GenServer
  def init(name) do
    Logger.warn("#{__MODULE__}.init: start for #{inspect(name)}")
    {:ok, {name, []}}
  end

  def register_subscription_topics(name, topics) do
    Logger.warn("#{__MODULE__} register: #{inspect(name)} subscribes #{topics}")
    pid = Swarm.whereis_name(name)
    Enum.each(topics, &(Swarm.join(&1, pid)))
  end

  def register_publication_topics(name, topics) do
    GenServer.cast(Swarm.whereis_name(name), {:pubtopics, topics})
  end

  def publish(publisher, msg) do
    send(Swarm.whereis_name(publisher), {:publish, {:kickpub, msg}})
  end

  @impl GenServer
  def handle_cast({:pubtopics, topics}, {name, _pub_topics}) do
    {:noreply, {name, topics}}
  end

  @impl GenServer
  def handle_info({:publish, {publisher, msg}}, {name, []}) do
    Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect({publisher, msg})}")
    {:noreply, {name, []}}
  end

  @impl GenServer
  def handle_info({:publish, {publisher, msg}}, {name, pub_topics}) do
    Logger.warn("#{__MODULE__} handle_info: #{inspect(name)} gets #{inspect({publisher, msg})}")
    Enum.each(pub_topics, &(Swarm.publish(&1, {:publish, {name, msg}})))
    {:noreply, {name, pub_topics}}
  end
end

この様にすると登録したプロセスは以下のように分類されます。

  • publisher: publish 用の topics が空リストでなく、subscribe 用の topic を登録してないプロセス
  • subscriber: publish 用の topics が空リストで、subscribe 用の topic を登録したプロセス

publish 用の topic も subscribe 用の topic も登録したプロセスは、一旦メッセージを subscriber として受け取ったあとにさらにそれを publisher として再発信するようなプロセスになります。

やや複雑になりましたので、簡単な例で実行してみます。

  • subscriber は sub_A int_L がいる
  • publisher は pub_X int_L がいる
  • sub_A は topic afo を購読
  • pub_X は topic dog に発信
  • int_L は topic dog を購読して topic afo に発信
  • pub_X はメッセージ hello を発信

例によって以下では Swarm からのログメッセージを削除しています。

$ iex -S mix
Erlang/OTP 22 [erts-10.7] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]
Interactive Elixir (1.10.2) - press Ctrl+C to exit (type h() ENTER for help)

iex(1)> PubSub.pubsuber("pub_X")        

02:33:19.728 [warn]  Elixir.PubSub.init: start for pub_X
{:ok, #PID<0.211.0>}
iex(2)> PubSub.register_publication_topics("pub_X", ["dog"])
:ok
iex(3)> PubSub.pubsuber("int_L")

02:33:37.913 [warn]  Elixir.PubSub.init: start for int_L
{:ok, #PID<0.214.0>}
iex(4)> PubSub.register_subscription_topics("int_L", ["dog"])

02:33:44.575 [warn]  Elixir.PubSub register: int_L subscribes dog

:ok
iex(5)> PubSub.register_publication_topics("int_L", ["afo"])
:ok
iex(6)> PubSub.pubsuber("sub_A")


02:33:57.430 [warn]  Elixir.PubSub.init: start for sub_A
{:ok, #PID<0.218.0>}
iex(7)> PubSub.register_subscription_topics("sub_A", ["afo"])

02:34:03.674 [warn]  Elixir.PubSub register: sub_A subscribes afo

:ok
iex(8)> PubSub.publish("pub_X", "hello")
{:publish, {:kickpub, "hello"}}

02:34:12.611 [warn]  Elixir.PubSub handle_info: pub_X gets {:kickpub, "hello"}

02:34:12.611 [warn]  Elixir.PubSub handle_info: int_L gets {"pub_X", "hello"}

02:34:12.612 [warn]  Elixir.PubSub handle_info: sub_A gets {"int_L", "hello"}

最後の3行を見ると、メッセージが pub_X → int_L → sub_A と受け渡されていくのが分かります。なおこのときに受け取るのがメッセージだけでなく、publisher とメッセージのタプルになってるのも分かります。

以下はより複雑な例です。

  PubSub.pubsuber("sub_A")
  PubSub.pubsuber("sub_B")
  PubSub.pubsuber("sub_C")
  PubSub.pubsuber("pub_X")
  PubSub.pubsuber("pub_Y")
  PubSub.pubsuber("pub_Z")
  PubSub.pubsuber("int_L")
  PubSub.pubsuber("int_M")
  PubSub.pubsuber("int_N")
  PubSub.register_subscription_topics("sub_A", ["afo"])
  PubSub.register_subscription_topics("sub_B", ["bar"])
  PubSub.register_subscription_topics("sub_B", ["afo", "bar"])
  PubSub.register_publication_topics("int_L", ["afo"])
  PubSub.register_publication_topics("int_M", ["bar"])
  PubSub.register_publication_topics("int_N", ["foo"])
  PubSub.register_subscription_topics("int_L", ["dog"])
  PubSub.register_subscription_topics("int_M", ["dog", "cat"])
  PubSub.register_subscription_topics("int_N", ["fox"])
  PubSub.register_publication_topics("pub_X", ["dog"])
  PubSub.register_publication_topics("pub_Y", ["cat"])
  PubSub.register_publication_topics("pub_Z", ["foo"])

以上を仕込んだ上で以下を投入してみてください。

  PubSub.publish("pub_X", "hello")
  PubSub.publish("pub_Y", "world")
  PubSub.publish("pub_Z", "good bye")

以下のようにメッセージが受け渡されていくのが観測されます。

02:37:09.667 [warn]  Elixir.PubSub handle_info: pub_X gets {:kickpub, "hello"}
02:37:09.667 [warn]  Elixir.PubSub handle_info: pub_Y gets {:kickpub, "world"}
02:37:09.667 [warn]  Elixir.PubSub handle_info: pub_Z gets {:kickpub, "good bye"}
02:37:09.667 [warn]  Elixir.PubSub handle_info: int_M gets {"pub_X", "hello"}
02:37:09.667 [warn]  Elixir.PubSub handle_info: int_L gets {"pub_X", "hello"}
02:37:09.668 [warn]  Elixir.PubSub handle_info: int_M gets {"pub_Y", "world"}
02:37:09.668 [warn]  Elixir.PubSub handle_info: sub_B gets {"int_M", "hello"}
02:37:09.668 [warn]  Elixir.PubSub handle_info: sub_B gets {"int_L", "hello"}
02:37:09.668 [warn]  Elixir.PubSub handle_info: sub_A gets {"int_L", "hello"}
02:37:09.668 [warn]  Elixir.PubSub handle_info: sub_B gets {"int_M", "world"}

まとめと今後の課題

プロセス登録ライブラリ Swarm を使って Pub/Sub 機構を実装してみました。今回、以下のような機能を実装しました。

  • subscriber だけでなく publisher もプロセスとして登録できる
  • 実行上は subscriber と publisher との区別がなく、どのプロセスも発信・受信・その両方を実行可能である
  • メッセージを受け取るとき、直近の publish をどのプロセスが行ったのかの情報が subscriber で受け取れる

Swarm は分散環境で実行可能です。こんどは複数ノードでも動くことを確認したいと思います。

謝辞

これらは主に各地の Elixir コミュニティのもくもく会で手を動かしたものです。以下のコミュニティのみなさま、もくもく会を主催していただいたみなさま、各種リソースを提供していただいたみなさまに感謝申し上げます。

参考文献


  1. 2020年4月30日現在の話です。 

6
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
2