最近zenohという通信ライブラリを調べていたのですが、国内では情報が殆ど引っかからなかったので共有したいと思います。
zenohとは
zetta scale社が開発しているPub/Sub通信ライブラリで、MQTTなどと比べても性能や相互運用性に優れたライブラリです。
Webサイトではドキュメントや事例、ブログ記事も充実しています。定期的にWebinarも開催しているようです。本記事の殆どは本家サイトのBlogやgithub, Webinarからピックアップしてきた内容となります。
githubリポジトリは以下の通りです。
https://github.com/eclipse-zenoh/zenoh
また、プラグインや他言語のクライアントライブラリは、一つ上の階層から辿ってください。
https://github.com/eclipse-zenoh/
最新版はv0.7.0-rc です。(2022/12/22時点)
以前はOCamlで実装されていたものをRustに移植したとのことで、アーキテクチャは堅固で完成度は高いのですが、
バージョン毎に大きな変更が入っているらしく、バージョンが異なると通信できなくなることも多いので注意が必要です。
後述するzenoh routerやクライアントライブラリ、プラグインのバージョンは揃えるようにしてください。
通信方式
単純なPub/Subだけでなくいくつかのバリエーションを持っています
- Pub / Sub (push) : よくあるPub/Sub形式です。Subが購読しているトピックにPubがメッセージを配信すると、Subのコールバック関数が呼ばれます。
- Pub / Sub (pull) : Pub/Subなのですが、上記と異なりSubにメッセージが届いた際に一時保管され、Subが任意のタイミングで読み出せる形式です。
- Pub / Store / Get : 上記のPub / Sub(pull)と挙動はほぼ同じですが、Pub-Sub間に中継サーバが入り、メッセージはそこに一時保管される形式です。
- Get / Reply : 特定のトピックの呼び出しに対し、返答を返すような形式です。RPCのようなものとして利用できそうです。
ネットワーク構造
よくあるPub/Sub通信では統括するBrokerにノードが接続し、Pubノードが出版すると、配下で購読しているSubノードに配信される形式が多いですが、Zenohでは様々なネットワーク構成を取ることができます。
もちろん上記を混成してもよくゴチャっとしたネットワーク構造でも、ちゃんと配信してくれます。またRouterの設定次第でトピックのフィルタリングなども行えるようです。
また、Clientから接続先のRouterは複数登録しておき、状況に応じて繋ぎ変えたり、障害時に迂回するといった使い方もできるようです。
ネットワークレイヤ
実装レイヤはNetwork層から上となるのですが、状況に応じて様々なプロトコルやネットワークを使い分けることが可能となっています。
デフォルトではUDP Multicastで動作しているようですが、同一PC内であればプロセス間通信、組込み機器とはSerial、信頼性が欲しいのでTLS等々、設定で切り替えることができるようです。もちろんどのような下位層となっても同じAPIでPub/Subできます。
(https://zenoh.io/blog/2022-08-12-zenoh-serial/ から引用)
相互運用性
Zenohは相互運用性にも優れ、様々なプロトコルと接続することができます。変換Pluginをノードとして追加したり、Routerに差し込むことでMQTT、ROS 2(DDS)、RESTなどと通信することができます。また上記のPub / Store / GetのStore部にDBを繋ぐこともできるようです。
(第一回Webinar資料より引用)
組込み向けZenoh
組込みデバイスでもZenohを利用することができます。Raspberry Pi相当であればそのままRustのクライアントライブラリを利用すればいいですが、ArduinoのようにRustが利用できない環境も多いです。Zenoh-picoはzenohをcに軽量再実装したものでArduinoやESP32などちっちゃいマイコンでもサクサク動くようです。またWifiを持たないデバイスでもシリアル経由で通信できてしまうようです。すごいですね。
Zenoh getting started
さて、前置きが長くなってしまいました。動かし方について見ていきましょう。
前述したように、zenohではclientとrouterがあります。Routerの方は中継機能を持つ単一のプログラムなので、バイナリをサーバに配置すればすぐ動きます。一方clientはライブラリを使ってPub, Subする内容を記述する必要があります。
まずはclientのサンプルプログラムを動かしてみましょう。
準備
Rust環境が必要となります。適当にググって入れて下さい。
zenohプロジェクトのcloneとbuild
zenohはバージョンにセンシティブなので今回はv0.7.0に縛っています。適宜最新のRelease Tagに変更してください。
git clone -b 0.7.0-rc https://github.com/eclipse-zenoh/zenoh.git
cd zenoh
cargo build --release --all-targets
そこそこ時間が掛かるので、サンプルのソースコードでも眺めてみましょう。
/examples/examples/ 以下に各サンプルコードが含まれています。今回はz_sub.rs, z_pub.rsを利用します。
以下、z_sub.rsの要点の部分だけ抽出したものです。他の部分は概ね引数を設定に反映する部分となります。
let session = zenoh::open(config) // OpenBuilderが返る(Builderパターン)、必要に応じてメソッドチェーンで設定を加える。
.res() // builderから設定済みのSessionを取得
.await // ここまで非同期なので待つ
.unwrap(); // Optionで返ってくるので中身を取り出す。(実用の際はちゃんと処理してね)
let subscriber = session.declare_subscriber(&key_expr) // key_expr(トピック名)でSubscriberを作成。ここもBuilderが返る
.res().await.unwrap(); // 上記と同様、設定済みのインスタンスを得て、待って、取り出す。
loop {
select!(
sample = subscriber.recv_async() => { // メッセージ待ちのループ。届くとsampleにメッセージが入る。
let sample = sample.unwrap(); // 取り出す
println!(">> [Subscriber] Received {} ('{}': '{}')",
sample.kind, sample.key_expr.as_str(), sample.value); // メッセージの種類、トピック名、値をそれぞれ取り出して出力
}
);
}
続いてz_pub.rs
let session = zenoh::open(config).res().await.unwrap(); // 上記と同様にSessionを作成。
let publisher = session.declare_publisher(&key_expr).res().await.unwrap(); // key_exprトピックのPublisherを作成
for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await; // 1秒寝る
let buf = format!("[{:4}] {}", idx, value); // インデックスと送りたいメッセージをいい感じに整形
publisher.put(buf).res().await.unwrap(); // publish!
}
サンプルの実行
サンプル類は/target/release/examples/に配置されています。
ターミナルを二枚立てて、片方でSub、もう片方でPubしてみましょう。
./target/release/examples/z_sub
./target/release/examples/z_pub
z_pubが送ったメッセージがz_subにも届いたでしょうか?
z_pubは-v オプションで送るメッセージを変更できます。z_sub, z_pubともに-kオプションでトピック名を変更できます。
Router経由での通信
次にRouter経由での通信を試してみましょう。3地点が必要となるので、適当なクラウドVMなどを借りてください。
Routerはバイナリを落としてくるだけでいいです。githubのReleaseより、
先ほどgit cloneで指定したタグに対応するパッケージをダウンロードしてください。
今回はAzureVMのUbuntu上に置くのでzenoh-0.7.0-rc-x86_64-unknown-linux-gnu.zip
を選択しました。適宜OSに合わせて選択してください。
mkdir zenoh-0.7.0-rc
cd zenoh-0.7.0-rc
wget https://github.com/eclipse-zenoh/zenoh/releases/download/0.7.0-rc/zenoh-0.7.0-rc-x86_64-unknown-linux-gnu.zip
unzip zenoh-0.7.0-rc-x86_64-unknown-linux-gnu.zip
./zenohd
これでrouterが立ち上がった状態になります。が、TCP:7447ポートの開放が必要となります。適宜設定してください。
今回は仮にこのサーバにIPアドレス 200.200.200.200 が与えられたものとして説明を進めます。
次にPC1でz_subを起動します。オプションでRouterのIPとポートを指定します。
./target/release/examples/z_sub -e tcp/200.200.200.200:7447
同様にPC2でもz_pubを立ち上げます。
./target/release/examples/z_pub -e tcp/200.200.200.200:7447
異なるPC間でPub/Subできたでしょうか?
routerだけGlobal IPがあれば、ClientはLocal Net内でも問題なく通信できます。
新規プロジェクトでpubsub
続いて、自身のプロジェクトでZenohを導入する方法について見ていきます。
まずcargo newでRustプロジェクトを作成し、Cargo.tomlにzenoh crateを追加します。
cargo new my_first_zenoh_project
cd my_first_zenoh_project
[package]
name = "zenoh_sample"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-std = "=1.12.0" # <=この行を追加
zenoh = "0.7.0-rc" # <=この行を追加。バージョンは適宜揃える。
今回は単発のメッセージを投げるプログラムにしてみます。Sub側は先ほどのz_subを利用します。
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
let session = zenoh::open(Config::default()).res().await.unwrap();
let key_expr = "hello/zenoh".to_string();
let publisher = session.declare_publisher(&key_expr).res().await.unwrap();
publisher.put("hello zenoh!").res().await.unwrap();
}
書き終わったら、zenoh_sampleのルートフォルダでビルドしてください。
cargo build
それでは実行してみましょう。z_subは先ほどのzenohのプロジェクトの方でターミナルを開いてください。
./target/release/examples/z_sub -k "hello/zenoh"
./target/debug/zenoh_sample
ちゃんとメッセージは届いたでしょうか?
おわりに
駆け足でしたがzenohについて紹介してみました。
まだ発展途上ではあるものの、個人的にはとても期待できるライブラリではないかと思っています。
特に私の生息している組込み界隈とも親和性が高いのはワクワクしますね。
ROS 2との連携については以前こんなの記事も書いてるので、良かったら参照ください。
皆さんの選択の一助になれば幸いです。ぜひzenohで遊んでみてください!