#はじめに
今回、以下の公式ドキュメントの手順通り、Greengrass(V1)でストリームマネージャーを利用してみました。
参考:クラウドへのデータストリームのエクスポート (コンソール)AWS
なお、デバイスは以前GreengrassをインストールしたJetson nanoを利用しています。
参考:Greengrass(V1)をクイックスタートでインストールしてみる
#1 事前準備
ストリームマネージャーの使用に必要な環境の設定と、今回のデータ送信先とするKinesis Data Streamsを準備します。
##1-1 Javaのインストール
以下のコマンドでJetson nanoにJavaをインストールする
sudo apt install openjdk-8-jdk
##1-2 Python3.7のインストール
StreamManagerClient で AWS IoT Greengrass Core SDK for Python を使用するにはPython3.7以降が必要になります。
インストールされていない場合は事前にインストールします。
sudo apt install python3.7
##1-3 データの送信先(Kinesis Data Streams)の作成
1.Kinesisのマネジメントコンソールに移動し、「Kinesis Data Streams」にチェックして「データストリームを作成」をクリック
2.以下の通り作成する
- データストリーム名:MyKinesisStream
- 開いているシャードの数:1
3.正常に作成されたことを確認する
4.ARNを控えておく
##1-4 IAMロールの設定
GreengrassがKinesisにデータを書き込めるよう権限を追加します。
1.IoT Coreのマネジメントコンソールで左のメニューから Greengrass > クラシック(V1) > グループ > 対象のグループ の順にクリック
2.設定 で現在のグループのロールを確認する
3.IAMのマネージメントコンソールに移動し、左のメニューからポリシーをクリック
4.「ポリシーの作成」をクリックする
5.JSONタブをクリックし、以下の通りポリシーを作成する
※Resource箇所はKinesis作成時に控えておいたARNに置きかえる
※ポリシーの名前は任意の名前とする
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecords"
],
"Resource": [
"arn:aws:kinesis:リージョン:account-id:stream/MyKinesisStream"
]
}
]
}
6.IAMのマネージメントコンソールで左のメニューから ロール をクリック
7.Greengrassグループに現在アタッチされているロールに先ほど作成たポリシーをアタッチする
8.IoT CoreのマネージメントコンソールでGreengrassグループの設定画面に追加したポリシーが表示されていることを確認する
#2 Lambda関数の作成(ローカル)
ローカルでストリームマネージャー実行用のLambda関数を作成します。
##2-1 greengrasssdkの準備
ストリームマネージャーをLambda関数で利用するためにgreengrasssdk for Pythonを利用します。
利用にあたっては、公式ドキュメントの記載通り、デプロイするLambda関数のパッケージに含めるSDKの用意が必要です。
Greengrass Lambda 関数で SDK を使用するには、AWS Lambda にアップロードする Lambda 関数デプロイパッケージに SDK を含めます。
出典:SDKs 関数のGreengrass Lambda
1.「AWS IoT Greengrass Core SDK for Python」のGitHubレポジトリにアクセスし、Code > Download ZIP の順にクリック
Github:AWS IoT Greengrass Core SDK for Python
2.ダウンロードした「aws-greengrass-core-sdk-python-master.zip」を解凍する
3.SDKの依存関係をインストールする
#requirements.txtがあるディレクトリに移動
cd ***/aws-greengrass-core-sdk-python-master
#依存関係のインストール
pip install --target . -r requirements.txt
※解凍してできたフォルダに「cbor2」、「cbor2-4.12.dist-info」が追加された
##2-2 関数の作成(ローカル)
1.以下の通り「transfer_stream.py」を作成する
※今回は公式ドキュメント通りに作成
import asyncio
import logging
import random
import time
from greengrasssdk.stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
# This example creates a local stream named "SomeStream".
# It starts writing data into that stream and then stream manager automatically exports
# the data to a customer-created Kinesis data stream named "MyKinesisStream".
# This example runs forever until the program is stopped.
# The size of the local stream on disk will not exceed the default (which is 256 MB).
# Any data appended after the stream reaches the size limit continues to be appended, and
# stream manager deletes the oldest data until the total stream size is back under 256 MB.
# The Kinesis data stream in the cloud has no such bound, so all the data from this script is
# uploaded to Kinesis and you will be charged for that usage.
def main(logger):
try:
stream_name = "SomeStream"
kinesis_stream_name = "MyKinesisStream"
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
)
)
# Append two messages and print their sequence numbers
logger.info(
"Successfully appended message to stream with sequence number %d",
client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
)
logger.info(
"Successfully appended message to stream with sequence number %d",
client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
)
# Try reading the two messages we just appended and print them out
logger.info(
"Successfully read 2 messages: %s",
client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
)
logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
# Now start putting in random data between 0 and 1000 to emulate device sensor input
while True:
logger.debug("Appending new random integer to stream")
client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
def function_handler(event, context):
return
logging.basicConfig(level=logging.INFO)
# Start up this sample code
main(logger=logging.getLogger())
2.以下4点を選択して圧縮する
※公式ドキュメントに則って圧縮してできたzipファイルは「transfer_stream_python.zip」とする
- transfer_stream.py
- greengrasssdk
- cbor2
- cbor2-4.12.dist-info
##2-3 Lambda関数の作成(クラウド)
1.Lambdaのマネジメントコンソールで以下の通り関数を作成する
- 一から作成
- 関数名:TransferStream ※公式ドキュメントにあわせた
- ランタイム:Python 3.7
2.関数が作成されたら、コードタブで アップロード元 > zipファイル の順にクリック
3.ファイル選択ウィンドウで先ほど作成したzipファイル「transfer_stream_python.zip」を選択して「保存」をクリック
4.コードタブでランタイム設定の「編集」をクリック
5.ハンドラを「transfer_stream.function_handler」に修正して「保存」をクリック
※独自でファイル名、関数名をつけている際は「[プログラムファイル名].[関数名]」とする
6.関数画面で アクション > 新しいバージョンを発行 をクリックし、「発行」をクリックする
7.アクション > エイリアスを作成 をクリック
8.以下の通りエイリアス設定を入力し、「保存」をクリックする
- 名前:GG_TransferStream ※公式ドキュメントにあわせた
- バージョン:1
#3 Greengrassグループのデプロイ
作成したLambda関数をGreengrassグループに紐づけJetson nanoにデプロイします。
##3-1 Lambda関数をグループに追加する
1.IoT Coreのマネジメントコンソールで左のメニューから Greengrass > クラシック(V1) > グループ > 対象のグループ の順にクリック
2.Lambda をクリック > 「Lambdaの追加」をクリック
3.GreengrassグループへのLambdaの追加画面で「既存のLambdaの使用」をクリック
4.作成したLambda関数「transfer_stream」を選択して「次へ」をクリック
5.作成したエイリアス「GG_TransferStream」を選択して「完了」をクリック
6.グループのLambda画面の「TransferStream」上で ... > 設定の編集 の順にクリック
7.以下の通り設定を変更して「更新」をクリック
- メモリ制限:32MB
- Lambdaのライフサイクル:存続期間が長く無制限に稼働する関数にする
##3-2 ストリームマネージャーの有効化
ストリームマネージャーを利用するため、コンソール画面でストリームマネージャーを有効化します。
1.IoT Coreのマネジメントコンソールで左のメニューから Greengrass > クラシック(V1) > グループ > 対象のグループ の順にクリック
2.設定 をクリックし「ストリームマネージャー」が有効になっているか確認する
※以降は「無効」だった場合の手順
3.編集 をクリックし、設定画面で「有効化」を選択して「保存」をクリック
4.グループの設定画面で「ストリームマネージャー」が有効になっていることを確認する
##3-3 グループのデプロイ
1.Jetson nano上でGreengrassが起動していることを確認する
※起動していないとデプロイできない
#Greengrassが起動しているか確認
ps aux | grep -E "greengrass.*daemon"
#起動していない場合は以下のコマンドで起動
sudo /greengrass/ggc/core/greengrassd start
2.IoT Coreのマネジメントコンソールで 左のメニューから Greengrass > クラシック(V1) > グループ の順にクリック
3.一覧で作成したグループ名をクリックして詳細画面に遷移し、アクション > デプロイの順にクリック
4.「正常に完了しました」と表示されることを確認する
#4 動作検証
Kinesis Data StreamsにJetson nanoからデータが送られることを確認します。
1.Kinesisのマネジメントコンソール > データストリームの数字 の順にクリック
※以下画像の場合は「1」がリンクになっている
2.作成したストリーム「MyKinesisStream」をクリック
3.成功するとモニタリングタブの「Put レコード」などに値が表示される
※デプロイ後、表示されるまでに数分ほどかかった
#5 後片付け
以下を削除します。
- デプロイした関数「TransferStream」
- Kinesis Data Streamsで作成したデータストリーム「MyKinesisStream」
##5-1 Lambda関数「TransferStream」の削除
1.IoT Coreのマネジメントコンソールで 左のメニューから Greengrass > クラシック(V1) > グループ の順にクリック
2.一覧で作成したグループ名をクリックして詳細画面に遷移し、Lambda をクリック
3.「TransferStream」上で ... > 関数の削除 の順にクリック
4.グループをデプロイする
※Lambdaには関数が残っているので、不要な場合はそちらも削除する
##5-2 データストリーム「MyKinesisStream」の削除
1.Kinesisのマネジメントコンソール > データストリームの数字 の順にクリック
2.作成した「MyKinesisStream」を選択して、アクション > 削除 の順にクリック
#6 おわりに
今回は公式ドキュメント通りに実施し、ストリームマネージャーを使えるところまでざっくりと確認できました。
以降、これをベースとしてデータのフィルタリングや集約、他のAWSサービスへのエクスポートなどを行おうと思います。
#7 参考文献(文中で登場していないもの)