0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS IoT Greengrass IPC PubSub入門 ~エッジ内コンポーネント間通信の仕組みと実装~

0
Posted at

こんにちは。株式会社トラストでソリューションアーキテクトをしている髙宮です。Qiitaでの投稿は久しぶりになりますが、また再開します。

普段はBedrockなどのGenAIのソリューション構築をしていますが、その他にもRaspberry PiとGreengrassを使った賞味期限印字検査やリアルタイム異音検知のモデル構築、エッジデバイスでの推論等を担当しています。詳しくは 音検査のパートナー「オトモニ」

エッジデバイスを作っているとよくぶつかるのが 「コンポーネント同士をどう繋ぐか?」 という課題です。

今回はその解決策であるIPC PubSubについて、仕組みから実装、ハマりどころまでまとめました。


Greengrassの基礎をざっくり

Greengrassとは

AWS IoT Greengrassは、AWSの機能をエッジデバイス(Raspberry Pi等)に持ち込むランタイム(=デバイス上で常駐動作するソフトウェア基盤)です。センサーデータを全部クラウドに送ると、遅延・オフライン・コストの問題が出ます。Greengrassなら現場で処理して、必要な結果だけクラウドに送れます

核となるのがNucleus(ニュークリアス)です。Nucleusは以下の3つを担う「司令塔」のような存在です:

  • ライフサイクル管理: コンポーネントの起動・停止・更新を制御
  • 認可ポリシーの適用: どのコンポーネントがどのトピックに通信できるかを制御
  • IPC(プロセス間通信)サーバーの提供: コンポーネント同士がメッセージをやり取りする仕組みを提供

📖 公式ドキュメント: How AWS IoT Greengrass works


図1: Greengrassのアーキテクチャ。Nucleus(司令塔)がコンポーネントのライフサイクルとIPC通信を管理する。赤い矢印がIPC PubSub(ローカル通信)、青い矢印がMQTT Proxy(クラウド通信)

コンポーネントの種類

Greengrassでは機能をコンポーネントという単位で管理します。大きく分けて2種類あります:

種類 説明
パブリックコンポーネント AWS が提供する公式コンポーネント。デプロイ時に選択するだけで使える aws.greengrass.Nucleus(司令塔)、aws.greengrass.clientdevices.mqtt.Bridge(MQTT中継)、aws.greengrass.LogManager(ログ転送)
カスタムコンポーネント 自分で作るコンポーネント。今回の記事で扱うのはこちら com.example.SensorReadercom.example.DataProcessor

図1で描いた Nucleus や MQTT Bridge も実はコンポーネントの一つです。つまりGreengrassの仕組みそのものがコンポーネントで構成されているということです。

📖 公式ドキュメント: AWS-provided components

コンポーネント = レシピ + アーティファクト

カスタムコンポーネントの構成は2つだけです。


図2: コンポーネントはレシピ(設計図)とアーティファクト(実行コード)で構成され、Lifecycleの3ステップ(install → setenv → run)で起動する

要素 役割 形式
レシピ コンポーネントの設計図。依存関係・権限・起動方法を宣言 YAML / JSON
アーティファクト 実際に動くコード。レシピから参照される Python, Java, バイナリ等
# レシピの最小例
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloSensor
ComponentVersion: '1.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    SensorInterval: 10
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      install: pip3 install -r {artifacts:path}/requirements.txt
      run: python3 -u {artifacts:path}/sensor_reader.py

注意: 上記は構造を示すための最小例です。実務では後述するvenv(仮想環境)での隔離が必須です。pip3 installをシステムPythonに直接行うのは避けてください。

これは最小例ですが、実務ではもっと構造化されたレシピが必要です。特にPythonコンポーネントでは、以下の3つのポイントが重要になります。

実務のレシピで押さえるべき3つのポイント

レシピのLifecycle(ライフサイクル)セクションで、コンポーネントの「インストール → 環境設定 → 実行」の各段階を定義します。

1. venv(仮想環境)でPythonを隔離する

Pythonコンポーネントではvenv(仮想環境)の利用が必須です。 システムのPythonに直接pip installすると、他のコンポーネントやOSのパッケージと競合します。

Lifecycleのinstallステップで{work:path}(コンポーネント専用の作業ディレクトリ)にvenvを作成し、runステップではvenv内のPythonを明示指定して実行します。

Lifecycle:
  install: |
    python3 -m venv {work:path}/venv
    {work:path}/venv/bin/pip install --upgrade pip
    {work:path}/venv/bin/pip install boto3 awsiotsdk
  run: |
    {work:path}/venv/bin/python3 -u {artifacts:path}/sensor_reader.py

run-uフラグも重要です。これがないとprint文のログがリアルタイムでGreengrassのログファイルに出力されず、デバッグが困難になります。

Raspberry Piなどの低スペックデバイスでは、ML系ライブラリ(numpy等)のインストールに時間がかかるため、Timeoutを長めに設定する必要がある点にも注意してください。

2. Setenvで設定値を環境変数に変換する

レシピのDefaultConfigurationに書いた設定値を、setenvで環境変数としてPythonコードに渡せます。{configuration:/キー名}の形式で参照します。

ComponentConfiguration:
  DefaultConfiguration:
    SensorInterval: 10
    OutputTopic: "sensor/temperature/ready"
Manifests:
  - Lifecycle:
      setenv:
        SENSOR_INTERVAL: "{configuration:/SensorInterval}"
        OUTPUT_TOPIC: "{configuration:/OutputTopic}"

これにより、コードを変更せずデプロイ時の設定変更だけで動作を変えられます。Python側ではos.environ.get("SENSOR_INTERVAL", "10")のように取得します。

3. Artifactsの配置とS3連携

実行するPythonファイルはS3に置き、レシピのArtifactsセクションでURIを指定します。Nucleusがデプロイ時に自動ダウンロードし、{artifacts:path}配下に配置してくれます。

また、S3やIoT Coreと通信するコンポーネントでは、ComponentDependenciesaws.greengrass.TokenExchangeServiceを指定してAWS認証を有効にする必要があります。

📖 公式ドキュメント: レシピの全仕様はComponent recipe referenceを参照。レシピ変数({work:path}{artifacts:path}{iot:thingName}等)の一覧もここにあります。

コンポーネントに分けると、機能ごとに独立して更新でき、1つがクラッシュしても他は動き続けます。では、分割したコンポーネント同士をどう繋ぐか? ── ここからが本題です。


IPC PubSubとは

コンポーネント間通信の課題

複数のコンポーネント(データ収集・処理・通知など)を動かすとき、「収集が完了したことを処理側にどう伝えるか?」が問題になります。

方式 問題点
ファイルポーリング CPU無駄遣い。タイミングが不安定
MQTT経由(IoT Core) クラウド往復が発生(50〜300ms)。オフラインで動かない
IPC PubSub ローカルで完結。サブミリ秒。オフラインOK

IPC PubSubはMQTTではない

最も重要な点: IPC PubSubはMQTTプロトコルではありません。

Nucleus内蔵のIPCサーバーがEventStreamRPCプロトコル(AWS独自の高速な通信規格)でコンポーネント間のメッセージルーティングを行います。MQTTブローカー(=外部のメッセージ中継サーバー)は一切関与しません。NucleusがコンポーネントのプロセスにIPC接続用の環境変数を自動注入し、各コンポーネントはそれを使ってIPCサーバーに接続します。

IPC PubSubとMQTT Proxyの比較

比較項目 IPC PubSub MQTT Proxy
IPC識別子 aws.greengrass.ipc.pubsub aws.greengrass.ipc.mqttproxy
通信範囲 デバイス内のみ AWS IoT Core経由でインターネット
プロトコル EventStreamRPC MQTT (TLS)
Publish操作名 PublishToTopic PublishToIoTCore
Subscribe操作名 SubscribeToTopic SubscribeToIoTCore
オフライン ✅ 完全対応 △ 制限付き(オフライン時はメッセージを一時保存し、接続復帰後に送信。ただしデフォルトは最大2500件のメモリ保存のため、長時間のオフラインではメッセージ消失の可能性あり)
レイテンシ(遅延) サブミリ秒(1ms未満) 数十〜数百ms
QoS(配信保証) なし(Fire-and-forget方式。Subscriberが起動中であれば即時配信されるが、未起動のSubscriberにはメッセージは届かない) QoS 0(最大1回配信)/ QoS 1(最低1回配信)
メッセージ上限 明記なし 128KB(最大2.5MBまで設定可)
スループット上限 明記なし 100 TPS(秒間100メッセージ)/ 512 KB/秒

📖 公式ドキュメント: Publish/subscribe local messages / Publish/subscribe AWS IoT Core MQTT messages

メッセージ形式: JsonMessage vs BinaryMessage

IPC PubSubのメッセージは2種類あります。

形式 Pythonでの型 用途
JsonMessage dict 構造化データ(温度、状態、設定値など)
BinaryMessage bytes 任意のバイナリ(画像、圧縮データなど)

構造化データならJsonMessage一択です。json.dumps/json.loadsが不要で、dictをそのまま渡すだけで済みます。


IPC PubSub vs MQTT Proxy ── 混同注意

Greengrassには似て非なる2つの通信APIがあります。IPC識別子・操作名を混同するとUnauthorizedErrorの原因になるので注意してください。前章の比較表に記載した通り、それぞれ異なるIPC識別子と操作名を使います。

使い分けはシンプルです:

  • 同じデバイス内 → IPC PubSub
  • クラウドへ送信 → MQTT Proxy
  • 両方 → 併用(実務で最も多いパターン)


図3: 実務の典型パターン。デバイス内の前処理はIPC PubSubで高速に連鎖させ、最終結果だけMQTT Proxyでクラウドに送る

実務の典型パターン: IPC + MQTT併用

実務ではこのパイプラインパターンが最も多いです。

SensorReader ──IPC PubSub──→ DataProcessor ──IPC PubSub──→ CloudUploader ──MQTT Proxy──→ IoT Core
                                                                                            ↓
                                                                                     Lambda / S3 / DynamoDB

デバイス内の前処理はIPC PubSubで高速に連鎖させ、最終結果だけMQTT Proxyでクラウドに飛ばします。


実装してみる

ここでは、前章で紹介したパイプラインパターンの一部を実際にコードで見ていきます。SensorReader(Publisher)がセンサーデータをIPC PubSubで送信し、DataProcessor(Subscriber)がそれを受信して処理するという最小構成を実装します。

⚠️ 以下は概念的な実装例です。本番では適切なエラーハンドリング等を追加してください。

IPC Client V2を使う

SDKにはV1とV2がありますが、必ずV2を使ってください。公式ドキュメントでもV2が推奨されています。

V1とV2の最大の違いはスレッドモデル(=メッセージ受信時の処理がどのスレッドで動くか)です:

プログラム内で並行して動く「処理の流れ」のことです。1つのプログラムの中で、メインの処理(メインスレッド)と通信の処理(ネットワークスレッド)が同時に動いています。V1では通信スレッドがメッセージ受信処理も兼務するため、受信処理中に通信を行おうとすると「お互いの完了を待ち合う」デッドロック(=永久に止まる状態)が起きます。V2では受信処理を別のスレッドで動かすことでこの問題を解決しています。

比較項目 V1 V2
コールバック(=メッセージ受信時に呼ばれる関数)実行 通信と同じスレッド(兼務) 別スレッドで実行(独立)
コールバック内でのPublish ❌ デッドロック(永久停止) ✅ 安全に可能
コード量 多い(クラス継承が必須) 少ない(関数/ラムダで記述)
API 非同期のみ 同期・非同期の両方
最小SDKバージョン Python v1.5.3 Python v1.9.0
C++/JavaScript対応 ✅ 対応 ❌ 非対応(V1のみ)


図5: V1では通信スレッドがコールバックを兼務するためデッドロックが発生。V2はコールバック専用スレッドで安全にPublish可能

V1のデッドロック問題を具体的に示すと:

# ❌ V1: メッセージ受信時にPublishするとデッドロック(永久停止)!
class MyHandler(SubscribeToTopicStreamHandler):
    def on_stream_event(self, event):
        # 通信スレッドで動いているのに、
        # 同じ通信スレッドを使おうとして永久に待ち合う
        ipc_client.publish_to_topic(...)  # → DEADLOCK! プログラムが固まる

# ✅ V2: 受信処理は別スレッドで実行されるので安全
def on_stream_event(event):
    # 通信スレッドとは別の場所で動くので、Publishしても問題なし
    ipc_client.publish_to_topic(...)  # → OK! 正常に動く

📖 公式ドキュメント: Use the AWS IoT Device SDK to communicate with the Greengrass nucleus, other components, and AWS IoT Core

Publisher

import os
import time
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import PublishMessage, JsonMessage

# レシピのSetenvで設定した環境変数から取得(ハードコードしない)
TOPIC = os.environ.get("OUTPUT_TOPIC", "sensor/temperature/ready")
INTERVAL = int(os.environ.get("SENSOR_INTERVAL", "10"))

def main():
    ipc = GreengrassCoreIPCClientV2()

    try:
        while True:
            sensor_data = {
                "temperature": 25.3,
                "humidity": 60.1,
                "timestamp": time.time(),
            }

            # dictをそのまま渡すだけ。シリアライズはSDKが自動で行う
            ipc.publish_to_topic(
                topic=TOPIC,
                publish_message=PublishMessage(
                    json_message=JsonMessage(message=sensor_data)
                )
            )
            print(f"Published: {sensor_data}")
            time.sleep(INTERVAL)
    finally:
        ipc.close()  # IPC接続とスレッドプールの解放

if __name__ == "__main__":
    main()

ポイント: ipc.close()で明示的にリソースを解放しましょう。GreengrassCoreIPCClientV2は内部でIPC接続やスレッドプール(並行処理の仕組み)を保持しているため、プロセス終了時にclose()を呼んで後片付けするのがベストプラクティスです。

Subscriber

import os
import sys
import time
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import SubscriptionResponseMessage

TOPIC = os.environ.get("SUBSCRIBE_TOPIC", "sensor/temperature/ready")

def on_stream_event(event: SubscriptionResponseMessage) -> None:
    """受信コールバック(V2では別スレッドで実行される)"""
    message = event.json_message.message  # dict型で取得
    topic = event.json_message.context.topic  # 実際のトピック名(Nucleus v2.6.0+ / SDK v1.9.0+)
    print(f"Received on {topic}: {message}")

    if message.get("temperature", 0) > 30.0:
        print("ALERT: High temperature!")

def on_stream_error(error: Exception) -> bool:
    """エラーコールバック

    重要: このコールバックは通信スレッドで実行される(V2でも!)
    そのため、ここでは重い処理やIPC呼び出しを避けること。
    """
    print(f"Stream error: {error}", file=sys.stderr)
    return False  # ← Falseでストリーム維持。Trueだと終了してしまう

def main():
    ipc = GreengrassCoreIPCClientV2()

    _, operation = ipc.subscribe_to_topic(
        topic=TOPIC,
        on_stream_event=on_stream_event,
        on_stream_error=on_stream_error,
        on_stream_closed=lambda: print("Stream closed"),
    )

    # メインスレッドを維持(これがないとプロセスが即終了する)
    try:
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        operation.close()  # サブスクリプションを明示的にクローズ
        ipc.close()

if __name__ == "__main__":
    main()

Subscriberの3つの注意点

1. on_stream_errorFalseを返す

Trueを返すとストリーム(=メッセージの受信チャネル)が終了し、以降のメッセージを受け取れなくなります。一時的なエラーであればログだけ出してFalseを返すのが安全です。

2. while Trueでメインスレッドを維持する

「Subscribeしたのにメッセージが来ない」場合、まずこれを確認してください。subscribe_to_topicはサブスクリプションを設定するだけで、メインスレッドが終了するとプロセスごと消えます。

3. on_stream_errorは通信スレッドで実行される

V2でもこのエラー処理だけは通信スレッド上で動きます。ここでIPC呼び出し(Publish等)をするとデッドロックの可能性があるため、ログ出力程度に留めましょう。

📖 SDKリファレンス: aws-iot-device-sdk-python-v2 / awsiot.greengrasscoreipc


レシピのaccessControl設定

IPC PubSubを使うにはレシピで認可ポリシー(=「このコンポーネントはこのトピックへの通信を許可する」というルール)を設定します。ここを間違えると動かないので丁寧に見ていきます。


図4: accessControlの階層構造とワイルドカードの罠。resourcesとtopicパラメータでルールが異なる

Publisher側

ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:        # ← IPC識別子(pubsubとmqttproxyを間違えない!)
        "com.example.SensorPublisher:pubsub:1":  # ← Policy ID(全コンポーネントで一意)
          policyDescription: "Allow publish to sensor topic"
          operations:
            - "aws.greengrass#PublishToTopic"
          resources:
            - "sensor/temperature/ready"

Subscriber側

ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        "com.example.DataProcessor:pubsub:1":
          policyDescription: "Allow subscribe to sensor topic"
          operations:
            - "aws.greengrass#SubscribeToTopic"
          resources:
            - "sensor/temperature/ready"

Policy IDの命名規則

Policy IDはデバイス上の全コンポーネントを通じて一意でなければなりません。推奨フォーマット:

{コンポーネント名}:{サービス短縮名}:{連番}

例: com.example.SensorPublisher:pubsub:1com.example.CloudUploader:mqtt:1

ワイルドカードの罠 ── 場所で意味が変わる!

自分もここでハマりました。 resourcesフィールドとSubscribeToTopictopicパラメータでは、ワイルドカードのルールが全く違います

resourcesフィールド(認可ポリシー)→ Greengrass独自ルール

# ✅ Greengrass独自の * ワイルドカード
resources:
  - "*"               # 全トピック許可
  - "sensor/*"        # sensor/ 以下すべて

# ✅ v2.6.0+: 文字列内の任意の位置に * を使える
resources:
  - "factory/1/devices/Thermo*/status"  # Thermo で始まるデバイス

# ❌ MQTTワイルドカードは使えない!
resources:
  - "sensor/#"        # リテラルの "sensor/#" にしかマッチしない
  - "sensor/+/ready"  # リテラルの "sensor/+/ready" にしかマッチしない

resources#+を使うと、MQTTワイルドカードとして解釈されずリテラル文字として扱われます。例えば "sensor/#"sensor/temperature/ready にマッチしませんsensor/ 以下すべてに許可を出したい場合は "sensor/*" を使ってください。

SubscribeToTopicのtopicパラメータ(Nucleus v2.6.0+)→ MQTTワイルドカード

# ✅ MQTTワイルドカード(# と +)が使える
ipc.subscribe_to_topic(topic="sensor/#")           # sensor/ 以下の全階層
ipc.subscribe_to_topic(topic="sensor/+/ready")     # 1階層だけワイルド

# ❌ Greengrass独自の * は使えない
ipc.subscribe_to_topic(topic="sensor/*")            # 動作しない

📖 公式ドキュメント: resourcesのワイルドカードルールはIPC authorization policies、topicのワイルドカードルールはSubscribeToTopicを参照

ワイルドカード使用時のresources設定

ワイルドカードでサブスクライブする場合、resourcesも対応する範囲を許可する必要があります:

# SubscribeToTopic(topic="sensor/#") に対応するresources設定
accessControl:
  aws.greengrass.ipc.pubsub:
    "com.example.Processor:pubsub:1":
      operations:
        - "aws.greengrass#SubscribeToTopic"
      resources:
        - "sensor/*"    # ← resourcesでは * を使う

receiveModeの落とし穴(Nucleus v2.6.0+)

Nucleus v2.6.0以降では、サブスクライブ時に自分自身が送ったメッセージを受信するかどうかを制御するreceiveModeパラメータが追加されました。

トピックの種類 デフォルトのreceiveMode 意味
ワイルドカードなし RECEIVE_ALL_MESSAGES 自分のメッセージも受信
ワイルドカードあり RECEIVE_MESSAGES_FROM_OTHERS 自分のメッセージは除外

つまり、ワイルドカードでサブスクライブすると、自分がPublishしたメッセージがデフォルトで届きません。同一コンポーネント内でPublish→Subscribeのテストをする際にハマりやすいポイントです。

receiveModeを使うには SDK バージョンも要確認: Python v2 は v1.11.3以上 が必要です。

{iot:thingName}を使った複数デバイス対応(v2.6.0+)

複数デバイスに同じコンポーネントをデプロイする場合、{iot:thingName}変数でデバイス名をトピックに埋め込めます:

# Nucleusの設定で interpolateComponentConfiguration: true が必要
resources:
  - "devices/{iot:thingName}/telemetry"

これにより、デバイスAではdevices/DeviceA/telemetry、デバイスBではdevices/DeviceB/telemetryに自動展開されます。

📖 公式ドキュメント: Recipe variables / Greengrass nucleus configuration

UnauthorizedErrorが出たら

最も多い原因は設定の混同です。このチェックリストを上から順に確認してください:

  1. IPC識別子の確認: aws.greengrass.ipc.pubsubaws.greengrass.ipc.mqttproxy を混同していないか
  2. 操作名の確認: PublishToTopicPublishToIoTCore を混同していないか
  3. Policy IDの重複: 他のコンポーネントと重複していないか(デバイス全体で一意が必要)
  4. トピック名の一致: resourcesのトピック名がコード内のtopicと一致しているか
  5. ワイルドカード: resources#を使っていないか(*を使うべき)
  6. Nucleusバージョン: ワイルドカード機能を使う場合、v2.6.0以上か

Nucleusバージョンと機能の対応表

IPC PubSubの機能はNucleusのバージョンによって段階的に追加されています。使いたい機能に必要なバージョンを確認してください:

Nucleusバージョン 追加された機能
v2.0.0〜 IPC PubSub基本機能(PublishToTopic / SubscribeToTopic)
v2.6.0 SubscribeToTopicでのワイルドカード#/+)サポート
v2.6.0 receiveModeパラメータ(自分のメッセージの受信制御)
v2.6.0 context.topic(実際のトピック名取得)
v2.6.0 resourcesでの詳細ワイルドカード(文字列途中の*
v2.6.0 **{iot:thingName}**変数のポリシー内使用
v2.10.0 MQTT 5サポート(MQTT Proxy側のみ。メッセージにメタデータを付与可能に)
v2.11.0 MQTTスプーラーのディスク保存(オフライン時のメッセージ保存先をメモリからディスクに変更可能)

📖 公式ドキュメント: Greengrass nucleus component


まとめ

押さえるべきポイントは3つです。

  1. IPC PubSubはMQTTではない: Nucleus内蔵のEventStreamRPCによるローカル通信。オフラインでも動作し、サブミリ秒のレイテンシ
  2. IPC Client V2を使う: コールバックが別スレッドで実行されるため、コールバック内からPublishが安全に可能。パイプライン構築に必須
  3. accessControlの設定が肝: IPC識別子(pubsub vs mqttproxy)、操作名(PublishToTopic vs PublishToIoTCore)の混同が最大の落とし穴。ワイルドカードは場所によってルールが異なる(resourcesは*、topicは#/+

V2のコールバック内Publishを活かせば、収集→前処理→推論→クラウド送信のようなパイプライン連鎖も1つのコールバックで書けます。

参考リソース

公式ドキュメント

SDK


※当ブログで提供している情報・プログラムコード・設定例などについては、正確性や安全性を保つよう努めておりますが、その内容を保証するものではありません。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?