4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PythonAdvent Calendar 2023

Day 9

pub/sub通信ライブラリZenohを試す

Last updated at Posted at 2023-12-08

1.はじめに

 Zenohは通信プロトコル/ミドルウェアの一つで、出版購読型の通信方式に加え、キーバリューストア型の分散クエリの機能も提供しています。
 2023年末に、ZettaScale CEO/CTOのAngelo Corsaro さんが来日されるタイミングで、セミナーが開かれます。

 C/C++やPythonなどのAPIライブラリも提供されていますので、予習を兼ねて、PythonでZenohによるpub/sub通信を試してみました。

2.環境

  • Ubuntu Linux 22.04
  • Python 3.10
  • poetry で仮想環境

3.考え方

こちらのサンプルコードは、仮想的に温度を測定し、出版、購読するサンプルです。
購読・出版は1キーワード(key expressions)です。

今回の記事では少し拡張して、2つのkey expressionsを用いて、仮想的な温度と湿度を購読したり出版したりするサンプルを作ってみました。

  • z_publisher.pyは2つのkey expressionsを出版します
    • myhome/kitchen/tempmyhome/kitchen/humiです
  • z_subscriber.pyは2つのkey expressionsを購読します
    • key expressionsそれぞれを購読する専用のハンドラ(listener_temp、listener_humi)
    • 両方とも購読するハンドラ(listener)

4.ソースコード

poetryでPython環境を作ります。

$ cd ~

# poetry環境を作る
$ poetry new zenohpy
$ cd zenohpy

# zenohパッケージをインストール
$ poetry add eclipse-zenoh

# 空のファイルを作成
$ touch zenohpy/z_publisher.py zenohpy/z_subscriber.py

# 実行権限を付与(poetry runするときに実行権限が必要)
$ chmod 755 zenohpy/z_publisher.py zenohpy/z_subscriber.py

publisher側

zenohpy/z_publisher.py
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""publisher
参考:https://zenoh.io/docs/getting-started/first-app/
"""
import zenoh, random, time, datetime
from typing import Tuple

random.seed()


def gen_value() -> Tuple[float, float]:
    """ランダムに値を生成

    Returns:
        Tuple[float, float]: 温度、湿度
    """
    return (random.randint(150, 300) / 10, random.randint(400, 600) / 10)


if __name__ == "__main__":
    try:
        print(f"-- zenoh sample - publisher --")

        session = zenoh.open()

        # key expressionsごとにpublisherを生成
        key_temp = "myhome/kitchen/temp"
        pub_temp = session.declare_publisher(key_temp)

        key_humi = "myhome/kitchen/humi"
        pub_humi = session.declare_publisher(key_humi)

        while True:
            # ダミーの温度・湿度の値を生成
            (t, h) = gen_value()
            dt_now = datetime.datetime.now().isoformat()

            # 温度をpublish
            print(f"Putting Data ('{key_temp}': '{t}' / {dt_now})...")
            pub_temp.put(t)

            # 湿度をpublish
            print(f"Putting Data ('{key_humi}': '{h}' / {dt_now})...")
            pub_humi.put(h)

            # 一秒待機
            time.sleep(1)

    except KeyboardInterrupt:
        # [Ctrl-C]が押された
        print("SIGINT - Exit")
    except:
        # 例外発生時にメッセージ
        import traceback

        traceback.print_exc()
    finally:
        pass

subscriber側

zenohpy/z_subscriber.py
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""subscriber
参考:https://zenoh.io/docs/getting-started/first-app/
"""
import zenoh


def listener_temp(sample):
    """受信・温度"""
    print(
        f"Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}'℃)"
    )


def listener_humi(sample):
    """受信・湿度"""
    print(
        f"Received {sample.encoding} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}'%)"
    )


def listener(sample):
    """受信・すべて"""
    str = f"Received all: {sample.encoding},{sample.key_expr},{sample.kind},{sample.payload},{sample.timestamp},{sample.value}"
    print(str)


if __name__ == "__main__":
    try:
        print(f"-- zenoh sample - subscriber --")

        session = zenoh.open()

        # 受信したkey expressionsごとに、実行する関数を紐づけ
        sub1 = session.declare_subscriber("myhome/kitchen/temp", listener_temp)
        sub2 = session.declare_subscriber("myhome/kitchen/humi", listener_humi)
        sub3 = session.declare_subscriber("myhome/kitchen/*", listener)
        # ここで受信待機状態になる
        # time.sleep(60) ←書いても書かなくても良さそう
    except KeyboardInterrupt:
        # [Ctrl-C]が押された
        print("SIGINT - Exit")
    except:
        # 例外発生時にメッセージ
        import traceback

        traceback.print_exc()
    finally:
        pass

5.実行例

ターミナルを2つ開いて、それぞれ実行します。

ターミナル1・発行用
$ poetry run python ./zenoh/z_publisher.py 

-- zenoh sample - publisher --
Putting Data ('myhome/kitchen/temp': '22.5' / 2023-12-03T15:36:12.010331)...
Putting Data ('myhome/kitchen/humi': '41.3' / 2023-12-03T15:36:12.010331)...
^CSIGINT - Exit

2つのkey expressionsをPublishします

  • myhome/kitchen/temp: 22.5
  • myhome/kitchen/humi: 41.3
ターミナル2・購読用
$ poetry run python ./zenoh/z_subscriber.py 

-- zenoh sample - subscriber --
Received all: application/float,myhome/kitchen/temp,PUT,b'22.5',None,<zenoh.value.Value object at 0x7fe2d6d17240>
Received PUT ('myhome/kitchen/temp': '22.5'℃)
Received all: application/float,myhome/kitchen/humi,PUT,b'41.3',None,<zenoh.value.Value object at 0x7fe2d6d17240>
Received PUT ('myhome/kitchen/humi': '41.3'%)

2つのkey expressionsを購読します

key expressions myhome/kitchen/temp:22.5 myhome/kitchen/humi: 41.3
ハンドラlistener_temp
ハンドラlistener_humi
ハンドラlistener

このように、簡単に2つのPub/Subを作る事ができました。

6.参考資料

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?