AWS IoT Core 即席ハンズオン
このハンズオンで学べること
- MQTTクライアント - AWS IoT Core - Kinesis Data Streamsの連携
- Kinesis Data Streams上に保存されたデータをLambdaから取得する
- Kinesis Data Streams上に保存されたデータをKinesis Client Library for Python で取得し、DynamoDBに保存する
このハンズオンで発生する料金について
- Cloud9とDynamoDBを2テーブル利用します。どちらも無料枠(が残っている場合)の範囲内です
- Kinesis Data Streamsを1シャード作成します。無料枠はないので、料金がほんの少しかかります。
(1) Kinesis Data Streamsへデータを保存します
Kinesis Data Streamsを作成します
sh 04_create-kinesis-data-stream.sh
(なんらか出力されます)
作成したKinesis Data Streamsを確認します
こんな感じでiot-demo-stream
が作成されていることを確認します
作成されていない場合は、東京リージョンであることとIAMの権限を確認してください
(2) Kinesis Data StreamsとLambdaの連携
Lambda用のIAMロールの作成
AdministratorAccess
にチェックを入れて、次のステップを選択します。次のタグの追加は何もせず次のステップを選択します
ハンズオンのため管理者権限を割り当てています
ロール名にLambdaDemoRole
と入れて、ロールの作成
でロールを作ります
Lambdaの作成
マネジメントコンソールのLambdaから、「関数の作成」を選択し、「設計図」からkinesis-process-record-python
を選択します
名前をLambdaKinesisDemo
ロールを先程作成したLambdaDemoRole
Kinesis ストリームをiot-demo-stream
開始位置を水平トリム
トリガーの有効化にチェック
を入れて
関数を作成します
ちなみに以下のようなコードになっています
from __future__ import print_function
import base64
import json
print('Loading function')
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
for record in event['Records']:
# Kinesis data is base64 encoded so decode here
payload = base64.b64decode(record['kinesis']['data'])
print("Decoded payload: " + payload)
return 'Successfully processed {} records.'.format(len(event['Records']))
CloudWatch Logsの確認
print("Decoded payload: " + payload)
の部分で、Kinesis Data Streamから取り出したデータ(payload)を標準出力に出していることが分かります。
Lambdaの標準出力はCloudWatch Logsに保存されているので、実際に確認してみましょう。
以下のLambdaの画面で「モニタリング」をタブを選択します。
CloudWatchのログを表示
という箇所をクリックすると、CloudWatchの画面に飛びます。
ログストリームがいくつか(もしくは1つ)あるので、それをクリックします
こんな感じでKinesis Data Streamsに保存されたデータをLambdaが取り出したことを確認できます。
このハンズオンではLambdaでデータを取り出し、標準出力へ出しただけですが、Lambdaの処理を書いてさえしまえば、
(A) DynamoDBに保存する
(B) MySQLなどのRDBにinsertする(採用する場合は事前検証を入念に)
など柔軟な処理を行うことが可能です。
(3) Kinesis Data Streams と Kinesis Client Library for Python の連携
Kinesis Data Streamsからのデータの取り出し方は、先程のLambdaを使う方法とKinesis Client Library(KCL)を使う方法の2通りがメジャーです。
KCLはJavaを利用することが多いですが、個人的によく使うPythonを使ってみました。
このハンズオンでは、KCLがKinesis Data Streamsからデータを取り出して、DynamoDBに保存するところまで紹介します。
データ保存用DynamoDBの作成
sh 05_create-ddb-kcl-table.sh
##「iot-demo-kcl」というDynamoDBのテーブルが東京リージョンで作成されます
KCLの実行
sh 06_run_kcl.sh
(以下のようなメッセージが出力されます)
Step 1/7 : FROM amazonlinux:2
---> d656eea421ba
Step 2/7 : RUN yum -y install git java-1.8.0-openjdk
---> Using cache
---> 173841e0f1e6
Step 3/7 : ADD amazon-kinesis-client-python/ /amazon-kinesis-client-python
---> Using cache
---> af06385789b3
Step 4/7 : RUN cd /amazon-kinesis-client-python/ && python -m ensurepip && python setup.py install && pip install boto3
---> Using cache
---> dbbc480b5588
Step 5/7 : COPY sample.properties /amazon-kinesis-client-python/samples/sample.properties
---> Using cache
---> c6a447964cc8
Step 6/7 : WORKDIR /amazon-kinesis-client-python/samples
---> Using cache
---> c14f28b534cf
Step 7/7 : CMD `python amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties sample.properties`
---> Using cache
---> 8efb95db66ad
Successfully built 8efb95db66ad
Successfully tagged aws-iot-demo-kcl:latest
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator getConfiguration
INFO: Value of workerId is not provided in the properties. WorkerId is automatically assigned as: 06a3b98d-122b-4bf2-afb9-993165e0ff77
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property initialPositionInStream with value TRIM_HORIZON
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator withProperty
INFO: Successfully set property regionName with value ap-northeast-1
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig buildExecutorService
INFO: Using a cached thread pool.
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig <init>
INFO: Running PythonKCLIoT to process stream iot-demo-stream with executable sample_kclpy_app.py
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using workerId: 06a3b98d-122b-4bf2-afb9-993165e0ff77
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: Using credentials with access key id: ASIA24VN5XYJ2SC732V3
Nov 26, 2018 2:56:35 PM com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig prepare
INFO: MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java-1.9.0 amazon-kinesis-multi-lang-daemon/1.0.1 python/2.7 sample_kclpy_app.py
Nov 26, 2018 2:56:36 PM com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator <init>
INFO: With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
Nov 26, 2018 2:56:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initialization attempt 1
Nov 26, 2018 2:56:36 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Initializing LeaseCoordinator
Nov 26, 2018 2:56:37 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
INFO: Syncing Kinesis shard info
Nov 26, 2018 2:56:38 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker initialize
内部的にはKinesisからデータを取り出して、先程作成したDynamoDBへデータを保存しています。
それでは見てみましょう。
確かに保存されていますね。
ソースコードも公開しているので、DynamoDBに保存している部分を例えばRDBに保存するなどに変えていただければ
Lambdaの時と同様に柔軟にデータ処理ができますね。
後片付け
Cloud9とDynamoDBは無料枠の範囲内であればそのままでもいいかと思います。
ただ、Kinesis Data Streamsには無料枠がないので、ハンズオンが終わったら消してしまいましょう。