0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Iot CoreでAWSへ取り込んだデータを元にIoT Eventsで任意のアクションを実行する

Posted at

はじめに

前回、Raspbery Pi Pico3台から部屋の照明情報をAWS IoT Coreに送信し、DynamoDBへ保存できるようになった。単にデータを保存するだけでは面白くないので、保存されているデータを確認し、IoT Eventsを使ってユーザーへの通知機能を実装する。

前回記事:https://qiita.com/bd8z/items/e12e6885b627fc444eeb

この記事で紹介すること

  • DynamoDBからS3へAWS Glueを使ったデータのエクスポート
  • DynamoDBに保存したIoTデータの中身の確認、アラームの設計
  • IoT Eventで、Iot Coreデータを元に次アクションにつながるAWS部分の実装

この記事で紹介しないこと

  • DynamoDB/S3/Glue等、各種AWSサービスの詳細な説明
  • boto3(AWS Python SDK)の詳細な説明
  • LINE Notify APIを使った通知部分の実装(SNS/Lambda/API)

前提

  • 集めたデータは5/1~5/5の約5日間、60秒間隔の照度データ
  • Raspbery Pico Wは3台、それぞれ設置場所が異なる
picoW_5c picoW_5d picoW_5b
IMG_0376.jpg IMG_0377.jpg IMG_0378.jpg
南向きの窓 洗面所の電灯裏 クローゼット棚上

モチベーション

  • IoTデータをトリガーに何らか生活に役立つアクションをさせたい
  • AWS Glueを使ったことがなかったので使ってみたい
  • AWS IoT Eventsも使ってみたい

Step 1 IoTデータの確認

データをcsvの形でS3へ出力して、そのデータを確認する。

1.1 AWS GlueによるDynamoDBデータの出力

DynamoDBデータをそのままデータをcsvに出力したいので、AWS Glueを使ってみる。Glueでは、DynamoDBやRDSなどのデータソースから、画面からデータマッピングなどETL作業をビジュアルUIで簡単に操作できる。ジョブという作業単位を定義し、GlueへIAMロールを与える(PassRoleする)ことで、各種データへのアクセス・変換・出力(ETL)が可能になる。自前で定義できるScriptの機能からも分かるが、Glueは裏で、Apache Sparkが動いているサーバーレスETLサービスである。

実際の設定

DynamoDBからS3へデータを出力する。
対象とするDynamoDBのテーブル・カラム、スキーマの変換、出力先バケットを定義する。
image.png

引っ掛かりがちなポイント

  • 出力データカラム
    • DynamoDBからデータを取り出す際、Output schemaはデフォルトでプライマリーキーとソートキーしか選択されなかった。ユーザーが明示的に出力データカラムを定義する必要がある。
  • Glueに設定するIAM権限の設定
    • GlueがIAM権限を引き受けてELTジョブを実行するため、sts:AssumeRoleが必要
    • データソースがDynamoDBの場合、dynamodb:DescribeExport,dynamodb:ExportTableToPointInTime権限が必要
  • Schedule設定
    • デフォルトでScheduleが設定され、今後毎日ETLジョブが定期実行され続ける。
    • 請求はDPUのUnit数x稼働時間となるため、IoTデータの蓄積によりデータソースのデータの量が増えるほど変換時間が増え、雪だるま式に支払額が増えていく。

1.2 S3からデータの取り出しと中身の確認

取得した各種データを可視化して、アラームの設計をしていく。
まず、S3から更新が1日以内のファイルをダウンロードする。

In1
import boto3

my_session = boto3.Session(profile_name=profileName)
s3 = my_session.client('s3')
bucketName = "iotcore-home-export"
objectList = s3.list_objects(Bucket=bucketName)

for object in objectList["Contents"]:
    if ((datetime.datetime.now(datetime.timezone.utc) - object["LastModified"]).days < 1):
        s3.download_file(Bucket=bucketName,Key = object["Key"],Filename=object["Key"])

ファイルが複数に分かれているので一つのDataFrameにまとめる。

In2
import pandas as pd
import glob

fileList = glob.glob("run-1683255843130**")

dfConcat = pd.DataFrame()

for file in fileList:
    df_ = pd.read_csv(file)
    dfConcat = pd.concat([dfConcat,df_])

dfConcat

出力:
image.png

Unix時刻をutc時刻にするなど元のDataFrameを加工する。
時系列のグラフと、hour単位のトレンドデータとして可視化する。

In3
import datetime
import matplotlib.pyplot as plt

#データの成型
utc_day = [datetime.datetime.fromtimestamp(float(time)/1000).day for time in list(dfConcat["timeStamp"])]
utc_hour = [datetime.datetime.fromtimestamp(float(time)/1000).hour for time in list(dfConcat["timeStamp"])]
utc_second = [datetime.datetime.fromtimestamp(float(time)/1000).second for time in list(dfConcat["timeStamp"])]
utc_minute = [datetime.datetime.fromtimestamp(float(time)/1000).minute for time in list(dfConcat["timeStamp"])]

utc_floatHour = [datetime.datetime.fromtimestamp(float(time)/1000).minute/60 + datetime.datetime.fromtimestamp(float(time)/1000).hour for time in list(dfConcat["timeStamp"])]

dfConcat["utc_hour"] = utc_hour
dfConcat["utc_second"] = utc_second
dfConcat["utc_day"] = utc_day
dfConcat["utc_floatHour"] = utc_floatHour

#グラフ描画
fig = plt.figure(figsize=(7,7))

v_Num = 2
h_Num = 1
graphNum = v_Num*h_Num

titles = ["unix-time[msec] vs illuminance[volt]","utc-hour[hour] vs illuminance[volt]"]

axDict = {}
for i in range((h_Num)):
    for ii in range((v_Num)):
        axDict["ax"+str(i*ii + ii)] = fig.add_subplot(graphNum,i+1,ii+1)
        axDict["ax"+str(i*ii + ii)].grid()
        axDict["ax"+str(i*ii + ii)].minorticks_on()
        axDict["ax"+str(i*ii + ii)].grid(True, which='minor', color='#999999', linestyle='-', alpha=0.2)
        axDict["ax"+str(i*ii + ii)].set_title(titles[i*ii + ii])

axList = list(axDict.keys())

#fig1
deviceList = dfConcat["deviceID"].unique()
device = deviceList[1]
df_slice = dfConcat[dfConcat["deviceID"]==device]
axDict[axList[0]].scatter(df_slice["timeStamp"], df_slice["illuminance"],s=10)

#fig2
days = sorted (df_slice["utc_day"].unique())
for day in days:
    slice_of_slice = df_slice[df_slice["utc_day"]==day]
    axDict[axList[1]].scatter(slice_of_slice["utc_floatHour"], slice_of_slice["illuminance"],s=1)

axDict[axList[1]].legend(["day" + str(day) for day in days])

fig.tight_layout()
plt.show()

可視化できたグラフから、実際の使われ方を想定して、アラームの閾値を設計する。

名称 picoW_5c picoW_5d picoW_5b
実物 IMG_0376.jpg IMG_0377.jpg IMG_0378.jpg
場所 南向きの窓 洗面所の電灯裏 クローゼット棚上
アラーム 外が暗く部屋の中が明るいと開けた窓から虫が入ってくるため、日が落ちる前に警告させたい 電灯のつけっぱなしを警告させたい 電灯のつけっぱなしを警告させたい
結果 image.png image.png image.png
分析 5日分のトレンド波形がほぼ重なる(5日のデータなので日照時間がほとんど変化ない)。日が落ちると照度が急に落ち込む。曇りでも照度が大きい。 3.0Vと1.5Vの2つの棚がある。前者は風呂の電灯のつけっぱなしに反応している。切り忘れていない場合、風呂や洗面のため1時間以上使っている場合がある。 2.8V程度の棚が一つある。切り忘れていない場合、一瞬で電気が切れる。(使っているはずだが、1分間のサンプリングで捉えられていない。 )
閾値設計 Value<2.5が2回連続 Value>1.3が1.5時間以上 Value>2.5が5分以上

可視化に使用したノートブックはこちら

Step2 IoT Eventsにおけるアラームの実装

IOT Eventsでは「探知機モデル」と「アラームモデル」の2つが実装できる。アラームは特定の値が閾値を超えたときに通知するだけの機能であり、何らかのデータに基づいてアクションさせたい場合は探知機のほうが向いている。探知機モデルは、入力データや内部変数に基づく状態遷移モデルであり、ビジュアルエディターを使って非常に簡単に定義できる。(MatlabにおけるStateflowによく似ている。)

作りたいもの

入力、ルール、探知機モデルがそれぞれ場所に合わせて3つのモデルを用意し、アラート状態に遷移した際、あらかじめ要したSNSトピックにpushしてLINE通知させる。
image.png

IoT Eventsで任意のアクションを実行させるまでには、Iot Core/Iot Eventsの両方で作業が必要となる。
まず、IoT Eventsで入力データを定義し、そのデータに基づく状態遷移モデルを定義する。最後にIoT Core側でルールを定義してIot CoreとIoT Eventsとを接続する。

2.1 IoT Eventsにおける入力の作成

状態遷移モデルへの入力変数を定義する。
使用する入力データのサンプルを、json形式で用意する。1行分のサンプルがあれば十分である。
今回は、3入力とも{"illuminance":2.100096}のサンプルファイルを使用した。

image.png

2.2 状態遷移モデルの定義

状態とトリガーや逐次実行させる関数を定義する。状態遷移のトリガには、入力データや独自の変数に応じてトリガすることができる。トリガの条件は、状態に入った時のOnEnter、その状態に有り入力データが更新された際のOnInput、状態から出る際のOnExitの3点が利用できる。今回は、アラーム状態に陥ったら(OnEnter)、SNSトピックにメッセージをpushする機能を設定した。SNS以外にも、SQSやIoTの別のEvent、Kinesis Firehoseなど実行可能アクションはかなりある様子。

image.png

  • 入力データをそのまま使用する場合
    ネストする形で入力データを$input.{入力名}.{jsonのキー}の形で利用する。
    例:$input.illuminance_bathroom.illuminance
     
  • 独自の変数を利用する場合
    予めOnEnterなどで変数を定義した上で、$valiable.{変数名}の形で利用する。
    例:$valiable.alartCnt

独自の変数の定義の方法
image.png

  • タイマーを使う場合
    タイマーを作成後、返り値がbooleanのtimeout("タイマー名")でトリガーをセットできる。

トリガーの設定では、<==など、一般的な論理演算子が利用できる。
image.png

参考:https://docs.aws.amazon.com/iotevents/latest/developerguide/iotevents-expressions.html

2.3 IoT Core側のデータ定義

IoT Core側からルール作成を行う。IoT Eventsで定義した入力が選べるため、入力変数とルールを実行するためのIAMロールを定義する。
image.png

Step3 作ったしくみの動作確認

  • クローゼットの電灯を付けっぱなしにしてみる。(トリガは5分間)

    • 17:12 電気ON
    • 17:17 LINEへ着信
      image.png
  • 窓の日照の通知

    • 日の入りは18:40だが、暗くなりかけの頃に通知が来ている。(トリガは窓辺の照度ダウン)
      image.png

感想

 今回、RaspberryPi Pico Wのデータをクラウドに送信する足回りの部分から、上がったデータを元にアクションを実行するサービス部分まで、簡単ではあるが一連の仕組みを作った。自動車におけるOTAなど、IoTを使った仕組みにおいて、継続的にサービスをアップグレードしていく仕組みを各社競争して開発しているようだが、IoTの仕組みは今までちゃんとあまり作ったことがなかったので、今回初めてIoTならではの苦労の勘所が理解できたように思う。

実際のIoTサービス開発の環境構築で苦労しそうな点

  • エッジ側へ新しいソフトを届け、更新する仕組みをどう作るか
  • クラウド側で受け取るデータをエッジから飛んでくるデータを受け取る仕組みをどう作るか
  • 受け取ったデータが膨大になりすぎるのでデータのライフサイクルをどうするか
  • 受け取ったデータを分析できる基盤をどうするか
  • 価値還元できる新しいサービスをクラウド側でどう開発し続けていくか

この辺の困りごとは、AWSではIoTシリーズの各種サービスで抑えられている。

 また、RaspberryPi Pico Wの使い方も何となく分かった。今回は、Pico Wはあくまでセンサー値送信、サービスはすべてAWS上で作った。もちろん、RaspberryPi Pico Wは自身がIoTデバイスなので、デバイス自身にタイマーを組み込んで、デバイスからLINE Notify APIへリクエストを送ることもできるが、もし、そのようにした場合、何度もソフトを焼いて検証となり、実装までのトライアンドエラーに非常に手がかか他だろうと思う。デバイスシャドウ、デバイス側でのMQTTサブスクライブを使えば、クラウド側からエッジ側へ値を渡したりはできるが、あくまでパラメータチューニングが精一杯であり、新しいソフトをエッジ側へデプロイできるわけではない。Greengrassのように、あとからIoTデバイスにOTAでソフトを焼ける環境ならまだしも、Pico Wのような非力なマイコンにおいては、マイコンはセンサー値をクラウド側に送るだけに徹して、サービス・アクション部分はクラウドで実装した方がデバッグも簡単で実装まで早く、PoCに向いているスタイルだと感じた。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?