こんにちは!この記事では、 株式会社Octa Roboticsが開発・提供している ロボット・設備間連携に特化したマルチベンダー型のインターフェースサービス「LCI」 について、ロボットからの利用方法を連載形式で紹介 していきます。
前回の記事では、「LCI」を利用するために必要な 開発用ロボットアカウントの発行手順とサンプルコードの入手 について紹介しました。
今回は、株式会社Octa Roboticsから提供されているサンプルコードを使用して、 MQTTのブローカーである「Octa Link」に接続 していきたいと思います。
接続準備
サンプルコードのライブラリ確認
まずは、サンプルコードの先頭行にあるライブラリを確認します。
サンプルコード自体は Python で書かれているようです。
import paho.mqtt.client as mqtt
import time
import ruamel.yaml
import json
import sys
import logging
from enum import Enum
import queue
import glob
import ssl
import threading
それぞれのライブラリについて調べると、以下の機能を持っていることが分かりました。
ライブラリ/モジュール名 | 説明 |
---|---|
paho.mqtt.client | MQTT(Message Queuing Telemetry Transport)プロトコルを使って、ブローカーに接続し、メッセージの送受信やトピックの購読を行うためのクライアントクラス。 |
ruamel.yaml | Python用の YAML 1.2 ローダー/ダンパーライブラリ。構造化されたデータの読み書きをサポートする。 |
time | 時間の取得、遅延処理、タイムスタンプ変換など、時間に関連する処理を行うための標準ライブラリ。 |
json | JSON(JavaScript Object Notation)形式のデータを扱うための標準ライブラリ。データのエンコード・デコードを行う |
sys | Pythonインタプリタや実行環境に関する情報や機能へアクセスするための標準ライブラリ |
logging | プログラムの実行時情報(ログ)を記録・出力するための標準ライブラリ。デバッグや運用監視に利用される。 |
enum | 列挙型(Enum)を定義するための標準ライブラリ。定数をわかりやすく整理して扱うことができる。 |
queue | スレッド間で安全にデータを受け渡すためのキュー(待ち行列)を提供する標準ライブラリ。マルチスレッド環境に適している。 |
glob | ワイルドカードを使ったファイル名のパターンマッチングを行うための標準ライブラリ。ファイル検索に利用される |
ssl | SSL/TLS(Secure Sockets Layer / Transport Layer Security)通信を行うための標準ライブラリ。暗号化通信の構築に使用される。 |
threading | 複数のスレッド(並列処理)を用いた同時実行を可能にする標準ライブラリ。並行処理を実現できる。 |
ライブラリのインストール
調べたライブラリの中から、必要なライブラリをインストールしていきます。
paho.mqtt.client バージョン 1.6.1
pip3 install paho-mqtt==1.6.1
ruamel.yaml 最新バージョン
※2025/05/27現在
pip3 install ruamel.yaml
最後に、インストールが正しく行われたか pip list
で確認します。
xxx xxx % pip list
Package Version
----------- -------
paho-mqtt 1.6.1
pip 24.3.1
ruamel.yaml 0.18.10
無事、ライブラリのインストールができました。
MQTTブローカーに接続
サンプルコードlci.client.py
を実行して、MQTTのブローカーである「Octa Link」へ接続を行なってみましょう。
「Octa Link」は、株式会社Octa Roboticsが提供しているクラウドサーバーです。
動作環境
- MacBook Pro AppleM1
- macOS Sequoia バージョン15.4.1
- python 3.13.1
ブローカーへの接続確認
ブローカーに接続ができたかどうかを判断するためには、def on_connect
で記載されているコールバック関数(ブローカーに接続されたら呼び出される関数)の引数である rc(return code) の値を確認します。
rcが0の場合は接続成功で、それ以外の場合は接続失敗 になります。
コールバック関数の設定
self.mqtt_client.on_connect = on_connect
コールバック関数
def on_connect(client, userdata, flags, rc):
userdata: LciClient = userdata
#ここで引数rcの値を出力
userdata.logger.info('Connected with result code {0}'.format(rc))
userdata.mqtt_client.subscribe([
(userdata.topic_prefix + '/RegistrationResult/' + userdata.robot_id, 1),
(userdata.topic_prefix + '/+/RegistrationResult/' + userdata.robot_id, 1),
(userdata.topic_prefix + '/+/CallElevatorResult/' + userdata.robot_id, 1),
(userdata.topic_prefix + '/+/ElevatorStatus/' + userdata.robot_id, 1),
(userdata.topic_prefix + '/+/RobotStatusResult/' + userdata.robot_id, 1),
(userdata.topic_prefix + '/+/ReleaseResult/' + userdata.robot_id, 1)
])
# For communication with simulator
userdata.mqtt_client.subscribe([
(userdata.topic_prefix + '/+/SimulatorControlResult/' + userdata.robot_id, 1)
])
プログラムの実行
実行コマンド
コードの実行時は、コマンドの第1引数に 設定ファイル(server_config_simulator.yamlなど)のパス、第2引数に証明書(.pemなど)が格納されているディレクトリのパス を指定します。
今回は、設定ファイルに server_config_simulator-1-1.yaml を使用します。
python3 lci_client.py ../../xxx/server_config_simulator-1-1.yaml ../../xxx/<cert_dir>
これらは、開発用ロボットアカウント発行時にダウンロードしたファイル等になります。
server_config_simulator.yaml には bldg_id単位の情報(1施設における全設備情報) が書かれています。
これを使用する場合は、 サンプルコードのロード部分(yamlファイルから情報を引き出す箇所)を変更する必要性 があります。
実行結果
ターミナルで上記のコマンドを実行します。
xxx % python3 lci_client.py ../../xxx/server_config_simulator-1-1.yaml ../../xxx/<cert_dir>
2025-05-27 16:53:57,986 DEBUG Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'_uClosc2'
2025-05-27 16:53:58,076 DEBUG Received CONNACK (0, 0)
2025-05-27 16:53:58,077 INFO Connected with result code 0
...
ターミナルを確認すると、 「INFO Conncted with result code 0」 となっていますので、無事ブローカーに接続することができました!
MQTTの解説
サンプルコードで利用している通信プロトコル「MQTT」について、調べてみましょう。
MQTTとは?
MQTT(Message Queuing Telemetry Transport)は、 Publish(送信)/Subscribe(受信)パターンのメッセージングにより、非同期に1対多の通信をすることが可能なメッセージングプロトコル です。
特徴としては、以下が挙げられます。
主な特徴:
- 軽量でIoTや組み込み機器に最適
- 多数のデバイスが利用可能
- 非同期・低遅延なメッセージ通信
- 双方向通信が可能
- QoS(通信品質)のレベル設定が可能
IoT等のリアルタイム通信などに使われているようです。
Publish/Subscribeパターンとは?
MQTTでは、 メッセージを送る側(Publisher) と、受け取る側(Subscriber) 、中継役として Broker(ブローカー) が存在します。
通信の流れとしては、
1. 接続:クライアント(PublisherまたはSubscriber)がBrokerに接続
2. 購読:SubscriberがBrokerに対して特定のTopicをSubscribe
3. 公開:PublisherがBrokerに対してTopic付きのメッセージをPublish
4. 配信:Brokerが該当するSubscriberにメッセージを転送
になります。
ROS2(Robot Operating System 2)に使われている DDS(Data Distribution Service)と似た仕組み をもっているようです。
「LCI」の場合だと、 PublisherまたはSubscriberがロボットや「LCI Box(エレベーター連携)」等の専用通信端末、BrokerがOcta Robotics社のクラウドサーバー「Octa Link」 になります。
トピック(Topic)とは?
MQTTでは、 メッセージの宛先を「Topic」として表現 しています。
例えば、
aaa/bbb/ccc
のようなもので、「階層構造(スラッシュ区切り)」になっています。
LCIでは、
/lci/<bldg_id>/<bank_id>/<elevator_id>/Registration/<robot_id>
といった形で、「/」始まりでTopicを定義しています。
上記のトピックは、ロボット・エレベータ間での連携を行う際に使用するもので、 bldg_id
は施設の識別用ID、bank_id
は「バンク」の識別用ID、elevator_id
はエレベーターの識別用ID、robot_id
は接続するロボットの識別用IDを示しています。
同じシャフト(昇降路)や近接した場所に設置され、同じ目的で運行される複数のエレベータをまとめたグループのことを「バンク」と呼びます。
補足
ポート番号とは?
ポート番号は、コンピュータで 実際に通信される出入口の番号のこと を指します。
URLやIPアドレスが住所のようなものだとすると、その家にある各玄関に番号を振ったものがポート番号 です。
MQTTのポート番号には、以下の2パターンがあり、通信の暗号化の有無によって異なります。
- 1883 : 通常のMQTT通信(非暗号化)
- 8883 : 暗号化されたMQTT通信(TLS)
LCIでは主に、暗号化された通信である「8883」が使用されていますね。
身近なものとしては、
- 80 : HTTP(Webページ)
- 443 : HTTPS(暗号化されたWebページ)
などが挙げられます。
(これらは日常的に使用する機会が多いですね)
TLSとは?
TLS(Transport Layer Security)は、 インターネット上でデータを暗号化して安全に通信するためのプロトコル です。
TLSの目的は以下の3つです:
- 暗号化 :通信内容を第三者から隠す(盗聴の防止)
- 認証 :通信相手(サーバーまたはクライアント)が正しい相手であるかを確認
- 改ざん防止 :通信データが途中で書き換えられていないかを検証
mTLS(Mutual TLS)とは?
通常のTLSではサーバー側の正当性のみを検証しますが、 mTLSではクライアント側の正当性 も検証します。
サーバーとクライアントの双方が証明書を提示して、お互いを認証する仕組み です。
LCIなどの通信では、このmTLSが使用されています。
mTLSで必要なもの
-
サーバー側(MQTTブローカーなど)
-
ca.crt:信頼された認証局(CA)の証明書(クライアント証明書の検証に使用)
-
server.crt:サーバー証明書(サーバーの公開鍵を含む)
-
server.key:サーバーの秘密鍵(証明書の署名検証やTLS内部署名に使用)
-
-
クライアント側(ロボットやLCI Boxなど)
- ca.crt:サーバー証明書を検証するためのCA証明書
- client.crt:クライアント証明書(クライアントの公開鍵を含む)
- client.key:クライアントの秘密鍵(証明書の署名検証やTLS内部署名に使用)
開発用のロボットアカウントを作成した際にダウンロードした証明書類は、上記の client.crt / client.key / ca.crt に該当します。
mTLSの流れ
1. 認証フェーズ
1. クライアントがサーバーへ接続要求
2. サーバーが証明書server.crt
を提示し、クライアント証明書の送信を要求
3. クライアントが証明書client.crt
を提示
4. サーバーがクライアント証明書を ca.crt
を使って検証
5. クライアントがサーバー証明書を ca.crt
を使って検証
このプロセスで、 お互いが信頼できる相手であることを確認 します。
2. 暗号化通信(RSAの場合)
1. クライアントが証明書server.crt
からサーバーの公開鍵を取得
2. クライアントがセッション鍵を生成
3. そのセッション鍵を公開鍵で暗号化して送信
4. サーバーが秘密鍵server.key
で復号してセッション鍵を取得
5. クライアントとサーバーが共通のセッション鍵で暗号通信を開始
このような流れにより、 安全にセッション鍵を共有し、共有後の通信が保護 されます。
連載記事リスト
他の記事もチェックしてみてください!
次回の記事では、「LCI」のシュミレータに接続し、仮想のエレベータと通信する全体シーケンスについて紹介する予定です!
ご興味があれば「いいね」やフォローをいただけると励みになります!