0
Help us understand the problem. What are the problem?

posted at

updated at

IoT リアルタイムデータを Lambda タンブリングウィンドウで集計してみた その1 IoT Core 準備編

はじめに

刻々と数が変化する IoT データや、SNS に投稿されたデータなど、リアルタイムなデータを集めて、集計を行うのは一般的に重労働な作業と言われています。従来のシステムでは集計処理は夜間に行うことがありますが、次のような課題があります。

  • リアルタイムなデータではデータ量が多いため、夜間のバッチ処理で終わらない
  • 1日1回の集計のため、翌日にならないとデータを活用できない

こういった課題を解決するために、AWS ではデータをリアルタイムに処理するための機能があります。すべてを網羅するわけではありませんが、次のような実現方法があります。

  • Kinesis Data Streams と Lambda のタンブリングウィンドウ (Tumbling Window)
  • Kinesis Data Analytics

今回は、Lambda のタンブリングウィンドウを使った集計機能を紹介していきます。タンブリングウィンドウとは、データストリームを一定時間周期で集計を行うときに使われます。イメージ図を載せます。リアルタイムなデータを5分おきに集計するような使い方を想像してみてください。「5分おき」といった一定周期を、この図では window size と表現されています。window 1 は 00:00~00:05 の5分間、windows 2 は 00:05~00:10 の 5分間、といった形で連続的な時間の流れを window に分けて処理をしていく考え方です。

image-20220410230041749.png

引用 : https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/

Lambda のタンブリングウィンドウ機能を使うことで、Kinesis Data Streams に流れているリアルタイムな IoT データを 一定周期おきに集計することが出来ます。Lambda のタンブリングウィンドウ機能では、最長で 15 分おきに集計処理を行うことが出来ます。これよりも長い間隔で集計を行いたい場合は、Kinesis Data Analytics や、独自のコンピュートリソース(ECS Task など) を検討すると良いでしょう。

Lambda のタンブリングウィンドウ機能の動作イメージ画像を引用します。この画像では、30秒おきに集計を行うものになっています。Kinesis Data Streams から流れてくるデータを30秒の間で、複数の Lambda Function がリレーのバトンをつなぐような形で、データを受け渡しを行っています。1 個の Lambda Function が処理できる Kinesis のデータ上限は、batch size で指定できます。言い換えると、30秒の間で batch size 以上のデータが渡ってくる可能性があり、複数の Lambda Function が連携する形で30秒周期を実現しています。このあたりのデータの渡し方は state を使うことになるので、詳細な内容は次の記事で紹介していきます。

image-20220410231042846.png

引用 : https://pages.awscloud.com/rs/112-TZM-766/images/ETL_serverless.pdf

今回の記事では、Lambda のタンブリングウィンドウ機能を使うための事前準備として、IoT Core を準備する内容になっています。IoT Core を飛ばしたい方は、記事その2へ飛んでください。

システム構成図

システムの全体像を記載します。今回の記事の作業範囲は、左側の疑似センサーデバイスと、IoT Core の部分です。興味のない方は次の記事へどうぞ。

image-20220410233142585.png

疑似 IoT センサーデバイスを作成

IoT をつかったリアルタイムデータ処理を実現していきたいですが、物理的なデバイスを用意するのは大変なので、Amazon Linux 2 の環境を疑似的なセンサーデバイスとして利用します。Amazon Linux2 や IoT Core 周りの設定を進めていきます。

IoT Policy の作成

IoT Things に紐づける、IoT Policy を作成します。

image-20220409204921861.png

IoT に関するすべての操作を許可します。実際の本番環境では必要最低限の権限を指定してください。

image-20220409205130703.png

IoT Policy が作成されました。

image-20220409205153892.png

IoT Things の登録

手動で IoT Things を登録します。本番環境では、1個1個作業するのは大変なので、フリートプロビジョニングなどの自動化を検討すると良いでしょう。

image-20220409205304770.png

Next を押します。

image-20220409205432108.png

Thing の名前を指定します。

image-20220409205658985.png

証明書の生成を選択して、Next を押します。

image-20220409205724477.png

前の手順で作成した IoT Policy を指定します

image-20220409205758195.png

マネージメントコンソール上で自動生成された証明書ファイルがダウンロードできるようになりました。以下の3種類をダウンロードします。Root CA はインターネット上からダウンロードできるので、ここでダウンロードしなくても大丈夫です。

  • Device certificate
  • Public key file
  • Private key file

image-20220409210058589.png

正常に Thing が登録されました

image-20220409210257804.png

Python プログラムを準備

Amazon Linux 2 を疑似 IoT センサーデバイスとして利用していきます。Python プログラムから IoT Core に JSON のデータを送信をしていきます。

作業用ディレクトリを用意します。場所はどこでも良いです。

mkdir ~/temp/kinesis-data-analytics
cd ~/temp/kinesis-data-analytics

IoT Core ハンズオンで公開されている Python プログラムをダウンロードします。

wget https://awsj-iot-handson.s3-ap-northeast-1.amazonaws.com/aws-iot-core-workshop/dummy_client/device_main.py -O device_main.py

Python の AWS IoT SDK をインストールします。

pip3 install --user awsiotsdk

Python プログラムを一部書き換えます。9個の部屋に滞在している人数をカウントするデータを自動生成する内容にします。

  • RoomID が 1~9 のデータを生成
  • 部屋に滞在している人数を 10~100の間のランダム値
    while True:
        now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        
        for i in range(1, 10):
            roomid = i
            people = 20 + random.randint(-10, 80)
        
            payload = {"ROOM": roomid, "EVENTTIME": now, "PEOPLE": people}
            logger.debug("  payload: %s", payload)

            mqtt_connection.publish(
                topic=topic,
                payload=json.dumps(payload),
                qos=mqtt.QoS.AT_LEAST_ONCE)

        time.sleep(1)

この Python プログラムと同じディレクトリに、証明書を格納するためのディレクトリを格納します

mkdir ~/temp/kinesis-data-analytics/certs

この certs ディレクトリに、IoT Things を登録した時にdownloadした 3 個のファイルを格納します。

> ls -la certs/
total 12
drwxr-xr-x 2 ec2-user docker  273 Apr  9 21:09 ./
drwxr-xr-x 3 ec2-user docker   41 Apr  9 21:09 ../
-rw-r--r-- 1 ec2-user docker 1224 Apr  9 21:04 b6f2bc619f9a65ef05cdca4b9c7604d50f682c5c648856613fbed49c2805ccf9-certificate.pem.crt
-rw-r--r-- 1 ec2-user docker 1675 Apr  9 21:04 b6f2bc619f9a65ef05cdca4b9c7604d50f682c5c648856613fbed49c2805ccf9-private.pem.key
-rw-r--r-- 1 ec2-user docker  451 Apr  9 21:04 b6f2bc619f9a65ef05cdca4b9c7604d50f682c5c648856613fbed49c2805ccf9-public.pem.key

また、インターネットからRoot CA ファイルをダウンロードします

wget https://www.amazontrust.com/repository/AmazonRootCA1.pem -O certs/AmazonRootCA1.pem

疑似 IoT デバイスから、IoT Core にデータを送信

ここまでで疑似 IoT デバイスの準備が出来ました。IoT Core のマネージメントコンソール画面にある Settings から Endpoint を確認します。

image-20220409211211917.png

Python プログラムを起動して、IoT Core にデータを送ってみます

cd ~/temp/kinesis-data-analytics
python3 device_main.py --device_name kinesis-test-device --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com

実行例

  • 末尾に topic: data/kinesis-test-device と表示された場合、正常にデータが送信できています
> cd ~/temp/kinesis-data-analytics
  python3 device_main.py --device_name kinesis-test-device --endpoint aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com
device_name: kinesis-test-device
endpoint: aa49ciipg36pc-ats.iot.ap-northeast-1.amazonaws.com
rootca cert: ./certs/AmazonRootCA1.pem
private key: ./certs/b6f2bc619f9a65ef05cdca4b9c7604d50f682c5c648856613fbed49c2805ccf9-private.pem.key
certificate: ./certs/b6f2bc619f9a65ef05cdca4b9c7604d50f682c5c648856613fbed49c2805ccf9-certificate.pem.crt
Check latest Shadow status
Subscribing to Shadow Delta events...
Thing has no shadow document. Creating with defaults...
un subscribe from get shadow events
Updating reported shadow to...
Update request published.
Subscribing to Shadow Update responses...
topic: data/kinesis-test-device

IoT Core のマネージメントコンソール上で、MQTT Topic の中身を確認できます。Topic 名を指定したうえで、Subscribe を押すと右下にデータが表示されます。無事にデータが表示されれば正常です。

image-20220410223625119.png

次の記事

Lambda のタンブリングウィンドウを検証するための環境の準備が出来ました。次の手順で実際に試していきましょう。

その2
https://qiita.com/sugimount-a/items/4853392769af7b8a640d

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
0
Help us understand the problem. What are the problem?