Edited at

AWS IoT Core 即席ハンズオン (2)


AWS IoT Core 即席ハンズオン

AWS IoT Core 即席ハンズオン (1)


このハンズオンで学べること


  • MQTTクライアント - AWS IoT Core - Kinesis Data Streamsの連携

  • Kinesis Data Streams上に保存されたデータをLambdaから取得する

  • Kinesis Data Streams上に保存されたデータをKinesis Client Library for Python で取得し、DynamoDBに保存する


このハンズオンで発生する料金について


  • Cloud9とDynamoDBを2テーブル利用します。どちらも無料枠(が残っている場合)の範囲内です

  • Kinesis Data Streamsを1シャード作成します。無料枠はないので、料金がほんの少しかかります。


(1) Kinesis Data Streamsへデータを保存します


Kinesis Data Streamsを作成します

sh 04_create-kinesis-data-stream.sh

(なんらか出力されます)


作成したKinesis Data Streamsを確認します

こんな感じでiot-demo-streamが作成されていることを確認します

作成されていない場合は、東京リージョンであることとIAMの権限を確認してください

image.png


(2) Kinesis Data StreamsとLambdaの連携


Lambda用のIAMロールの作成

IAMのロールからロールの作成を選択します

image.png

Lambdaを選択します

image.png

AdministratorAccessにチェックを入れて、次のステップを選択します。次のタグの追加は何もせず次のステップを選択します

ハンズオンのため管理者権限を割り当てています

image.png

ロール名にLambdaDemoRoleと入れて、ロールの作成でロールを作ります

image.png


Lambdaの作成

マネジメントコンソールのLambdaから、「関数の作成」を選択し、「設計図」からkinesis-process-record-python

を選択します

image.png

名前をLambdaKinesisDemo

ロールを先程作成したLambdaDemoRole

Kinesis ストリームをiot-demo-stream

開始位置を水平トリム

トリガーの有効化にチェックを入れて

関数を作成します

image.png

ちなみに以下のようなコードになっています

from __future__ import print_function

import base64
import json

print('Loading function')

def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
for record in event['Records']:
# Kinesis data is base64 encoded so decode here
payload = base64.b64decode(record['kinesis']['data'])
print("Decoded payload: " + payload)
return 'Successfully processed {} records.'.format(len(event['Records']))


CloudWatch Logsの確認

print("Decoded payload: " + payload)の部分で、Kinesis Data Streamから取り出したデータ(payload)を標準出力に出していることが分かります。

Lambdaの標準出力はCloudWatch Logsに保存されているので、実際に確認してみましょう。

以下のLambdaの画面で「モニタリング」をタブを選択します。

image.png

CloudWatchのログを表示という箇所をクリックすると、CloudWatchの画面に飛びます。

image.png

ログストリームがいくつか(もしくは1つ)あるので、それをクリックします

image.png

こんな感じでKinesis Data Streamsに保存されたデータをLambdaが取り出したことを確認できます。

image.png

このハンズオンではLambdaでデータを取り出し、標準出力へ出しただけですが、Lambdaの処理を書いてさえしまえば、

(A) DynamoDBに保存する

(B) MySQLなどのRDBにinsertする(採用する場合は事前検証を入念に)

など柔軟な処理を行うことが可能です。


(3) Kinesis Data Streams と Kinesis Client Library for Python の連携

Kinesis Data Streamsからのデータの取り出し方は、先程のLambdaを使う方法とKinesis Client Library(KCL)を使う方法の2通りがメジャーです。

KCLはJavaを利用することが多いですが、個人的によく使うPythonを使ってみました。

このハンズオンでは、KCLがKinesis Data Streamsからデータを取り出して、DynamoDBに保存するところまで紹介します。


データ保存用DynamoDBの作成

sh 05_create-ddb-kcl-table.sh

##「iot-demo-kcl」というDynamoDBのテーブルが東京リージョンで作成されます


KCLの実行

sh 06_run_kcl.sh

(以下のようなメッセージが出力されます)
Step 1/7 : FROM amazonlinux:2
---> d656eea421ba
Step 2/7 : RUN yum -y install git java-1.8.0-openjdk
---> Using cache
---> 173841e0f1e6
Step 3/7 : ADD amazon-kinesis-client-python/ /amazon-kinesis-client-python
---> Using cache
---> af06385789b3
Step 4/7 : RUN cd /amazon-kinesis-client-python/ && python -m ensurepip && python setup.py install && pip install boto3
---> Using cache
---> dbbc480b5588
Step 5/7 : COPY sample.properties /amazon-kinesis-client-python/samples/sample.properties
---> Using cache
---> c6a447964cc8
Step 6/7 : WORKDIR /amazon-kinesis-client-python/samples
---> Using cache
---> c14f28b534cf
Step 7/7 : CMD `python amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties sample.properties`
---> Using cache
---> 8efb95db66ad
Successfully built 8efb95db66ad
Successfully tagged aws-iot-demo-kcl:latest
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 06a3b98d-122b-4bf2-afb9-993165e0ff77
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property regionName with value ap-northeast-1
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running PythonKCLIoT to process stream iot-demo-stream with executable sample_kclpy_app.py
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: 06a3b98d-122b-4bf2-afb9-993165e0ff77
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: ASIA24VN5XYJ2SC732V3
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/2.7 sample_kclpy_app.py
Nov 26, 2018 2:56:36 PM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Nov 26, 2018 2:56:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Nov 26, 2018 2:56:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
Nov 26, 2018 2:56:37 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Nov 26, 2018 2:56:38 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize

内部的にはKinesisからデータを取り出して、先程作成したDynamoDBへデータを保存しています。

それでは見てみましょう。

image.png

確かに保存されていますね。

ソースコードも公開しているので、DynamoDBに保存している部分を例えばRDBに保存するなどに変えていただければ

Lambdaの時と同様に柔軟にデータ処理ができますね。


後片付け

Cloud9とDynamoDBは無料枠の範囲内であればそのままでもいいかと思います。

ただ、Kinesis Data Streamsには無料枠がないので、ハンズオンが終わったら消してしまいましょう。