前回の記事でKinesis Data Firehoseの作り方を書いた。今度は引き続き、Kinesis Data Firehoseでの動的パーティショニングを利用するやり方を備忘として書く。
Kinesis Data Streams から Kinesis Data Firehose の Dynamic Partitioning を使って S3 に格納してみた
https://qiita.com/sugimount-a/items/edd136364a8e0d6cb725
AWSのエンジニアさんが書いた記事のとおり。既定だとS3などの保存先には勝手に日時のフォルダが切られる。例えば2023/08/31/11/23/filename
といった具合に、S3にディレクトリとサブディレクトリが切られて、そこにストリームで流れたデータが格納される。
このとき、流すストリームのデータを特定のディレクトリに格納するとなると、ストリームを複数作ることになり煩雑。また、その都度ストレージ側(S3)でディレクトリを手動などで切る処理が必要があり、手間がかかる。
そういうケースで、このDynamicPartitioningを活用すると便利。
-
DynamicPartitioningの処理
- KinesisFirehoseが流れるストリームの中身を読み取る
- 読み取ったヘッダの中身を抽出
- 抽出内容の値でS3にディレクトリを動的に作成(動的パーティショニング)
-
注意
一度作った配信ストリームでDynamicPartitioningを無効にしていると、有効にできない(そういう動きだったはず、今はバージョンアップして変わっているかも?)。そのため、使う場合は配信ストリームを再度作り直す。
KinesisDataFirehoseで配信ストリームを新規作成する。
その際、S3バケット選択の箇所に動的パーティショニングの設定箇所がある。
いくつかのオプション設定があるが、ここではJSON のインライン解析
を有効にする。
有効にすると、動的パーティショニングキーの設定画面が出てくる。JQでパースするための式とキーの名前を書く。
初見でなんのことやらという感じで戸惑うが、ここはストリームするデータの中身についてを記述する。例えばストリームに流すデータに「Date」というヘッダがあり、そこに「YYYY-MM-DD」形式で値が入っているとする。S3バケットのディレクトリは「Date」ヘッダの「YYYY-MM-DD」部分を読み取って、その値を元に動的にディレクトリを作成してもらいたい。なおかつ、そこにマッチするデータだけを、S3の「YYYY-MM-DD」サブディレクトリに流したい。
その場合、このように記述する。
続いて、S3バケットプレフィックスを指定する。以下ではS3バケット直下に「testbucketdir
」というディレクトリがあり、その配下に動的パーティショニングを作る構成としている。
/!{partitionKeyFromQuery:Date}/
のDateの部分を自身の検索キーに変える。
エラー発生時のログのS3保存先も指定。以下ではS3バケット直下に「testbucketdir
」の配下「/errdir/
」に保管するように指定している。
その他は暗号化やバッファサイズなどをよしなに設定して、ストリームを作成する。
上記で、S3バケット直下「testbucketdir
」の配下に、「YYYY-MM-DD」形式のディレクトリが自動で作られて、そこにKinesisFirehoseで流れたストリームデータが格納されるようになる。
まさに名前の通り、動的にディレクトリをパーティションで分割して格納されるという算段。
とても便利な機能だった。
参考
Amazon Kinesis Data Firehoseの動的パーティショニング(jqパーサー)を試してみた
https://dev.classmethod.jp/articles/dynamic-partitioning-kinesis-firehose/
Amazon Kinesis Data Firehose の動的パーティショニングを利用してみた記録と注意点
https://qiita.com/kkkdev/items/e4c931bc54dcf3442a0b