はじめに
この記事はZenohアドベントカレンダーの23日目の記事です。
今回は画像データの送受信を試してみたので簡単にまとめます。
環境
以下の環境で動作を確認しています。
WindowsとWSL間でも通信できることも確認しました。
項目 | バージョン |
---|---|
Windows | 11 |
WSL | 2 |
Ubuntu | 22.04 |
Python | 3.10.12 |
画像データの送受信
データの送受信についてですが、Zenohのセッションの開始時やpublish、subscribe時にはデータ型は指定しません。これはMQTTやTCP/UDP通信などと一緒で、ROSとは異なりますね。
そのため、送信するには画像データをエンコード、受信時にはデコードする必要があります。
画像ファイルを送受信
送信側のコード
OpenCVを使って画像データをエンコードすることできます。
import zenoh
import cv2
# Zenohセッションを作成
z = zenoh.open(zenoh.Config())
key = "example/image"
# 画像をエンコードして送信
image = cv2.imread("sample.png")
_, encoded_image = cv2.imencode(".jpg", image)
binary_data = encoded_image.tobytes()
# Zenohでパブリッシュ
z.put(key, binary_data)
次にsubscriber側に簡単なUIを追加してリアルタイムにデータを分かるようにしておきましょう。
import zenoh
from PIL import Image, ImageTk
import tkinter as tk
import io
def main():
def _on_image_received(sample):
"""Zenohから受信した画像をTkinterウィンドウに表示する"""
# バイナリデータをPillow画像形式にデコードして変換
binary_data = bytes(sample.payload)
image = Image.open(io.BytesIO(binary_data))
image_for_ui = ImageTk.PhotoImage(image)
# ラベルに画像を設定
image_label.config(image=image_for_ui)
image_label.image = image_for_ui
print("画像を更新しました")
# Tkinterのウィンドウ作成
root = tk.Tk()
root.geometry("800x600")
root.title("Zenoh Screen Receiver")
image_label = tk.Label(root)
image_label.pack()
# Zenohセッションを作成
session = zenoh.open(zenoh.Config())
key_expr = "example/image"
# Zenohでサブスクライブ
subscriber = session.declare_subscriber(key_expr, _on_image_received)
print("画像の受信を待っています...")
root.mainloop()
subscriber.close()
session.close()
if __name__ == "__main__":
main()
スクリーンショット画像を送信
以下のコードで、冒頭のツイートの動画にあったようなUIが起動します。
ここでもtkinterのUIを作成して、その座標をImageGrab.grab()
の引数として渡しています。
publihserとsubscriberでsessionのkeyを合わせましょう!以下では先ほどの例と区別するためにexample/selection
としています。
import tkinter as tk
import time
from PIL import ImageGrab
import zenoh
import io
def main():
session = zenoh.open(zenoh.Config())
key = "example/selection"
# Tkinterのウィンドウ作成
root = tk.Tk()
root.geometry("800x600")
root.title("Zenoh Screen Transfer")
def _on_configure_let_filler_track(_):
filler.geometry(
f"{root.winfo_width()}x{root.winfo_height()}+{root.winfo_rootx()}+{root.winfo_rooty()}"
)
filler.lower(root)
def _capture_canvas_area():
x1 = root.winfo_rootx()
x2 = root.winfo_rootx() + root.winfo_width()
y1 = root.winfo_rooty()
y2 = root.winfo_rooty() + root.winfo_height()
screenshot = ImageGrab.grab(bbox=(x1, y1, x2, y2))
# JPEG形式でエンコードして送信
buffer = io.BytesIO()
screenshot.save(buffer, format="JPEG", quality=85) # 画像品質を指定
session.put(key, buffer.getvalue())
filler = tk.Toplevel(root)
filler.overrideredirect(True)
filler.transient(root)
filler.wm_attributes("-alpha", 0.0019607843138)
# 半透明属性の設定
root.configure(bg="white")
root.wm_attributes("-transparentcolor", "white")
canvas = tk.Canvas(root, bg="white")
canvas.pack(fill=tk.BOTH, expand=True)
root.bind("<Configure>", _on_configure_let_filler_track)
while True:
root.update()
_capture_canvas_area()
time.sleep(0.01)
また、エンコードに関してはスクリーンショットの保存時にio.BytesIO()のバイトデータを指定します。そのデータをそのままpublishすることができます。
# JPEG形式でエンコードして送信
buffer = io.BytesIO()
screenshot.save(buffer, format="JPEG", quality=85) # 画像品質を指定
session.put(key, buffer.getvalue())
おわりに
簡単ですが以上です!
コントローラも作っていたのでこれとあわせてロボットの操縦画面作れそう。
参考