はじめに
以前、はじめてNerves(13) Elixir API の Zenohex を使って Zenoh で Pub/Sub してみる という記事で Zenohex を紹介しました。これは Eclipse の Pub/Sub プラットフォームである Zenoh への Elixir の API です。この Zenoh のメジャーバージョンが1になりました。
これに合わせて Zenohex も新しくなりました。今回の改定で API が大きく変わっているので、前回の記事同様に Zenohex 入門をしてみます。Zenoh 全体はまずまず大きいので、より高度な使い方は次回以降で(書くとは言ってない)。
なお、あいかわらず Zenohex を「ゼノヘックス」と心のなかで呟いてしまうことが多いのですが、おそらく正しい発音は「ゼノ〜・イーエックス」だと思います。
いごかしてみる
まずは zenohex の環境を作ります。
% elixir -v
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Elixir 1.17.3 (compiled with Erlang/OTP 27)
いつものお作法通り mix で環境を作成します。ここでは zenoh05 としておきます。
% mix new zenoh05
% cd zenoh05
mix.exs に1行追加します。
defp deps do
[
+ {:zenohex, "~> 0.5.1"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
そして依存関係を解決します。
% mix deps.get
これで準備ができました。(若干わらわらと warning が出ましたが構わず無視しました)
単純な Pub/Sub 通信をさせてみる
2つのpublisherと2つのsubscriberを同じ key(トピック)で通信してみます1。以下では4つのターミナルを使います。全部同一のマシン上でいごかします。トピックは afo です。
- terminal 1: Pub側
- terminal 2: Pub側
- terminal 3: Sub側
- terminal 4: Sub側
全てのターミナルで iex を起動する
% iex -S mix
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Interactive Elixir (1.17.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
%iex -S mix
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Interactive Elixir (1.17.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
%iex -S mix
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Interactive Elixir (1.17.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
% iex -S mix
Erlang/OTP 27 [erts-15.2.7] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Interactive Elixir (1.17.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
Publisher の準備をする
さきに Pub 側の準備をします。
まず Zenohex.Session.open/0 関数で session を作ります。関数名や返り値がメジャーチェンジ前と違うので注意してください。
iex(1)> {:ok, session} = Zenohex.Session.open()
{:ok, #Reference<0.4040646170.773718018.48614>}
つぎに Publisher の宣言を Zenohex.Session.declare_publisher/2 で行います。
- 第1引数: さきほどの session の zid
- 第2引数: トピック (key)
今回は key を "afo" にします。
iex(2)> {:ok, publisher} = Session.declare_publisher(session, "afo")
{:ok, #Reference<0.1263928079.4256563201.214873>}
もうひとつの Publisher 用のターミナル2でも同じことをします。
iex(1)> {:ok, session} = Zenohex.Session.open()
{:ok, #Reference<0.2882918809.1579286529.73610>}
iex(2)> {:ok, publisher} = Zenohex.Session.declare_publisher(session, "afo")
{:ok, #Reference<0.2882918809.1579286529.73706>}
Subscriber の準備をする
つぎに Sub 側の準備をします。
まず Zenohex.Session.open/0 関数で session を作ります。
iex(1)> {:ok, session} = Zenohex.Session.open()
{:ok, #Reference<0.3242688437.2652766210.69121>}
つぎに Subscriber の宣言を Zenohex.Session.declare_subscriber/3 で行います。
- 第1引数: さきほどの session の zid
- 第2引数: key
- 第3引数: プロセス
先程の publisher の場合より引数が増えてることに注意です。
iex(2)> {:ok, subscriber} = Zenohex.Session.declare_subscriber(session, "afo", self)
{:ok, #Reference<0.3242688437.2652766210.69237>}
メジャーチェンジ前と第3引数が変わっていることに注意してください。この宣言のもとで該当する key で pub されたら、プロセスにメッセージが来ます。
以前は通知を受け取るコールバック関数を書きましたが、今回のバージョンより通知を受け取るプロセスになってます。これは省略可能でデフォルトは自分のプロセスです。今回は明示的に self と書いてますが、書かなくても同じ動作になります。
もうひとつの Subscriber 用のターミナル4でも同様のことをします。こちらは self を省略してみました。
iex(1)> {:ok, session} = Zenohex.Session.open()
{:ok, #Reference<0.2321999223.3457810433.189076>}
iex(2)> {:ok, subscriber} = Zenohex.Session.declare_subscriber(session, "afo")
{:ok, #Reference<0.2321999223.3457810437.190127>}
メッセージをPub/Subする
では Pub/Sub 通信をしてみましょう。まず、ターミナル1で発信します。
iex(3)> Zenohex.Publisher.put(publisher, "hello")
:ok
なにも起こりません。これ terminal 3 と terminal 4 で起動している IEx のプロセスに value が送られてます。これがバッファに溜まってて直ちには表示されません。見るには手動でバッファをフラッシュしてやる必要があります。
iex(3)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo",
kind: :put,
payload: "hello",
priority: :data,
timestamp: nil
}
:ok
iex(3)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo",
kind: :put,
payload: "hello",
priority: :data,
timestamp: nil
}
:ok
Subscriber のあるターミナル3とターミナル4とが受け取って出力してます。受け取った value は payload: として見えています。
こんどはターミナル2から発信します。
Zenohex.Publisher.put(publisher, "world")
:ok
これも flush してやらないと見えません。
iex(4)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo",
kind: :put,
payload: "world",
priority: :data,
timestamp: nil
}
:ok
iex(4)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo",
kind: :put,
payload: "world",
priority: :data,
timestamp: nil
}
:ok
Zenoh のつながり方を見てみる
この状態で Publisher と Subscriber とが各々2個上がってます。これは以下のコマンドで状況を見ることができます。
iex(7)> Zenohex.Session.info(session)
{:ok,
%Zenohex.Session.Info{
zid: "9b1e6a760f23ff932df0c7ccd793b953",
routers_zid: [],
peers_zid: ["708a8463390cd40247f66b1fcd9f776c",
"4abbb93e2bcea0967fe6672751b8d2d2", "61669eed9aea5788abf42d2012d6f1ec",
"e19d2673dc5469cbea74cb0e04934ed4"]
}}
セッションに関する pid 的な識別子が zid という名前になってます。ここではまず自分のセッションの zid がわかります。次に routers_zid というのは Zenoh のルータ機能を持つノードのことを示しています。今回は使ってないので空リストになってます。最後の peers_zid というのは Zenoh の通信相手になってるノードの zid です。今回は terminal 2 〜 4 が該当しますので、3つの zid のリストが返ってきてます。
セッションを終了する
不要になったセッションを終了するには Zenohex.Session.close を用います。
iex(8)> Zenohex.Session.close(session)
:ok:
次に行くのに、全部のターミナルでセッションを閉じておきます。
iex(4)> Zenohex.Session.close(session)
:ok
iex(5)> Zenohex.Session.close(session)
:ok
iex(5)> Zenohex.Session.close(session)
:ok
若干複雑な Pub/Sub 通信をさせてみる
Zenoh の key には木構造を持たせられます。またパターンマッチをさせることもできます。これを試してみます。
まず terminal 1 と 2 について。先程の例と同様にセッションを開いたうえで、 Publisher であることを宣言しても良いのですが、簡単に実験するために今回はこのままにしておきます。
次に Subscriber 側のセッションを開きます。こちらは Subscriber の宣言をしておきます。ただし key に若干の工夫を入れます。
iex(6)> {:ok, session} = Zenohex.Session.open()
{:ok, #Reference<0.3242688437.2652766210.69353>}
iex(7)> {:ok, subscriber} = Zenohex.Session.declare_subscriber(session, "afo/*")
{:ok, #Reference<0.3242688437.2652766210.69378>}
iex(6)> {:ok, session} = Zenohex.Session.open()
{:ok, #Reference<0.2321999223.3457810437.190241>}
iex(7)> {:ok, subscriber} = Zenohex.Session.declare_subscriber(session, "afo/bar$*")
{:ok, #Reference<0.2321999223.3457810437.190419>}
terminal 3 では key として "afo/*" を、terminal 4 では key に "afo/bar$*" を指定してます。
いくつかの key で publish してみます。セッションを一旦開いて、publish し、そのセッションを閉じるというのを1行で実行させることができますので、ここではそれを用います。
iex(12)> Zenohex.put("afo/bar100", "see you")
:ok
key として afo/bar100 を指定しています。
iex(11)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo/bar100",
kind: :put,
payload: "see you",
priority: :data,
timestamp: nil
}
:ok
iex(12)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo/bar100",
kind: :put,
payload: "see you",
priority: :data,
timestamp: nil
}
:ok
先程同様に terminal の 3 と 4 の IEx プロセスにメッセージが飛んでいました。これはどちらの subscriber の key にも "afo/bar100" がマッチするからです。
次に key を "afo/foo" として publish してみます。
iex(15)> Zenohex.put("afo/foo", "tomorrow")
:ok
iex(14)> flush
%Zenohex.Sample{
attachment: nil,
congestion_control: :drop,
encoding: "zenoh/bytes",
express: false,
key_expr: "afo/foo",
kind: :put,
payload: "tomorrow",
priority: :data,
timestamp: nil
}
:ok
iex(17)> flush
:ok
このように terminal 3 にはメッセージが飛びましたが、terminal 4 には飛びませんでした。これは "afo/foo" が前者の subscriber の key "afo/*" にはマッチするけれども、後者の "afo/bar$*" にはマッチしなかったためです。
このように、UNIX のシェルコマンドでファイル名のマッチに使うような / * $ # といった特殊文字を使って key のパターンマッチをさせることができます。詳しくは Zenoh の公式ドキュメント
-
Abstractions の
Key Expressionの項 - The Key Expressions Language
を参照してください。
まとめ
メジャーバージョンが 1.0 になった新しい Zenoh の Elixir API である Zenohex を用いて Pub/Sub をしてみました。簡単に Pub/Sub できました。key のパターンマッチが増えててより複雑な動作をさせられるようになってました。
今回は NAT越えやネットワーク間通信をするための Zenoh ルータを用いていません。また Zenoh の持つ単純な pub/sub 以上の機能である Query 機能も使っていません。今後、機会があれば試していきたいと思います。
謝辞
今回の Zenohex の改定は @takasehideki さんと @pojiro さんの貢献によるものです。感謝いたします。
参考文献
-
通常の Pub/Sub で、トピック・メッセージと呼ぶ通信の単位を、Zenoh ではちょっとDBっぽく
keyとvalueと読んでます。 ↩