1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft Fabric Real-Time IntelligenceによるROSからのデータ受信を試す〜ROS2ロボット × Microsoft Fabricで作るクラウド連携デジタルツイン入門

1
Last updated at Posted at 2026-06-03

はじめに

IoTやロボティクスの現場では、「リアルワールドの状態をクラウド上に忠実に再現・分析したい」というニーズが急増しています。本記事では、ROS2で動作するロボットの位置情報データを Microsoft Fabric の Real-Time Intelligence(RTI) でリアルタイム収集し、OneLake に長期保存したうえで、Real-Time Dashboard によってリアルタイム可視化を行う「クラウド連携デジタルツイン」の構築手順を初心者向けに解説します。


本記事のアジェンダ

本記事では以下の流れで解説します。

  1. システム全体像の確認 — アーキテクチャの把握
  2. ステップ1:Fabricワークスペースの準備 — 作業環境のセットアップ
  3. ステップ2:ROS2からMQTT変換 — ロボット側のデータ送信設定
  4. ステップ3:RTI EventStreamでMQTT受信 — クラウド側の受信設定
  5. ステップ4:EventhouseとKQL Databaseへのデータ蓄積 — リアルタイムDB構築
  6. ステップ5:OneLakeへのデータ長期保存 — データレイクへの保存と参照
  7. ステップ6:Real-Time Dashboardによる可視化 — リアルタイム表示

システム全体像

[ROS2ロボット]
     │ MQTT変換(ros2_mqtt_bridge等)
     ▼
[インターネット]
     │ MQTT over TLS
     ▼
[Microsoft Fabric RTI]
 ├─ EventStream(受信)
 ├─ Eventhouse / KQL Database(蓄積)
 ├─ OneLake(長期保存)
 └─ Real-Time Dashboard(可視化)

ステップ1:Microsoft Fabric ワークスペースの準備

まず、Microsoft Fabricのテナントとワークスペースを用意します。

  1. Microsoft Fabric ポータル にサインイン
  2. 左メニューから 「ワークスペース」→「新しいワークスペース」 を作成
  3. ライセンスは Fabric 容量(F SKU) または 試用版 を割り当て

ワークスペース内で以降のすべてのリソース(EventStream、Eventhouse、OneLake等)を管理します。

公式ドキュメント:


ステップ2:ROS2ロボット側のMQTT変換設定

ROS2のトピックデータをMQTTに変換してクラウドへ送信します。

2-1. MQTTブリッジのインストール

# ros2_mqtt_bridge を使う例
pip install paho-mqtt

# またはOSSブリッジ
git clone https://github.com/yourusername/ros2_mqtt_bridge

2-2. 送信するデータの定義

ROS2トピック 内容 MQTTトピック例
/robot/pose 位置・姿勢(geometry_msgs/Pose) robot/001/pose

2-3. MQTT送信スクリプト例

import rclpy
from rclpy.node import Node
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import json
from geometry_msgs.msg import PoseStamped

class MQTTBridgeNode(Node):
    def __init__(self):
        super().__init__('mqtt_bridge')
        # MQTTv5 を使用(Properties によるセッション制御と Content-Type 指定に必要)
        self.client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            protocol=mqtt.MQTTv5,
        )
        self.client.tls_set()  # TLS有効化

        # 切断時にセッションを即時破棄してセッション枯渇を防ぐ
        connect_props = Properties(PacketTypes.CONNECT)
        connect_props.SessionExpiryInterval = 0

        self.client.connect(
            "YOUR_EVENT_GRID_MQTT_ENDPOINT", 8883,
            clean_start=True, properties=connect_props,
        )
        self.client.loop_start()  # ネットワーク処理をバックグラウンドスレッドで開始

        self.pose_sub = self.create_subscription(
            PoseStamped, '/robot/pose', self.pose_callback, 10)

    def pose_callback(self, msg):
        payload = {
            "robot_id": "robot_001",
            "timestamp": self.get_clock().now().to_msg().sec,
            "x": msg.pose.position.x,
            "y": msg.pose.position.y,
            "z": msg.pose.position.z
        }
        # Content-Type を application/json に設定することで
        # Azure Event Grid が data_base64 ではなく data (JSON) としてルーティングする
        pub_props = Properties(PacketTypes.PUBLISH)
        pub_props.ContentType = "application/json"
        self.client.publish("robot/001/pose", json.dumps(payload), properties=pub_props)

公式ドキュメント:


ステップ3:RTI EventStreamでMQTTデータを受信

Microsoft FabricのReal-Time Intelligence(RTI)のEventStreamを使って、ロボットからのMQTTデータを受け取ります。

3-1. EventStreamの作成

  1. Fabricワークスペースで 「+ 新しい項目」→「EventStream」 を選択
  2. EventStreamに任意の名前を付けて作成(例:robot-telemetry-stream

3-2. MQTTソースの追加

EventStream から Azure Event Grid に接続する方法は2通りあります。

方法A:MQTT コネクタ 方法B:Azure Event Grid コネクタ(推奨)
認証 クライアント証明書(Azure Key Vault経由) マネージドID(証明書不要)
設定の複雑さ 高(Key Vault準備が必要) 低(サブスクリプション選択のみ)
対応ブローカー 任意のMQTTブローカー Azure Event Grid 専用

方法A:MQTT コネクタ

⚠️ 評価版(試用版)ご利用時の注意

Azure Event Grid の試用版 Namespace は MQTT 同時接続数の上限が低く設定されています(デフォルト:1接続/認証名)。接続・切断を繰り返すとセッションが蓄積し Quota exceeded エラーが発生する場合があります。エラーが発生した場合は、以下の Azure CLI コマンドで上限を引き上げてください。

az eventgrid namespace update \
  --name my-robot-iotns \
  --resource-group MyResource01 \
  --topic-spaces-configuration maximumClientSessionsPerAuthenticationName=3

または 30分〜1時間待機 してセッションが自然消滅するのを待ち、Azure Portal → Event Grid 名前空間 → 「監視」→「メトリクス」MQTT: Connections が 0 に戻ったことを確認してから再試行してください。本番環境の Standard tier では最大 10,000 同時接続をサポートしており、この制限には通常抵触しません。

Azure Event Grid はX.509証明書によるクライアント認証を使用するため、EventStream 側のクライアント証明書(付録A-3で生成した .pem.key)をあらかじめ Azure Key Vault にアップロードしておく必要があります。

事前準備:Azure Key Vault への証明書アップロード

以下の手順では Azure CLI を使用します。実行環境は2通りあります。

実行環境 説明
Azure Cloud Shell(推奨) Azure Portal 上部の 「>_」アイコン を選択して起動。インストール不要でブラウザから直接実行できます
ローカル PC(macOS) brew update && brew install azure-cli でインストール後、az login でサインイン

公式ドキュメント:

Azure Key Vault が未作成の場合は、以下で作成してください。Key Vault の作成前に、サブスクリプションへのリソースプロバイダー登録が必要です。

# Microsoft.KeyVault リソースプロバイダーを登録
az provider register --namespace Microsoft.KeyVault

# 登録完了を確認("Registered" になるまで1〜2分かかります)
az provider show --namespace Microsoft.KeyVault --query registrationState

"Registered" と表示されたら Key Vault を作成します。

# Key Vault の作成(既存の場合はスキップ)
az keyvault create --name robot-key-001 --resource-group MyResource01 --location japanwest

Azure Key Vault にアップロードする証明書は、証明書と秘密鍵を1ファイルに結合した PEM バンドル形式 である必要があります。また、秘密鍵は Azure Key Vault が要求する PKCS#8 形式BEGIN PRIVATE KEY)に変換が必要です。

# /tmp/mqtt-cert-test に移動(A-3 の作業ディレクトリ)
cd /tmp/mqtt-cert-test

# 秘密鍵を PKCS#8 形式に変換
openssl pkcs8 -topk8 -nocrypt \
  -in robot-001-authn-ID.key \
  -out robot-001-authn-ID-pkcs8.key

# PEM バンドルファイルの作成(証明書 + PKCS#8形式の秘密鍵を結合)
cat robot-001-authn-ID.pem robot-001-authn-ID-pkcs8.key > robot-001-authn-ID-bundle.pem

次に、インポートポリシーファイルを作成します。--kty RSA --size 2048 で生成した RSA 2048 証明書に合わせて keyType"RSA"keySize2048 を指定します。

{
  "secretProperties": {
    "contentType": "application/x-pem-file"
  },
  "keyProperties": {
    "exportable": true,
    "keyType": "RSA",
    "keySize": 2048,
    "reuseKey": false
  },
  "issuerParameters": {
    "name": "Unknown"
  }
}

上記を keyvault-cert-policy.json として保存し、Azure CLI でインポートします。

# プロジェクトディレクトリ(keyvault-cert-policy.json があるディレクトリ)から実行
az keyvault certificate import \
  --vault-name robot-key-001 \
  --name robot-001-client-cert \
  --file /tmp/mqtt-cert-test/robot-001-authn-ID-bundle.pem \
  --policy @keyvault-cert-policy.json

注意: EventStream の接続設定を行うユーザーには、Key Vault の 「Key Vault Certificate User」 または 「Key Vault Administrator」 ロールが必要です。Azure Portal → Key Vault → 「Access control (IAM)」 からロールを割り当ててください。

EventStream への MQTT ソース追加手順

  1. EventStreamのキャンバスで 「ソースの追加」→「データ ソースの接続」→「新規(+)」→「MQTT」→「接続」 を選択
  2. 「新しい接続」 を選択
  3. 接続設定を入力:
    • MQTTブローカーURLssl://<MQTT ホスト名>:8883(例:ssl://my-robot-iotns.japanwest-1.ts.eventgrid.azure.net:8883
    • 接続名:任意
    • ユーザー名:A-4 で登録した「クライアント認証名」(例:robot-001-authn-ID
    • パスワード:空欄
    • [接続] を選択
  4. MQTTデータソースの構成を入力:
    • トピック名robot/001/pose
    • バージョン:V5(またはV3)を選択
  5. 「TLS/mTLS 設定」 を展開すると以下の2つのトグルが表示されます:
    • 「Trust CA certificate」:ブローカーのサーバー証明書を検証するCA証明書を指定する場合に有効化(今回は不要)
    • 「Client certificate and key」:有効にして、Azure Key Vault に保存した証明書(robot-001-client-cert)を指定

方法B:Azure Event Grid コネクタ(推奨)

マネージドIDによる認証で、証明書の管理が不要です。事前に以下の準備が必要です。

事前準備:

  1. Azure Portal → Event Grid 名前空間 → 左メニューの 「Settings」→「Identity」 → システム割り当てIDのスイッチを 「On」 にして 「保存」
  2. Fabric ポータル右上の 歯車アイコン(設定) を選択 → サイドパネルの 「ガバナンスと管理」 下にある 「管理ポータル」 を選択 → 「テナント設定」 を選択 → 「開発者向け設定」 セクションの 「サービス プリンシパルは Fabric の公開用 API を呼び出すことができます」 を有効化
  3. Fabric ワークスペースのページを開き、コマンドバーの 「アクセスの管理」「+ ユーザーまたはグループの追加」 を選択 → 検索欄に Event Grid 名前空間名(例:my-robot-iotns)を入力して表示された 「my-robot-iotns」 を選択して 「追加」 を選択 → ロールに 「共同作業者」 を選択

EventStreamへの追加:

  1. EventStreamのキャンバスで 「ソースの追加」→「データ ソースの接続」→「新規(+)」→「Azure Event Grid Namespace」→「接続」 を選択
  2. 接続設定を入力:
    • サブスクリプション:Event Grid 名前空間が属するAzureサブスクリプション
    • 名前空間名my-robot-iotns
  3. MQTTが有効でルーティング未設定の場合は、「名前空間トピック」 で使用するトピックを選択
  4. 「次へ」→「接続」→「追加」 を選択

公式ドキュメント:


ステップ4:EventhouseとKQL Databaseへのデータ蓄積

受信したデータをリアルタイム検索・分析が可能な Eventhouse(KQL Database)に保存します。

4-1. Eventhouseの作成

  1. ワークスペースで 「+ 新しい項目」→「イベントハウス」 を作成(名前例:robot-eventhouse
  2. 「作成」 を選択すると、robot-eventhouse という名前の KQL データベースが同時に自動生成されます

4-2. KQLテーブルの作成

  1. 作成した robot-eventhouse を開く
  2. 左側のオブジェクトツリーに表示される robot-eventhouse_queryset を選択してクエリエディターを開く
  3. エディターに以下のコマンドを貼り付け、「▶ 実行」 をクリック
// 位置情報テーブル
.create table RobotPose (
    robot_id: string,
    timestamp: datetime,
    x: real,
    y: real,
    z: real
)

4-3. EventStreamの宛先にEventhouseを追加・発行

  1. robot-telemetry-stream EventStream を開き、robot-telemetry-stream ノードの出力から 「宛先の追加」→「イベントハウス」 を選択
  2. データインジェストモード を選択:
モード 動作
直接インジェスト(推奨) 変換なしでそのままEventhouseへ取り込む
インジェスト前のイベント処理(デフォルト) フィルター・集計などの変換を挟んでから取り込む

今回はデータ加工が不要なため 「直接インジェスト」 を選択します。

  1. 以下を入力して 「保存」 を選択:
    • 宛先名:任意(例:robot-eventhouse-dest
    • ワークスペース:使用中のワークスペース
    • イベントハウスrobot-eventhouse
    • KQL データベースrobot-eventhouse
    • テーブルRobotPose(ステップ4-2で作成済み)
  2. 「発行」 ボタンをクリックしてEventStreamを有効化

4-4. KQLインジェストマッピングの設定

EventStreamを発行したら、KQLインジェストマッピング を設定します。これにより、受信したJSONデータのどのフィールドをKQLテーブルのどのカラムに格納するかを定義します。

なぜ必要か: Azure Event Grid は MQTT ペイロードを CloudEvents 形式 でラップして配信します。そのため、実際のデータは最上位ではなく data フィールドの中に格納されています。

{
  "datacontenttype": "application/json",
  "data": { "robot_id": "robot_001", "x": 135.507, "y": 34.693, ... },
  "subject": "robot/001/pose",
  ...
}

KQLのカラム(robot_idx など)に正しく値を取り込むには、$.data.robot_id$.data.x のように data フィールド配下のパスを指定する必要があります。

robot-eventhouse_queryset のクエリエディターで以下を 1行で 実行してください。

.create-or-alter table RobotPose ingestion json mapping 'RobotPose_mapping' '[{"column":"robot_id","path":"$.data.robot_id","datatype":"string"},{"column":"timestamp","path":"$.data.timestamp","datatype":"datetime","transform":"DateTimeFromUnixSeconds"},{"column":"x","path":"$.data.x","datatype":"real"},{"column":"y","path":"$.data.y","datatype":"real"},{"column":"z","path":"$.data.z","datatype":"real"}]'

ポイント: "transform":"DateTimeFromUnixSeconds" により、ブリッジスクリプトが送信するUnixエポック秒の整数値を自動的に datetime 型へ変換します。

このコマンドの再実行が必要なケース:

このマッピング設定は 初回のみ 実行すれば、それ以降は自動的に適用され続けます。ただし、以下の場合は再実行(または新規作成)が必要です。

  • ブリッジスクリプトが送信するJSONのフィールド構成を変更した場合(フィールドの追加・削除・リネーム)
  • KQLテーブルのカラム定義(スキーマ)を変更した場合
  • .create-or-alter を使うと既存マッピングを上書きできるため、スキーマ変更時は同じコマンドを修正して再実行するだけで対応できます。

4-5. ブリッジスクリプトの実行(テストデータ送信)

EventStream の発行とマッピング設定が完了したら、ロボットからのデータ送信をシミュレートします。ターミナルを2つ用意して以下を実行します。

ターミナル1:位置情報シミュレーター起動

事前にローカルMQTTブローカー(Mosquitto)を起動しておきます。

# macOS
brew services start mosquitto

# Ubuntu
sudo systemctl start mosquitto
python3 ros2_pose_simulator.py

ターミナル2:MQTTブリッジ起動

# /tmp/mqtt-cert-test に移動(A-3 の作業ディレクトリ)
cd /tmp/mqtt-cert-test

python3 /path/to/ros2_mqtt_bridge.py \
  --fabric-host my-robot-iotns.japanwest-1.ts.eventgrid.azure.net \
  --username robot-001-authn-ID \
  --cert-file robot-001-authn-ID.pem \
  --key-file robot-001-authn-ID.key

[Fabric接続成功] と表示されれば Azure Event Grid への接続が確立されています。

4-6. 動作確認

EventStream ライブプレビュー

  1. Fabric ポータルで robot-telemetry-stream を開く
  2. キャンバス上の MQTT ソースノードを選択し、「データのプレビュー」 をクリック
  3. ブリッジスクリプトが実行中であれば、受信中のメッセージがリアルタイムで表示されます

KQL クエリによる蓄積確認

  1. ワークスペースから robot-eventhouse を開く
  2. 左側のオブジェクトツリーに表示される robot-eventhouse_queryset を選択してクエリエディターを開く
  3. 以下のクエリを貼り付け、「▶ 実行」 をクリック
RobotPose
| where timestamp > ago(5m)
| order by timestamp desc
| take 10

データが返ってくれば、EventStream → Eventhouse のパイプライン全体が正常に動作しています。
https---cdn-ak.f.st-hatena.com-images-fotolife-y-yoossh-20260530-20260530154357.png.avif

公式ドキュメント:


ステップ5:OneLakeへのデータ長期保存

位置情報の履歴データは、Fabricの統合データレイク OneLake に保存して長期管理します。

5-1. OneLakeのショートカットまたはレイクハウスを利用

  1. ワークスペースで 「+ 新しい項目」→「レイクハウス」 を作成(例:robot_datalake
    • 作成画面の 「レイクハウス スキーマ」 はデフォルト(チェックあり)のまま 「作成」 を選択
  2. Eventhouseの 「OneLake」「Availability」 を有効化します:
    1. ワークスペースから robot-eventhouse を開く
    2. 上部タブの 「データベース」 を選択
    3. 左ツリーの robot-eventhouse データベースを選択
    4. 右側の 「Database details」 パネルの 「OneLake」 セクションで 「Availability」 のトグルを 「Enabled」 に切り替える
    5. トグルが 「Enabled」 になると、KQLのデータがDelta Parquet形式でOneLakeへの同期が開始されます

⏱️ 同期完了までの待機について

「Database details」パネルの 「OneLake」 セクションには 「Latency」 という同期遅延の目安が表示されます(例:3 hours)。有効化直後は 「Pending size」 に未同期のデータ量が表示され、同期が完了すると 0 B になります。次のステップ(Notebookからのデータ参照)は同期完了後に実施してください。

5-2. OneLakeに同期された位置情報ログの参照

OneLake可用性を有効にすると、RobotPose テーブルのデータが Delta Parquet 形式で OneLake に継続的に同期されます。Fabric Notebook(Spark 環境)から以下のように参照できます。

# Fabric Notebook(Spark環境)でOneLakeの位置情報ログを参照
df = spark.read.format("delta").load(
    "abfss://robot_datalake@onelake.dfs.fabric.microsoft.com/Tables/RobotPose"
)
df.show()

公式ドキュメント:


ステップ6:Real-Time Dashboardによるリアルタイム可視化

Microsoft Fabric の Real-Time Dashboard を使って、ロボットの位置情報をリアルタイムに可視化します。

  1. Fabricワークスペースで 「+ 新しい項目」→「リアルタイム ダッシュボード」 を選択して新規作成(名前はデフォルトの NewRTDashboard_1 のままでも可)

  2. KQL Databaseをデータソースとして接続:「+ データソースの追加」robot-eventhouse を選択 → 「Connect」 をクリック

  3. 「新しいタイル」→「マップ」 を選択

  4. 画面下部の 「Query editor」 に以下のクエリを貼り付け、「実行」 をクリック

    // リアルタイムダッシュボード用クエリ(直近5分の軌跡)
    RobotPose
    | where timestamp > ago(5m)
    | where robot_id == "robot_001"
    | project timestamp, x, y, z
    | order by timestamp asc
    
  5. 左パネルの 「データ」 セクションで 「場所の定義方法:」→「緯度と経度」 を選択し、以下のように列をマッピング:

    • 緯度y (real)
    • 経度x (real)
  6. 「Done」 をクリックして保存
    https---cdn-ak.f.st-hatena.com-images-fotolife-y-yoossh-20260530-20260530205000.png.avif

公式ドキュメント:


付録:Azure Event GridをMQTTブローカーとして設定する

ステップ3のEventStreamにMQTTソースを接続するには、ロボット(ブリッジスクリプト)とFabric EventStreamの両方が接続できるMQTTブローカーが必要です。ここでは Azure Event Grid をMQTTブローカーとして利用する手順を解説します。

A-1. Event Grid 名前空間の作成

  1. Azure Portal にサインインし、「Event Grid 名前空間」 を検索して選択
  2. 「+ 作成」 をクリック
  3. 以下を入力して 「確認 + 作成」→「作成」
    • 名前空間名:一意の名前(3〜50文字、英数字とハイフンのみ)。本記事では my-robot-iotns を使用
    • サブスクリプション / リソースグループ:任意
    • リージョン:任意

A-2. MQTTブローカーの有効化とエンドポイント確認

  1. 作成した名前空間の概要ページを開く
  2. 概要ページで 「MQTT ブローカー」「無効」 と表示されている 「無効」 リンクを選択(「構成」ページへリダイレクトされます)
  3. 「構成」ページで 「MQTT ブローカーを有効にする」 を選択
  4. 「適用」 をクリックして設定を保存
  5. 名前空間の概要ページに戻り、表示される 「MQTT ホスト名」 を控える
my-robot-iotns.japanwest-1.ts.eventgrid.azure.net  (ポート 8883)

注意: いったん有効にした MQTT ブローカーは無効に戻すことができません。

A-3. クライアント証明書の作成

Azure Event Grid はX.509証明書によるクライアント認証を使用します。まず Step CLI をインストールし、ローカルCAとクライアント証明書を生成します。証明書ファイルの作業ディレクトリは /tmp/mqtt-cert-test とします。

# Step CLI のインストール(macOS)
brew install step

# 作業ディレクトリの作成と移動
mkdir -p /tmp/mqtt-cert-test && cd /tmp/mqtt-cert-test

# CA の初期化(実行中にCAの秘密鍵パスワードとプロビジョナーのパスワードを求められます)
# ※ここで設定したCAの秘密鍵パスワードは、後続の証明書生成コマンドでも必要になります。必ず記録しておいてください。
step ca init --deployment-type standalone --name MqttAppSamplesCA \
  --dns localhost --address 127.0.0.1:443 \
  --provisioner MqttAppSamplesCAProvisioner

# クライアント証明書の生成(実行中にA-3で設定したCAパスワードを求められます)
# ※ ~/.step/ は step ca init がCA証明書を生成したホームディレクトリ配下のパス
step certificate create robot-001-authn-ID robot-001-authn-ID.pem robot-001-authn-ID.key \
  --ca ~/.step/certs/intermediate_ca.crt \
  --ca-key ~/.step/secrets/intermediate_ca_key \
  --no-password --insecure --not-after 2400h

# サムプリントの取得
step certificate fingerprint robot-001-authn-ID.pem

A-4. クライアントの登録

  1. 名前空間の左メニュー 「MQTT ブローカー」→「クライアント」 を選択
  2. 「+ クライアント」 をクリック
  3. 以下を入力して 「作成」
項目
クライアント名 任意(例:robot-001
クライアント認証名 証明書生成時の名前(例:robot-001-authn-ID
クライアント証明書認証検証スキーム Thumbprint Match を選択

「Thumbprint Match」を選択後に表示される「プライマリ拇印」入力フィールドに、A-3 で取得したサムプリントの値を入力します。

A-5. トピック空間の作成

  1. 左メニュー 「MQTT ブローカー」→「トピック空間」 を選択
  2. 「+ トピック空間」 をクリック
  3. 以下を入力して 「作成」
    • 名前robot-topic-space
    • トピックテンプレートrobot/001/pose(「+ トピック テンプレートの追加」から入力)

A-6. アクセス許可バインドの設定

ロボット(パブリッシャー)とFabric EventStream(サブスクライバー)の両方向で通信できるよう、2件のバインドを作成します。

  1. 左メニュー 「MQTT ブローカー」→「アクセス許可のバインド」 を選択
  2. 「+ アクセス許可のバインド」 で以下を2件作成
バインド名 クライアントグループ トピック空間 権限
robot-publisher-binding $all robot-topic-space パブリッシャー
robot-subscriber-binding $all robot-topic-space サブスクライバー

A-7. ブリッジスクリプトからの接続

python3 ros2_mqtt_bridge.py \
  --fabric-host my-robot-iotns.ts.eventgrid.azure.net \
  --username robot-001-authn-ID \
  --cert-file robot-001-authn-ID.pem \
  --key-file robot-001-authn-ID.key

A-8. 動作確認(Azure Portal メトリクス)

ブリッジスクリプトを実行しながら、Azure Portal でメッセージの送受信を確認できます。

  1. Azure Portal → 作成した Event Grid 名前空間 → 左メニュー 「監視」→「メトリクス」 を選択
  2. 「+ メトリクスの追加」 をクリックし、以下のメトリクスを追加して確認
メトリクス名 確認できること
MQTT: Connections ブリッジスクリプトが正常に接続できているか
MQTT: Successful Published Messages Event Grid がメッセージを受信しているか
MQTT: Successful Delivered Messages サブスクライバーへメッセージが配信されているか

スクリプト実行中にこれらの値が増加していれば、Azure Event Grid との接続およびメッセージ送信が正常に動作しています。

公式ドキュメント:


セキュリティに関する注意事項

ロボットがインターネット経由でクラウドへ接続する際は、以下のセキュリティ対策を必ず実施してください。

  • 通信の暗号化:MQTT over TLS(ポート8883)を使用
  • 認証:X.509証明書またはSASトークンによるデバイス認証
  • ネットワーク分離:ロボットのネットワークセグメントをファイアウォールで保護
  • 最小権限の原則:ロボットには送信専用の権限のみ付与

公式ドキュメント:


まとめ

本記事では以下のシステムを段階的に構築しました:

ステップ 内容 使用技術
1 Fabricワークスペース準備 Microsoft Fabric
2 ROS2からMQTT変換 ROS2 / paho-mqtt
3 クラウドでMQTT受信 RTI EventStream
4 リアルタイムDB蓄積 Eventhouse / KQL
5 データレイク保存 OneLake / レイクハウス
6 リアルタイム可視化 RTI Real-Time Dashboard

Microsoft Fabricはデータの収集・蓄積・保存・可視化を 単一プラットフォーム で完結できるため、デジタルツインの構築コストと運用負荷を大幅に削減できます。MQTTブローカーの準備については付録のAzure Event Grid設定手順もご参照ください。本記事を参考に、ぜひご自身のロボットやIoTデバイスでお試しください。

最後まで読んでいただきありがとうございます。本記事がROS2ロボットやIoTデバイスのクラウド連携を始める際のお役に立てれば幸いです。ご質問やフィードバックがあれば、ぜひコメントでお知らせください。


付録B:サンプルソースコード

本記事で使用したスクリプトの全文を掲載します。

B-1. ros2_mqtt_bridge.py

ローカルMQTTブローカーの /robot/pose トピックを購読し、Azure Event Grid(Fabric)のMQTTエンドポイントへ転送するブリッジスクリプトです。

"""
ROS2 → Microsoft Fabric MQTTブリッジ

ローカルMQTTブローカーの /robot/pose トピックから geometry_msgs/PoseStamped
相当のJSONを受信し、Microsoft Fabric(Azure Event Grid)のMQTTエンドポイントへ
robot/001/pose トピックとして転送します。

使用方法:
    python ros2_mqtt_bridge.py --fabric-host <ENDPOINT> [オプション]

前提:
    - ros2_pose_simulator.py が別ターミナルで起動済みであること
    - pip install paho-mqtt
"""

import argparse
import json
import ssl

import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

LOCAL_TOPIC = "/robot/pose"
FABRIC_TOPIC = "robot/001/pose"
ROBOT_ID = "robot_001"


def on_local_connect(client, userdata, flags, reason_code, properties=None):
    if reason_code == 0:
        client.subscribe(LOCAL_TOPIC, qos=0)
        print(f"[ローカル接続成功] '{LOCAL_TOPIC}' をサブスクライブ")
    else:
        print(f"[ローカル接続失敗] コード: {reason_code}")


def make_on_message(fabric_client: mqtt.Client):
    """ローカルMQTTからメッセージを受信してFabricへ転送するコールバックを返す。"""

    def on_message(client, userdata, msg):
        try:
            data = json.loads(msg.payload.decode())
            pos = data["pose"]["position"]
            stamp = data["header"]["stamp"]

            payload = {
                "robot_id": ROBOT_ID,
                "timestamp": stamp["sec"],
                "x": pos["x"],
                "y": pos["y"],
                "z": pos["z"],
            }
            # Content-Type を application/json に設定することで
            # Azure Event Grid が data_base64 ではなく data (JSON) としてルーティングする
            pub_props = Properties(PacketTypes.PUBLISH)
            pub_props.ContentType = "application/json"
            fabric_client.publish(FABRIC_TOPIC, json.dumps(payload), qos=1, properties=pub_props)
            print(f"[転送] x={pos['x']:>8.4f}  y={pos['y']:>8.4f}{FABRIC_TOPIC}")
        except (KeyError, json.JSONDecodeError) as e:
            print(f"[エラー] ペイロードの解析に失敗: {e}")

    return on_message


def on_fabric_connect(client, userdata, flags, reason_code, properties=None):
    if reason_code == 0:
        print(f"[Fabric接続成功] {userdata['host']}:{userdata['port']}")
    else:
        print(f"[Fabric接続失敗] コード: {reason_code}")


def main():
    parser = argparse.ArgumentParser(description="ROS2 → Fabric MQTTブリッジ")
    parser.add_argument("--local-host", default="localhost")
    parser.add_argument("--local-port", type=int, default=1883)
    parser.add_argument("--fabric-host", required=True,
                        help="Fabric/Event Grid MQTTエンドポイントのFQDN")
    parser.add_argument("--fabric-port", type=int, default=8883)
    parser.add_argument("--username", default=None,
                        help="クライアント認証名(A-4で登録した名前)")
    parser.add_argument("--password", default=None)
    parser.add_argument("--cert-file", default=None,
                        help="クライアント証明書ファイルのパス(.pem)")
    parser.add_argument("--key-file", default=None,
                        help="クライアント秘密鍵ファイルのパス(.key)")
    parser.add_argument("--no-tls", action="store_true",
                        help="TLSを無効にする(テスト用)")
    args = parser.parse_args()

    # Fabricクライアント(パブリッシャー)
    fabric_client = mqtt.Client(
        mqtt.CallbackAPIVersion.VERSION2,
        client_id=f"{ROBOT_ID}_bridge",
        protocol=mqtt.MQTTv5,
        userdata={"host": args.fabric_host, "port": args.fabric_port},
    )
    fabric_client.on_connect = on_fabric_connect

    if args.username:
        fabric_client.username_pw_set(args.username, args.password)

    if not args.no_tls:
        fabric_client.tls_set(
            certfile=args.cert_file,
            keyfile=args.key_file,
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLS_CLIENT,
        )

    # SessionExpiryInterval=0: 切断時にセッションを即時破棄しクォータを消費しない
    connect_props = Properties(PacketTypes.CONNECT)
    connect_props.SessionExpiryInterval = 0

    fabric_client.connect(args.fabric_host, args.fabric_port, clean_start=True, properties=connect_props)
    fabric_client.loop_start()

    # ローカルクライアント(サブスクライバー)
    local_client = mqtt.Client(
        mqtt.CallbackAPIVersion.VERSION2,
        client_id="ros2_bridge_local",
    )
    local_client.on_connect = on_local_connect
    local_client.on_message = make_on_message(fabric_client)
    local_client.connect(args.local_host, args.local_port)

    print("[実行中] Ctrl+C で停止")
    try:
        local_client.loop_forever()
    except KeyboardInterrupt:
        print("\n[停止] Ctrl+C を受信しました")
    finally:
        fabric_client.loop_stop()
        fabric_client.disconnect()
        local_client.disconnect()


if __name__ == "__main__":
    main()

B-2. ros2_pose_simulator.py

ROS2ロボットの代わりに、大阪市中心部を基点とした円形経路の位置情報をローカルMQTTブローカーへ送信するシミュレーターです。

"""
ROS2 位置情報データ生成シミュレーター

geometry_msgs/PoseStamped に相当するJSONペイロードを生成し、ローカルMQTT
ブローカーに送信することで ROS2 パブリッシャーを疑似的に再現します。
ros2_mqtt_bridge.py と組み合わせて使用してください。

座標系:
    x = 経度(Longitude)、y = 緯度(Latitude)
    大阪市中心部(緯度 34.6937、経度 135.5023)を基点に半径約500mの円形経路を走行

使用方法:
    python ros2_pose_simulator.py [オプション]

前提:
    - ローカルMQTTブローカー(例: Mosquitto)が起動済みであること
      macOS: brew install mosquitto && brew services start mosquitto
      Ubuntu: sudo apt install mosquitto && sudo systemctl start mosquitto
    - pip install paho-mqtt
"""

import argparse
import json
import math
import time

import paho.mqtt.client as mqtt

LOCAL_TOPIC = "/robot/pose"
FRAME_ID = "map"

# 大阪市中心部を基点とする経路パラメータ
CENTER_LAT = 34.6937   # 緯度
CENTER_LON = 135.5023  # 経度
# 約500m半径(緯度1度 ≈ 111km なので 0.005度 ≈ 556m)
RADIUS_DEG = 0.005


def circular_path(step: int, total_steps: int = 360) -> tuple[float, float]:
    """大阪市中心部を基点とした円形経路上の (longitude, latitude) を返す。"""
    angle = math.radians(step % total_steps)
    lon = CENTER_LON + RADIUS_DEG * math.cos(angle)
    lat = CENTER_LAT + RADIUS_DEG * math.sin(angle)
    return lon, lat


def build_pose_stamped(step: int) -> dict:
    """geometry_msgs/PoseStamped 相当のJSONペイロードを生成する。
    x = 経度(Longitude)、y = 緯度(Latitude)として格納する。
    """
    now = time.time()
    lon, lat = circular_path(step)
    return {
        "header": {
            "stamp": {"sec": int(now), "nanosec": int((now % 1) * 1e9)},
            "frame_id": FRAME_ID,
        },
        "pose": {
            "position": {"x": round(lon, 6), "y": round(lat, 6), "z": 0.0},
            "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
        },
    }


def on_connect(client, userdata, flags, reason_code, properties=None):
    if reason_code == 0:
        print(f"[接続成功] ローカルブローカー {userdata['host']}:{userdata['port']}")
    else:
        print(f"[接続失敗] コード: {reason_code}")


def main():
    parser = argparse.ArgumentParser(description="ROS2 位置情報シミュレーター")
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", type=int, default=1883)
    parser.add_argument("--interval", type=float, default=1.0,
                        help="送信間隔(秒、デフォルト: 1.0)")
    parser.add_argument("--count", type=int, default=0,
                        help="送信回数(0=無制限)")
    args = parser.parse_args()

    client = mqtt.Client(
        mqtt.CallbackAPIVersion.VERSION2,
        client_id="ros2_simulator",
        userdata={"host": args.host, "port": args.port},
    )
    client.on_connect = on_connect
    client.connect(args.host, args.port)
    client.loop_start()

    print(f"[開始] トピック '{LOCAL_TOPIC}' へ送信中... (Ctrl+C で停止)")
    step = 0
    try:
        while True:
            payload = build_pose_stamped(step)
            client.publish(LOCAL_TOPIC, json.dumps(payload), qos=0)
            pos = payload["pose"]["position"]
            print(f"[送信] step={step:>4d}  lon={pos['x']:>10.6f}  lat={pos['y']:>10.6f}")
            step += 1
            if args.count > 0 and step >= args.count:
                break
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print("\n[停止] Ctrl+C を受信しました")
    finally:
        client.loop_stop()
        client.disconnect()


if __name__ == "__main__":
    main()

注意: 本記事の執筆時点(2026年5月)における情報をもとにしています。Microsoft Fabricの機能は継続的に更新されているため、最新情報は Microsoft Fabric 公式ドキュメント をご確認ください。一部機能はプレビュー段階のものも含まれており、GA(一般提供)までの間に設定項目・動作・画面上の表記が変更される可能性があります。

なお、執筆時点ではプレビュー版の影響か、ポータル画面上で英語と日本語の表記が混在している箇所が多く見受けられ、手順の把握が難しい場面がありました。GA に向けて UI の日本語化・表記統一が進むことを期待しています。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?