はじめに
前回、Raspbery Pi Pico3台から部屋の照明情報をAWS IoT Coreに送信し、DynamoDBへ保存できるようになった。単にデータを保存するだけでは面白くないので、保存されているデータを確認し、IoT Eventsを使ってユーザーへの通知機能を実装する。
前回記事:https://qiita.com/bd8z/items/e12e6885b627fc444eeb
この記事で紹介すること
- DynamoDBからS3へAWS Glueを使ったデータのエクスポート
- DynamoDBに保存したIoTデータの中身の確認、アラームの設計
- IoT Eventで、Iot Coreデータを元に次アクションにつながるAWS部分の実装
この記事で紹介しないこと
- DynamoDB/S3/Glue等、各種AWSサービスの詳細な説明
- boto3(AWS Python SDK)の詳細な説明
- LINE Notify APIを使った通知部分の実装(SNS/Lambda/API)
前提
- 集めたデータは5/1~5/5の約5日間、60秒間隔の照度データ
- Raspbery Pico Wは3台、それぞれ設置場所が異なる
picoW_5c | picoW_5d | picoW_5b |
---|---|---|
![]() |
![]() |
![]() |
南向きの窓 | 洗面所の電灯裏 | クローゼット棚上 |
モチベーション
- IoTデータをトリガーに何らか生活に役立つアクションをさせたい
- AWS Glueを使ったことがなかったので使ってみたい
- AWS IoT Eventsも使ってみたい
Step 1 IoTデータの確認
データをcsvの形でS3へ出力して、そのデータを確認する。
1.1 AWS GlueによるDynamoDBデータの出力
DynamoDBデータをそのままデータをcsvに出力したいので、AWS Glueを使ってみる。Glueでは、DynamoDBやRDSなどのデータソースから、画面からデータマッピングなどETL作業をビジュアルUIで簡単に操作できる。ジョブという作業単位を定義し、GlueへIAMロールを与える(PassRoleする)ことで、各種データへのアクセス・変換・出力(ETL)が可能になる。自前で定義できるScriptの機能からも分かるが、Glueは裏で、Apache Sparkが動いているサーバーレスETLサービスである。
実際の設定
DynamoDBからS3へデータを出力する。
対象とするDynamoDBのテーブル・カラム、スキーマの変換、出力先バケットを定義する。
引っ掛かりがちなポイント
- 出力データカラム
- DynamoDBからデータを取り出す際、Output schemaはデフォルトでプライマリーキーとソートキーしか選択されなかった。ユーザーが明示的に出力データカラムを定義する必要がある。
- Glueに設定するIAM権限の設定
- GlueがIAM権限を引き受けてELTジョブを実行するため、
sts:AssumeRole
が必要 - データソースがDynamoDBの場合、
dynamodb:DescribeExport
,dynamodb:ExportTableToPointInTime
権限が必要
- GlueがIAM権限を引き受けてELTジョブを実行するため、
- Schedule設定
- デフォルトでScheduleが設定され、今後毎日ETLジョブが定期実行され続ける。
- 請求はDPUのUnit数x稼働時間となるため、IoTデータの蓄積によりデータソースのデータの量が増えるほど変換時間が増え、雪だるま式に支払額が増えていく。
1.2 S3からデータの取り出しと中身の確認
取得した各種データを可視化して、アラームの設計をしていく。
まず、S3から更新が1日以内のファイルをダウンロードする。
import boto3
my_session = boto3.Session(profile_name=profileName)
s3 = my_session.client('s3')
bucketName = "iotcore-home-export"
objectList = s3.list_objects(Bucket=bucketName)
for object in objectList["Contents"]:
if ((datetime.datetime.now(datetime.timezone.utc) - object["LastModified"]).days < 1):
s3.download_file(Bucket=bucketName,Key = object["Key"],Filename=object["Key"])
ファイルが複数に分かれているので一つのDataFrameにまとめる。
import pandas as pd
import glob
fileList = glob.glob("run-1683255843130**")
dfConcat = pd.DataFrame()
for file in fileList:
df_ = pd.read_csv(file)
dfConcat = pd.concat([dfConcat,df_])
dfConcat
Unix時刻をutc時刻にするなど元のDataFrameを加工する。
時系列のグラフと、hour単位のトレンドデータとして可視化する。
import datetime
import matplotlib.pyplot as plt
#データの成型
utc_day = [datetime.datetime.fromtimestamp(float(time)/1000).day for time in list(dfConcat["timeStamp"])]
utc_hour = [datetime.datetime.fromtimestamp(float(time)/1000).hour for time in list(dfConcat["timeStamp"])]
utc_second = [datetime.datetime.fromtimestamp(float(time)/1000).second for time in list(dfConcat["timeStamp"])]
utc_minute = [datetime.datetime.fromtimestamp(float(time)/1000).minute for time in list(dfConcat["timeStamp"])]
utc_floatHour = [datetime.datetime.fromtimestamp(float(time)/1000).minute/60 + datetime.datetime.fromtimestamp(float(time)/1000).hour for time in list(dfConcat["timeStamp"])]
dfConcat["utc_hour"] = utc_hour
dfConcat["utc_second"] = utc_second
dfConcat["utc_day"] = utc_day
dfConcat["utc_floatHour"] = utc_floatHour
#グラフ描画
fig = plt.figure(figsize=(7,7))
v_Num = 2
h_Num = 1
graphNum = v_Num*h_Num
titles = ["unix-time[msec] vs illuminance[volt]","utc-hour[hour] vs illuminance[volt]"]
axDict = {}
for i in range((h_Num)):
for ii in range((v_Num)):
axDict["ax"+str(i*ii + ii)] = fig.add_subplot(graphNum,i+1,ii+1)
axDict["ax"+str(i*ii + ii)].grid()
axDict["ax"+str(i*ii + ii)].minorticks_on()
axDict["ax"+str(i*ii + ii)].grid(True, which='minor', color='#999999', linestyle='-', alpha=0.2)
axDict["ax"+str(i*ii + ii)].set_title(titles[i*ii + ii])
axList = list(axDict.keys())
#fig1
deviceList = dfConcat["deviceID"].unique()
device = deviceList[1]
df_slice = dfConcat[dfConcat["deviceID"]==device]
axDict[axList[0]].scatter(df_slice["timeStamp"], df_slice["illuminance"],s=10)
#fig2
days = sorted (df_slice["utc_day"].unique())
for day in days:
slice_of_slice = df_slice[df_slice["utc_day"]==day]
axDict[axList[1]].scatter(slice_of_slice["utc_floatHour"], slice_of_slice["illuminance"],s=1)
axDict[axList[1]].legend(["day" + str(day) for day in days])
fig.tight_layout()
plt.show()
可視化できたグラフから、実際の使われ方を想定して、アラームの閾値を設計する。
可視化に使用したノートブックはこちら
Step2 IoT Eventsにおけるアラームの実装
IOT Eventsでは「探知機モデル」と「アラームモデル」の2つが実装できる。アラームは特定の値が閾値を超えたときに通知するだけの機能であり、何らかのデータに基づいてアクションさせたい場合は探知機のほうが向いている。探知機モデルは、入力データや内部変数に基づく状態遷移モデルであり、ビジュアルエディターを使って非常に簡単に定義できる。(MatlabにおけるStateflowによく似ている。)
作りたいもの
入力、ルール、探知機モデルがそれぞれ場所に合わせて3つのモデルを用意し、アラート状態に遷移した際、あらかじめ要したSNSトピックにpushしてLINE通知させる。
IoT Eventsで任意のアクションを実行させるまでには、Iot Core/Iot Eventsの両方で作業が必要となる。
まず、IoT Eventsで入力データを定義し、そのデータに基づく状態遷移モデルを定義する。最後にIoT Core側でルールを定義してIot CoreとIoT Eventsとを接続する。
2.1 IoT Eventsにおける入力の作成
状態遷移モデルへの入力変数を定義する。
使用する入力データのサンプルを、json形式で用意する。1行分のサンプルがあれば十分である。
今回は、3入力とも{"illuminance":2.100096}
のサンプルファイルを使用した。
2.2 状態遷移モデルの定義
状態とトリガーや逐次実行させる関数を定義する。状態遷移のトリガには、入力データや独自の変数に応じてトリガすることができる。トリガの条件は、状態に入った時のOnEnter
、その状態に有り入力データが更新された際のOnInput
、状態から出る際のOnExit
の3点が利用できる。今回は、アラーム状態に陥ったら(OnEnter
)、SNSトピックにメッセージをpushする機能を設定した。SNS以外にも、SQSやIoTの別のEvent、Kinesis Firehoseなど実行可能アクションはかなりある様子。
- 入力データをそのまま使用する場合
ネストする形で入力データを$input.{入力名}.{jsonのキー}
の形で利用する。
例:$input.illuminance_bathroom.illuminance
- 独自の変数を利用する場合
予めOnEnter
などで変数を定義した上で、$valiable.{変数名}
の形で利用する。
例:$valiable.alartCnt
- タイマーを使う場合
タイマーを作成後、返り値がbooleanのtimeout("タイマー名")
でトリガーをセットできる。
トリガーの設定では、<
や==
など、一般的な論理演算子が利用できる。
参考:https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-expressions.html
2.3 IoT Core側のデータ定義
IoT Core側からルール作成を行う。IoT Eventsで定義した入力が選べるため、入力変数とルールを実行するためのIAMロールを定義する。
Step3 作ったしくみの動作確認
-
クローゼットの電灯を付けっぱなしにしてみる。(トリガは5分間)
-
窓の日照の通知
感想
今回、RaspberryPi Pico Wのデータをクラウドに送信する足回りの部分から、上がったデータを元にアクションを実行するサービス部分まで、簡単ではあるが一連の仕組みを作った。自動車におけるOTAなど、IoTを使った仕組みにおいて、継続的にサービスをアップグレードしていく仕組みを各社競争して開発しているようだが、IoTの仕組みは今までちゃんとあまり作ったことがなかったので、今回初めてIoTならではの苦労の勘所が理解できたように思う。
実際のIoTサービス開発の環境構築で苦労しそうな点
- エッジ側へ新しいソフトを届け、更新する仕組みをどう作るか
- クラウド側で受け取るデータをエッジから飛んでくるデータを受け取る仕組みをどう作るか
- 受け取ったデータが膨大になりすぎるのでデータのライフサイクルをどうするか
- 受け取ったデータを分析できる基盤をどうするか
- 価値還元できる新しいサービスをクラウド側でどう開発し続けていくか
この辺の困りごとは、AWSではIoTシリーズの各種サービスで抑えられている。
また、RaspberryPi Pico Wの使い方も何となく分かった。今回は、Pico Wはあくまでセンサー値送信、サービスはすべてAWS上で作った。もちろん、RaspberryPi Pico Wは自身がIoTデバイスなので、デバイス自身にタイマーを組み込んで、デバイスからLINE Notify APIへリクエストを送ることもできるが、もし、そのようにした場合、何度もソフトを焼いて検証となり、実装までのトライアンドエラーに非常に手がかか他だろうと思う。デバイスシャドウ、デバイス側でのMQTTサブスクライブを使えば、クラウド側からエッジ側へ値を渡したりはできるが、あくまでパラメータチューニングが精一杯であり、新しいソフトをエッジ側へデプロイできるわけではない。Greengrassのように、あとからIoTデバイスにOTAでソフトを焼ける環境ならまだしも、Pico Wのような非力なマイコンにおいては、マイコンはセンサー値をクラウド側に送るだけに徹して、サービス・アクション部分はクラウドで実装した方がデバッグも簡単で実装まで早く、PoCに向いているスタイルだと感じた。