LoginSignup
3
0

More than 3 years have passed since last update.

Greengrass(V1)でストリームマネージャーを利用してみる

Posted at

はじめに

今回、以下の公式ドキュメントの手順通り、Greengrass(V1)でストリームマネージャーを利用してみました。
参考:クラウドへのデータストリームのエクスポート (コンソール)AWS

なお、デバイスは以前GreengrassをインストールしたJetson nanoを利用しています。
参考:Greengrass(V1)をクイックスタートでインストールしてみる

1 事前準備

ストリームマネージャーの使用に必要な環境の設定と、今回のデータ送信先とするKinesis Data Streamsを準備します。

1-1 Javaのインストール

以下のコマンドでJetson nanoにJavaをインストールする

sudo apt install openjdk-8-jdk

1-2 Python3.7のインストール

StreamManagerClient で AWS IoT Greengrass Core SDK for Python を使用するにはPython3.7以降が必要になります。
インストールされていない場合は事前にインストールします。

sudo apt install python3.7

1-3 データの送信先(Kinesis Data Streams)の作成

1.Kinesisのマネジメントコンソールに移動し、「Kinesis Data Streams」にチェックして「データストリームを作成」をクリック
image.png
2.以下の通り作成する

  • データストリーム名:MyKinesisStream
  • 開いているシャードの数:1

image.png
3.正常に作成されたことを確認する
image.png
4.ARNを控えておく

1-4 IAMロールの設定

GreengrassがKinesisにデータを書き込めるよう権限を追加します。

1.IoT Coreのマネジメントコンソールで左のメニューから Greengrass > クラシック(V1) > グループ > 対象のグループ の順にクリック
2.設定 で現在のグループのロールを確認する
image.png
3.IAMのマネージメントコンソールに移動し、左のメニューからポリシーをクリック
4.「ポリシーの作成」をクリックする
5.JSONタブをクリックし、以下の通りポリシーを作成する
※Resource箇所はKinesis作成時に控えておいたARNに置きかえる
※ポリシーの名前は任意の名前とする

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:リージョン:account-id:stream/MyKinesisStream"
            ]
        }
    ]
}

6.IAMのマネージメントコンソールで左のメニューから ロール をクリック
7.Greengrassグループに現在アタッチされているロールに先ほど作成たポリシーをアタッチする
8.IoT CoreのマネージメントコンソールでGreengrassグループの設定画面に追加したポリシーが表示されていることを確認する

2 Lambda関数の作成(ローカル)

ローカルでストリームマネージャー実行用のLambda関数を作成します。

2-1 greengrasssdkの準備

ストリームマネージャーをLambda関数で利用するためにgreengrasssdk for Pythonを利用します。
利用にあたっては、公式ドキュメントの記載通り、デプロイするLambda関数のパッケージに含めるSDKの用意が必要です。

Greengrass Lambda 関数で SDK を使用するには、AWS Lambda にアップロードする Lambda 関数デプロイパッケージに SDK を含めます。
出典:SDKs 関数のGreengrass Lambda

1.「AWS IoT Greengrass Core SDK for Python」のGitHubレポジトリにアクセスし、Code > Download ZIP の順にクリック
Github:AWS IoT Greengrass Core SDK for Python
image.png
2.ダウンロードした「aws-greengrass-core-sdk-python-master.zip」を解凍する
3.SDKの依存関係をインストールする

#requirements.txtがあるディレクトリに移動
cd ***/aws-greengrass-core-sdk-python-master

#依存関係のインストール
pip install --target . -r requirements.txt

※解凍してできたフォルダに「cbor2」、「cbor2-4.12.dist-info」が追加された
image.png

2-2 関数の作成(ローカル)

1.以下の通り「transfer_stream.py」を作成する
※今回は公式ドキュメント通りに作成

transfer_stream.py
import asyncio
import logging
import random
import time

from greengrasssdk.stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)


# This example creates a local stream named "SomeStream".
# It starts writing data into that stream and then stream manager automatically exports  
# the data to a customer-created Kinesis data stream named "MyKinesisStream". 
# This example runs forever until the program is stopped.

# The size of the local stream on disk will not exceed the default (which is 256 MB).
# Any data appended after the stream reaches the size limit continues to be appended, and
# stream manager deletes the oldest data until the total stream size is back under 256 MB.
# The Kinesis data stream in the cloud has no such bound, so all the data from this script is
# uploaded to Kinesis and you will be charged for that usage.


def main(logger):
    try:
        stream_name = "SomeStream"
        kinesis_stream_name = "MyKinesisStream"

        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
            )
        )

        # Append two messages and print their sequence numbers
        logger.info(
            "Successfully appended message to stream with sequence number %d",
            client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
        )
        logger.info(
            "Successfully appended message to stream with sequence number %d",
            client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
        )

        # Try reading the two messages we just appended and print them out
        logger.info(
            "Successfully read 2 messages: %s",
            client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
        )

        logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
        # Now start putting in random data between 0 and 1000 to emulate device sensor input
        while True:
            logger.debug("Appending new random integer to stream")
            client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")


def function_handler(event, context):
    return


logging.basicConfig(level=logging.INFO)
# Start up this sample code
main(logger=logging.getLogger())

2.以下4点を選択して圧縮する
※公式ドキュメントに則って圧縮してできたzipファイルは「transfer_stream_python.zip」とする

  • transfer_stream.py
  • greengrasssdk
  • cbor2
  • cbor2-4.12.dist-info

2-3 Lambda関数の作成(クラウド)

1.Lambdaのマネジメントコンソールで以下の通り関数を作成する

  • 一から作成
  • 関数名:TransferStream ※公式ドキュメントにあわせた
  • ランタイム:Python 3.7

image.png
2.関数が作成されたら、コードタブで アップロード元 > zipファイル の順にクリック
image.png
3.ファイル選択ウィンドウで先ほど作成したzipファイル「transfer_stream_python.zip」を選択して「保存」をクリック
4.コードタブでランタイム設定の「編集」をクリック
5.ハンドラを「transfer_stream.function_handler」に修正して「保存」をクリック
※独自でファイル名、関数名をつけている際は「[プログラムファイル名].[関数名]」とする
image.png
6.関数画面で アクション > 新しいバージョンを発行 をクリックし、「発行」をクリックする
image.png
7.アクション > エイリアスを作成 をクリック
8.以下の通りエイリアス設定を入力し、「保存」をクリックする

  • 名前:GG_TransferStream ※公式ドキュメントにあわせた
  • バージョン:1

image.png

3 Greengrassグループのデプロイ

作成したLambda関数をGreengrassグループに紐づけJetson nanoにデプロイします。

3-1 Lambda関数をグループに追加する

1.IoT Coreのマネジメントコンソールで左のメニューから Greengrass > クラシック(V1) > グループ > 対象のグループ の順にクリック
2.Lambda をクリック > 「Lambdaの追加」をクリック
3.GreengrassグループへのLambdaの追加画面で「既存のLambdaの使用」をクリック
4.作成したLambda関数「transfer_stream」を選択して「次へ」をクリック
5.作成したエイリアス「GG_TransferStream」を選択して「完了」をクリック
6.グループのLambda画面の「TransferStream」上で ... > 設定の編集 の順にクリック
image.png
7.以下の通り設定を変更して「更新」をクリック

  • メモリ制限:32MB
  • Lambdaのライフサイクル:存続期間が長く無制限に稼働する関数にする

image.png

3-2 ストリームマネージャーの有効化

ストリームマネージャーを利用するため、コンソール画面でストリームマネージャーを有効化します。

1.IoT Coreのマネジメントコンソールで左のメニューから Greengrass > クラシック(V1) > グループ > 対象のグループ の順にクリック
2.設定 をクリックし「ストリームマネージャー」が有効になっているか確認する
※以降は「無効」だった場合の手順
image.png
3.編集 をクリックし、設定画面で「有効化」を選択して「保存」をクリック
image.png
4.グループの設定画面で「ストリームマネージャー」が有効になっていることを確認する

3-3 グループのデプロイ

1.Jetson nano上でGreengrassが起動していることを確認する
※起動していないとデプロイできない

#Greengrassが起動しているか確認
ps aux | grep -E "greengrass.*daemon"

#起動していない場合は以下のコマンドで起動
sudo /greengrass/ggc/core/greengrassd start

2.IoT Coreのマネジメントコンソールで 左のメニューから Greengrass > クラシック(V1) > グループ の順にクリック
3.一覧で作成したグループ名をクリックして詳細画面に遷移し、アクション > デプロイの順にクリック
image.png
4.「正常に完了しました」と表示されることを確認する

4 動作検証

Kinesis Data StreamsにJetson nanoからデータが送られることを確認します。

1.Kinesisのマネジメントコンソール > データストリームの数字 の順にクリック
※以下画像の場合は「1」がリンクになっている
image.png
2.作成したストリーム「MyKinesisStream」をクリック
3.成功するとモニタリングタブの「Put レコード」などに値が表示される
※デプロイ後、表示されるまでに数分ほどかかった
image.png

5 後片付け

以下を削除します。

  • デプロイした関数「TransferStream」
  • Kinesis Data Streamsで作成したデータストリーム「MyKinesisStream」

5-1 Lambda関数「TransferStream」の削除

1.IoT Coreのマネジメントコンソールで 左のメニューから Greengrass > クラシック(V1) > グループ の順にクリック
2.一覧で作成したグループ名をクリックして詳細画面に遷移し、Lambda をクリック
3.「TransferStream」上で ... > 関数の削除 の順にクリック
4.グループをデプロイする
※Lambdaには関数が残っているので、不要な場合はそちらも削除する

5-2 データストリーム「MyKinesisStream」の削除

1.Kinesisのマネジメントコンソール > データストリームの数字 の順にクリック
2.作成した「MyKinesisStream」を選択して、アクション > 削除 の順にクリック

6 おわりに

今回は公式ドキュメント通りに実施し、ストリームマネージャーを使えるところまでざっくりと確認できました。
以降、これをベースとしてデータのフィルタリングや集約、他のAWSサービスへのエクスポートなどを行おうと思います。

7 参考文献(文中で登場していないもの)

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0