はじめに
AWS IoT で IoT基盤を構築 ~お試し編~ではIoT Coreから直接Dynamo DBにデータを流していたが、それだとアクセスが多い場合データがドロップするので、Kinesisを使用してバッファを持たせる。
参考
- 全体的な手順 https://www.slideshare.net/dcubeio/kinesis-firehose-80037243
- S3prefixの設定 https://dev.classmethod.jp/cloud/firehose-custom-s3-prefixes/
- データの配信頻度 https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/basic-deliver.html
- ハンズオン道場 https://awsiot-handson-dojo-basic-ble.readthedocs.io/en/latest/05.html
手順
AWS IoT で IoT基盤を構築 ~お試し編~の続きになっています。
ルールの作成
まず、コンソールにログインし、IoT Coreを開きます。画面左部のメニューから「ACT」をクリックします。
「ルールの作成」をクリックします。ルールに名前を付けます。今回はkinesis_streams
としました。
「ルールクエリステートメント」欄に、データに対する加工操作を記述します。「SQL バージョンの使用」は2016-03-23
にします。「ルールクエリステートメント」には下記のステートメントを記述します。
SELECT clientid() AS client_id, timestamp() AS timestamp, * FROM 'sdk/test/Python'
「アクションの追加」をクリックします。「Amazon Kinesis Firehose ストリームにメッセージを送信する」をクリックし、「アクションの設定」をクリックします。「アクションの設定」画面で「新しいリソースを作成する」をクリックすると、「Amazon Kinesis」画面に遷移します。画面左部の「Data Firehose」が選択されていることを確認し、「Create Delivery Stream」をクリックします。「Delivery stream name」にstream01
と付けました。「Choose source」では「Direct PUT or other sources」ラジオボタンをONにしました。「Next」をクリックします。
「Transform source records with AWS Lambda」ではKinesis上でLambdaを呼び出しデータを加工するか設定できます。今回は変換しないので「Disabled」ラジオボタンをONにします。「Convert record format」では、データをparquet形式に圧縮するか設定できます。圧縮すると行き先はS3のみになります。今回は変換しないので「Disabled」ラジオボタンをONにします。「Next」をクリックします。
「Select destination」では「Amazon S3」ラジオボタンをONにします。「S3 destination」ではS3 bucketを指定するのですが、新しく作成する場合は「Create new」をクリックします。今回は新しく作成することとし、「S3 bucket name」をdata-d9wj2qk3
としました。「Region」は「Asia Pacifi(Tokyo)」としました。その他はデフォルトとし、「Next」をクリックします。
「S3 buffer conditions」ではバッファの設定をします。S3へ配信する場合は、これらの条件のどちらかを満たしたタイミングで配信されます。「Buffer size」を5
とし、「Buffer interval」を60
としました。「S3 compression and encryption」ではデータを圧縮するか、暗号化するか選択しますが、今回はデフォルトのままとします。「Error logging」と「Tags(optional)」もデフォルトのままとします。「IAM role」では「Create new or choose」をクリックします。今まで一度も作成していなければ、「Amazon Kinesis Firehose is requesting permission to use resources in your account」画面に遷移し、デフォルトの値が入力された状態になります。そのまま「許可」をクリックします。Kinesisの画面に戻るので、「Next」をクリックします。
今まで設定した内容の確認画面が表示されるので、確認できたら「Create delivery stream」をクリックします。
AWS IoT の「アクションの設定」画面に戻るので、ストリーム名に先ほど作成したストリームを指定します。「Separator」には「\n(改行)」を指定します。「このアクションを実行するための AWS IoT アクセス権限を付与するロールを選択または作成します。」で「ロールの作成」をクリックし、「新しいロールの作成」画面でロール名を入力します。今回はiot_kinesis_firehose
としました。「アクションの追加」をクリックします。
「ルールの作成」をクリックします。
以上でKinesisを経由してS3にデータが送信されるようになりました。データはバッファにある程度溜まったあと、1つのファイルにまとめられてS3に格納されます。Dynamo DBに直接PUTしていたときは1リクエスト1ファイル生成していたので、効率的に処理され負荷が小さくなっています。Kinesisがバッファの役割になるのでIoT Coreの負荷が低減さて、QoSが0でも欠落しにくくなると考えられます。