4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZenohAdvent Calendar 2024

Day 15

とりあえずzenohでpubsubしてみる(Rust, Python, C++)

Last updated at Posted at 2024-12-14

あ、どうも

いかこうです。
初投稿です。

zenohのアドベントカレンダーが開催されるということで参加せさていただきました。
僕自身は、zenohバリバリ使ってるよ!!みたいな人ではないのですが、zenohにはかなり興味を持っているので何か日本のzenohコミュニティに貢献したいなと思いこの記事を書きました。

まずは、一番基本的なpubsub通信に関する内容を取り上げようと思います。

この記事が、僕を含めたzenohワカラナイ人たちの助けになることを期待しています。

zenohってなんゼノー??????(極寒)

zenohの開発元であるZettaScaleはzenohを以下のように解説しています。
「Zenoh、静的データ、動的データおよびコンピューティングを統合する Pub/Sub/Query プロトコルです。従来のPub/Subを地域分散ストレージやクエリ、演算(コンピューティング)とエレガントに組み合わせながらも、どのメインストリームスタックよりもはるかに高いレベルの時間とスペースの効率を保持しています。」

いかこう的にまとめるとこんな感じ。
「ロボティクスやIoT分野で使われる通信プロトコルをいい感じにまとめたよ〜」

他にもzenohには以下のような特徴があります。

  • 多様な通信方式のサポート: メッシュ、peer2peer、ルーテッド、ブローカード
  • 結構早い
  • 多様なネットワークレイヤ: TCP、UDP、Ethernet、Bluetooth
  • 組み込みデバイスへの対応(zenoh-pico)
  • 多様な言語対応: C, C++, Rust, Python, Elixirなど

この人の記事も非常にわかりやすいです。こっちの方が表があったりして直感的かも
https://qiita.com/Shintaro_Hosoai/items/0bde489cde43a00d6f96


もうzenohだけやっとけば何でも通信できんじゃね!?

注意: zenohにはバージョンがあり、場合によっては互換性がなかったりプログラムが全然違ってたりするので、できる限り揃えることをおすすめします。
現状では、1.0.3、1.0.4では互換性があることが確認できています。

Rustでzenoh

細かいことは置いといてzenohを使ってみましょう!!まずはRustから!!!
githubリポジトリ

注意: いかこうはRustナニモワカラナイ人です

当たり前ですが、Rust環境はインストールしておいてください。

1. インストール

ここでは、zenoh v1.0.3を使用しました。

Linux Debian
echo "deb [trusted=yes] https://download.eclipse.org/zenoh/debian-repo/ /" | sudo tee -a /etc/apt/sources.list.d/zenoh.list > /dev/null
sudo apt update
sudo apt install zenoh
Mac
brew tap eclipse-zenoh/homebrew-zenoh
brew install zenoh

マニュアルインストールはこのページからzipファイルをダウンロード
この場合は以下のコマンドでビルドする必要があります。

ビルドコマンド
cd zenoh                            # zenohフォルダに移動
rustup update                       # Rustのツールチェーンをアップデート
cargo build --release --all-targets # ビルド

2. プロジェクトの作成

まず、Rustのプロジェクトを作成します。

cargo new zenoh_pubsub

Rustでは、実行ファイルを複数作成する場合はbin/にまとめる慣習?があるらしいのでこれに則って以下のようなフォルダ構成にします。

zenoh_pubsub/
├── src/
│   └── bin/
│       ├── pub.rs
│       └── sub.rs
├── target/
├── Cargo.lock
└── Cargo.toml

Cargo.tomlを編集して使用するクレートを追加します。Rustで使うパッケージはクレートって呼ぶらしい。

Cargo.toml
[package]
name = "zenoh_pubsub"
version = "0.1.0"
edition = "2021"

[dependencies]
zenoh = "1.0.3"
tokio = { version = "1", features = ["full"] }

3. pubsub通信

簡単なパブリッシャーとサブスクライバーを書いてみましょう!!

pub.rs
use std::thread;
use std::time::Duration;

#[tokio::main]
async fn main(){
  let session = zenoh::open(zenoh::config::Config::default()).await.unwrap(); // zenohセッションの開始
  let keyexpr = "test".to_string(); // keyを定義
  let publisher = session.declare_publisher(&keyexpr).await.unwrap(); // パブリッシャーの定義
  let msg = "Hello, World"; // 送信データ
  loop{
    publisher.put(msg).await.unwrap(); // データのパブリッシュ
    println!("published: {:?}", msg);
    thread::sleep(Duration::from_secs(1)); // 1秒間スリープ
  }
}
sub.rs
#[tokio::main]
async fn main(){
  let session = zenoh::open(zenoh::config::Config::default()).await.unwrap(); // zenohセッションの開始
  let keyexpr = "test".to_string(); // keyの定義
  let subscriber = session.declare_subscriber(&keyexpr).await.unwrap(); // サブスクライバーの定義
  while let Ok(sample) = subscriber.recv() {
    println!("Received: {:?}", sample); // 受信データを表示
    let key = sample.key_expr().to_string(); // 受信データからkeyを取得
    let payload = sample.payload().try_to_string().unwrap_or_else(|e| e.to_string().into()); // 受信データからメインデータを取得
    println!("key: {}", key);
    println!("payload: {}", payload);
  }
}

ではいごかします。プロジェクトのルートディレクトリで

cargo build

問題なくビルドが完了したら、ターミナルを二つ開いてそれぞれのプログラムを実行します。
二つのプログラム間で通信していることが確認できると思います。

ターミナル1
$ cargo run --bin pub
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.03s
     Running `target/debug/pub`
published: "Hello, World"
published: "Hello, World"
published: "Hello, World"
published: "Hello, World"
ターミナル2
$ cargo run --bin sub
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.37s
     Running `target/debug/sub`
Received: Sample { key_expr: ke`test`, payload: ZBytes(ZBuf { slices: [[48, 65, 6c, 6c, 6f, 2c, 20, 57, 6f, 72, 6c, 64]] }), kind: Put, encoding: Encoding(Encoding { id: 0, schema: None }), timestamp: None, qos: QoS { inner: QoS { priority: Data, congestion: Drop, express: false } }, attachment: None }
key: test
payload: Hello, World
Received: Sample { key_expr: ke`test`, payload: ZBytes(ZBuf { slices: [[48, 65, 6c, 6c, 6f, 2c, 20, 57, 6f, 72, 6c, 64]] }), kind: Put, encoding: Encoding(Encoding { id: 0, schema: None }), timestamp: None, qos: QoS { inner: QoS { priority: Data, congestion: Drop, express: false } }, attachment: None }

Pythonでzenoh

次は同じくzenoh v1.0.3をpythonで書いていきます。
githubリポジトリ

1. インストール

pythonでは二つ方法があります。どちらも事前に仮装環境を作っておくことを推奨します。

  1. pip install
    普通にpip installで入ります

    非推奨
    pip install eclipse-zenoh
    

    しかし、このままでは最新のバージョンがインストールされてしまうので、バージョンを指定した方が良いでしょう

    推奨
    $ pip index versions eclipse-zenoh # インストール可能なバージョンの確認
    eclipse-zenoh (1.0.4)
    Available versions: 1.0.4, 1.0.3, 1.0.2, 1.0.1, 1.0.0, 0.11.0
        INSTALLED: 1.0.3
        LATEST:    1.0.4
    $ pip install eclipse-zenoh==1.0.3 # バージョン指定でインストール
    
  2. マニュアルインストール
    git cloneしてビルドする方法です。Rust環境が必要です。

    git clone -b 1.0.3 https://github.com/eclipse-zenoh/zenoh-python.git # バージョン1.0.3でクローン
    cd zenoh-python
    rustup update
    pip install -r requirements-dev.txt
    which maturin # maturinていうビルドツールのパスを取得
    export PATH=$PATH:<maturinのパス> # ↑で取得したパスを環境変数に追加する(ここで間違えて上書きしちゃって大変なことになった)
    maturin develop --release
    

2. pubsub通信

以下にpubsubプログラムを示します。

pub.py
import time
import zenoh


def main():
  conf = zenoh.Config()  # zenohの設定
  session = zenoh.open(conf)  # zenohセッションの開始
  key = "hoge"  # データを判別するkeyの定義
  pub = session.declare_publisher(key)  # パブリッシャーの定義
  interval = 1  # 1秒間隔でパブリッシュ
  cnt = 0
  while True:
    msg = "msg from '{key}'. cnt={cnt}".format(key=key, cnt=cnt)  # 送信データの作成
    pub.put(msg)  # データをパブリッシュ
    print(f"Published!! msg={msg}")
    time.sleep(interval)
    cnt += 1


if __name__ == "__main__":
  main()
sub.py
import zenoh


def listener(sample):
  """
  サブスクライバのコールバック関数
  """
  # print(f"sample info: {sample=}") # sampleの構造を表示

  key = sample.key_expr  # キーの部分を取得
  payload = sample.payload.to_string()  # データの部分を取得
  print(f"Received '{payload}' from '{key}'")


def main():
  conf = zenoh.Config()  # zenohの設定
  session = zenoh.open(conf)  # zenohセッションの開始

  key = "hoge"  # サブスクライブするキーの定義
  sub = session.declare_subscriber(key, listener)  # キーを指定してサブスクライブ
  while True:
    pass


if __name__ == "__main__":
  main()

あとはそれぞれ実行するだけです。pythonは簡単でいいですね

ターミナル1
$ python pub.py
Published!! msg=msg from 'hoge'. cnt=0
Published!! msg=msg from 'hoge'. cnt=1
Published!! msg=msg from 'hoge'. cnt=2
Published!! msg=msg from 'hoge'. cnt=3
Published!! msg=msg from 'hoge'. cnt=4
Published!! msg=msg from 'hoge'. cnt=5
Published!! msg=msg from 'hoge'. cnt=6
Published!! msg=msg from 'hoge'. cnt=7
Published!! msg=msg from 'hoge'. cnt=8
Published!! msg=msg from 'hoge'. cnt=9
ターミナル2
$ python sub.py
Received 'msg from 'hoge'. cnt=7' from 'hoge'
Received 'msg from 'hoge'. cnt=8' from 'hoge'
Received 'msg from 'hoge'. cnt=9' from 'hoge'
Received 'msg from 'hoge'. cnt=10' from 'hoge'
Received 'msg from 'hoge'. cnt=11' from 'hoge'
Received 'msg from 'hoge'. cnt=12' from 'hoge'
Received 'msg from 'hoge'. cnt=13' from 'hoge'
Received 'msg from 'hoge'. cnt=14' from 'hoge'

C++でzenoh

c++が一番めんどかったです。zenoh v1.0.4で動作確認しました。
githubリポジトリ

1. インストール

git cloneしてビルドします。

zenoh-cppのクローン
git clone --recursive -b 1.0.4 https://github.com/eclipse-zenoh/zenoh-cpp.git
cd zenoh-cpp
mkdir build

ここで、READMEによると
「By default it is expected that you have zenoh-c installed. If you want to install for zenoh-pico backend or for both (or to not specify any backend), please set ZENOHCXX_ZENOHC or ZENOHCXX_ZENOHPICO Cmake variables toON or OFF accordingly. Notice that at least one of the backends is required for using the library and/or building tests and examples.

Use option CMAKE_INSTALL_PREFIX for specifying installation location. Without this parameter installation is performed to default system location /usr/local which requires root privileges.」

デフォルトでは zenoh-c がインストールされているものとします。zenoh-pico バックエンドまたはその両方をインストールしたい場合 (またはバックエンドを指定したくない場合) は、ZENOHCXX_ZENOHC または ZENOHCXX_ZENOHPICO Cmake 変数を適宜 ON または OFF に設定してください。ライブラリを使用したり、テストやサンプルをビルドしたりするには、少なくともどちらかのバックエンドが必要です。
インストール場所を指定するには CMAKE_INSTALL_PREFIX オプションを使用します。このパラメータを指定しないと、インストールはデフォルトのシステム・ロケーション /usr/local に行われ、root 権限が必要になります。(DeepL翻訳)

なんかバックエンドってのを設定してビルドするみたいですね。
どうやら、zenoh-cppのなかでzenoh-cを使用してるやつとzenoh-picoを使用してるやつがあるっぽい??(間違ってるかも)
ってことで、zenoh-cのビルドもした方が良さそうです。zenoh-czenoh-cppのリポジトリに含まれてます。

zenoh-cのビルド
cd zenoh-c
rustup update
mkdir build
cd build
cmake ..
cmake --build . --config Release
cmake --build . --target install

zenoh-cppに戻ってビルドします。今回はバックエンドにzenoh-cを使用してビルドします。

zenoh-cppのビルド
cd build
cmake .. -DZENOHCXX_ZENOHC=ON -DZENOHCXX_ZENOHPICO=OFF  -DCMAKE_INSTALL_PREFIX=~/.local
cmake --install .
cmake --build . --target examples # サンプルコードのビルド

2. 動作確認

色々ビルドしたので、プログラムを書く前にちゃんと動作するか確認したくなります。
サンプルコードの実行ファイルはzenoh-cpp/build/examples/zenohcにあります。

パブリッシャーのデモ
./z_pub test # keyをtestでパブリッシュ
サブスクライバーのデモ
./z_sub test # testをサブスクライブ

3. pubsub通信

動作確認ができたら早速プログラムを書いていきましょう。
フォルダ構成はこんな感じ

zenoh_cpp_test/
├── build/
├── include/
├── lib/
│   └── zenoh-cpp/
├── src/
│   ├── pub.cpp
│   └── sub.cpp
└── CMakeLists.txt
pub.cpp
#include <iostream>
#include <thread>
#include "zenoh.hxx"
using namespace zenoh;

int main(int argc, char **argv)
{
  Config config = Config::create_default();
  auto session = Session::open(std::move(config));
  std::string key = "test";
  std::string payload = "Hello, World";
  auto publisher = session.declare_publisher(KeyExpr(key.c_str()));
  while (1)
  {
    publisher.put(payload.c_str());
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}
sub.cpp
#include <iostream>
#include "zenoh.hxx"
using namespace zenoh;

void listener(const Sample &sample)
{
  std::cout << "Received: " << sample.get_payload().as_string() << std::endl;
}

int main(int argc, char **argv)
{
  Config config = Config::create_default();
  auto session = Session::open(std::move(config));
  std::string key = "test";
  auto subscriber = session.declare_subscriber(KeyExpr(key.c_str()), &listener, closures::none);
  while (1)
  {
    continue;
  }
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.5)
project(zenoh_cpp_test)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(PUBLISHER_EXECUTABLE_NAME pub)
set(SUBSCRIBER_EXECUTABLE_NAME sub)

find_package(zenohc REQUIRED)
find_package(zenohcxx REQUIRED)

# 実行ファイルの生成
add_executable(${PUBLISHER_EXECUTABLE_NAME} src/pub.cpp)
add_executable(${SUBSCRIBER_EXECUTABLE_NAME} src/sub.cpp)

# ライブラリのリンク
target_link_libraries(${PUBLISHER_EXECUTABLE_NAME} zenohcxx::zenohc)
target_link_libraries(${SUBSCRIBER_EXECUTABLE_NAME} zenohcxx::zenohc)

cmakeビルドです。

mkdir build
cd build
cmake ..
make

ビルドに成功するとbuild/ないにpubsubが実行ファイルとして作成されるはずです。

ターミナル1
./pub
ターミナル2
$ ./sub
Received: Hello, World
Received: Hello, World
Received: Hello, World
Received: Hello, World
Received: Hello, World

おわりに

読みずらい文章を最後まで読んでいただきありがとうございます。ここまで読んでくれたあなたはいい人です。
zenoh-cppで結構苦労しましたが、動いてよかったです。そして、アドベントカレンダーの期日に間に合いそうで一安心です。
qiitaの記事を書くのはこれが初めてですが、結構楽しかったです。これからも積極的に書いていこうと思います。
ここで取り上げた内容はzenohの本の一部なので自分が理解でき次第、他の内容も取り上げたいです。

zenohこれからめちゃくちゃキそうなのでみんなで使っていきましょう

参考文献・参考になる文献

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?