はじめに
前編からの続きです!
前編はここからどうぞ👀👀
検証開始
前回はセンサデータをGreengrassに収集するところまでを実装しました。
今回はGreengrass上でデータの加工と、Stream Manager経由でIoT SiteWiseへデータ転送するところを実装します。
- Stream Managerのインストール
- データ加工&データ転送コンポーネント作成
1. Stream Managerのインストール
Stream Managerの詳細はこちらを確認。
Stream manager - AWS IoT Greengrass
利用のメリットとしてはこんな感じです。
この辺りがSiteWise APIを用いてデータ転送する時と比べたメリットになるかと
- クラウドへの転送が自動で行われる
- 帯域幅の制御や、タイムアウト、接続・切断時の処理も自動で行われる
- 上記設定を自分で設定することも可能
コンポーネントのインストール
Greengrassの設定画面からStream Managerのコンポーネントを選択してあげるだけで、簡単にインストールできます。
SDKのインストール
以下を参考にStream Manager SDkをインストールします。
ストリームマネージャーを使用するカスタムコンポーネントを作成する
sudo git clone https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
cd aws-greengrass-stream-manager-sdk-python
zip -rv stream_manager_sdk.zip stream_manager
sudo zip -rv stream_manager_sdk.zip stream_manager
cp {stream_manager_sdk.zip,requirements.txt} ~/{artifactのパス}
recipeの更新
先ほどインストールしたSDKをアーティファクトとして定義。
ライフサイクルでrequirements.txtからインストールします。
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "hello_world_subscriber",
"ComponentVersion": "1.0.0",
"ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
"ComponentPublisher": "Amazon",
"ComponentDependencies": {
"aws.greengrass.StreamManager": {
"VersionRequirement": "^2.0.0"
}
},
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.clientdevices.MyHelloWorldSubscriber:pubsub:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"install": "pip3 install --user -r {artifacts:path}/requirements.txt awsiotsdk",
"run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/stream_manager_sdk.zip",
"Unarchive": "ZIP"
},
{
"URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/stream_manager_s3.py"
},
{
"URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/requirements.txt"
}
]
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"install": "pip3 install --user -r {artifacts:path}/requirements.txt awsiotsdk",
"run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/stream_manager_sdk.zip",
"Unarchive": "ZIP"
},
{
"URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/stream_manager_s3.py"
},
{
"URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.clientdevices.MyHelloWorldSubscriber/1.0.0/requirements.txt"
}
]
}
]
}
2. データ加工&データ転送コンポーネント作成
以下にStream Manager経由でIoT SiteWiseに転送するためのコードのサンプルが記載されています。
aws-greengrass-stream-manager-sdk-python/samples
/stream_manager_ioT_siteWise.py
こちらを参考に、データ加工部分を追加してデータ転送します。
今回のデータ加工はMessage:"Hello World!!"を"Send SiteWise"に変更するだけです。
データ加工&データ転送コード例
import sys
import time
import traceback
import asyncio
import calendar
import logging
import random
import uuid
import json
from stream_manager import (
AssetPropertyValue,
ExportDefinition,
IoTSiteWiseConfig,
MessageStreamDefinition,
PutAssetPropertyValueEntry,
Quality,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
TimeInNanos,
Variant,
)
from stream_manager.util import Util
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
CLIENT_DEVICE_HELLO_WORLD_TOPIC = 'clients/+/hello/world'
TIMEOUT = 10
# This will create a random asset property value entry and return it to the caller.
def get_message_site_wise_entry(message):
# SiteWise requires unique timestamps in all messages and also needs timstamps not earlier
# than 10 minutes in the past. Add some randomness to time and offset.
# Note: Inorder to create a new asset property data, you should use the classes defined in the
# greengrasssdk.stream_manager module.
time_in_nanos = TimeInNanos(
time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
)
variant = Variant(message)
asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
return PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias=CLIENT_DEVICE_HELLO_WORLD_TOPIC, property_values=asset)
def on_hello_world_message(event):
try:
message = str(event.binary_message.message, 'utf-8')
json_dict = json.loads(message)
json_dict["message"] = "Send SiteWise"
message = json.dumps(json_dict)
print('Received new message: %s' % message, file=sys.stderr)
client.append_message(stream_name, Util.validate_and_serialize_to_json_bytes(get_message_site_wise_entry(message)))
print('Send to Iot SiteWise: %s' % message, file=sys.stderr)
except Exception as e:
print(e)
print(type(e))
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
try:
ipc_client = GreengrassCoreIPCClientV2()
stream_name = "SomeStream"
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
iot_sitewise=[IoTSiteWiseConfig(identifier="IoTSiteWiseExport" + stream_name, batch_size=5)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
)
)
# SubscribeToTopic returns a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(
topic=CLIENT_DEVICE_HELLO_WORLD_TOPIC, on_stream_event=on_hello_world_message)
print('Successfully subscribed to topic: %s' %
CLIENT_DEVICE_HELLO_WORLD_TOPIC, file=sys.stderr)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.', file=sys.stderr)
operation.close()
except asyncio.TimeoutError:
print("Timed out")
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
finally:
if client:
client.close()
データ転送確認
今回SiteWiseの設定は全て飛ばしましたが、必要な場合はこちらを確認してください。
以下のようにデータが受信できていれば成功です!
おわり
Greengrass V2はまだ出たばかりということもあり、まとめられているサイトが少ないことから実装に苦労しましたが何とかできました。。。
簡単にしかまとめていないですが、センサデータを活用したDXを検証しようとしている人の助けになればなあと。
あくまで個人の復習用にブログをまとめているので、もしわからないこととかあれば気軽にご連絡ください。