(※初稿2021/09/04、 2021/12/09更新)
2021/12/08、「BigData-JAWS 勉強会#19」にて、運用実績を加味した登壇を行いました。
https://jawsug-bigdata.connpass.com/event/226545/
DMMデータインフラ部Trackingチームに所属している津久井です。
先月の8月31日にリリースされたばかりの
Dynamic Partitioning(動的パーティション) in Kinesis Data Firehose について
さっそく試す機会があったので、
取り急ぎこの場に仕様と実際に利用した記録、注意点を記載いたします。
前提
AWS Kinesis Data FirehoseでのS3出力のパスのカスタマイズには、
・固定文字列の指定
・"Firehoseに読まれた時点での"日付による指定(UTC)
のカスタムプレフィックスのパーティショニングのみに対応していました。
<例>
s3://bucket-name/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
課題
そのため、
・パスにサフィックスの付与
・レコードの値に応じたディレクトリ分けでのデータ保存
・日付による指定(JST)
などを実現することが出来ず、その後のETLに工夫が必要でした。
(時差でデータのパーティショニングが2日にまたがる、などは地味に嫌な問題)
大規模データでは そのパーティショニングがポイントの1つとなっており、
Firehoseが要件を満たさないため、Firehoseを採用せず、
細かいパーティショニングのためだけに コンシューマーアプリケーション
(Lambda / Spark Streaming / Kinesis Data Analytics など)
で仕様を実装して実現しているケースもあります。
Dynamic Partitioningでできるようになったこと
そんな中、今回 リリースされた Dynamic Partitioning機能では、
・FirehoseにPUTされたデータ内のレコードの値に応じて、パス全体を指定する
・値は jq で加工可能
・固定文字列も指定可能
ということが可能になり、
・今までコンシューマーアプリケーションの実装で実現していた部分が不要
・かつ、完全サーバレスなFirehoseで完結する (かつkinesis streamsとの連携は設定1つでOK)
というかなり大きい恩恵(コンシューマーアプリがまるごと不要になる)を手軽に受けられるようになりました。
当チームでの導入(調査)のモチベーション
当チームでは 今後実現したいこととして
・複数のデータ構造のパターンがあり、
・データのレコード内にそれを識別するキーを持つ、
・KPLでaggregationされたデータをkinesis data streams に送信しており、
・S3へのアウトプットの際も "日付"と"データ構造の種類"ごとにパーティションを切る
という仕様のコンシューマーアプリケーションを開発するのにマッチした
AWSサービスを検討していました。
そんなタイミングでこの Dynamic Partitioning を知り、
まさに実現したい上記の仕様が完全サーバレスかつ ノーコード で実現できる、
ということで 試してみる価値がある、という結論に至り、調査を始めました。
利用〜データ生成までの流れ
今回は下記の仕様で利用してみました。
実現したいこと
input:ストリームより送られてくるデータ構造
1つのストリームに、(KPLなどで)aggregationされた複数のデータ構造のjsonフォーマットが混在して送られてくるとします。
[データ構造1]
{"time":"2021-08-10T19:29:59+09:00", "various", "data_structure_1", ...},
{"time":"2021-08-10T09:03:45+09:00", "various", "data_structure_1", ...},
[データ構造2]
{"time":"2021-08-27T23:29:59+09:00", "various", "data_structure_2", ...},
{"time":"2021-08-27T10:54:25+09:00", "various", "data_structure_2", ...},
output:出力イメージ
送られてくるデータの "time" "various" の値に応じて、パスを振り分けてレコードをS3に出力します。
[データ構造1]
s3://(バケット名)/result/data_structure_1/2021-08-10/
[データ構造2]
s3://(バケット名)/result/data_structure_2/2021-08-27/
手順
1. Firehoseの対象のdelivery streamの設定で、Dynamic partitioningを有効にする
元からあるstreamでも、後から有効にできるようです。
(2021/09/07追記)
どうやら、後からは変更できなそうです。
有効化して保存しようとした際のメッセージ
Enabling or disabling Dynamic Partitioning is not supported at the moment
(私は新規で作りました)
2. Dynamic partitioning関連のパラメータを設定
Multi record deaggregation: Enabled で、aggregationされたデータをdeaggregationする
New line delimiter: Enabled で、レコードごとに改行を加える
Inline parsing for JSON: Enabled で、jq表現による dynamic partitioning機能が使えます
3. 利用する値のキーと jq表現を追加し、パスへの置き換えを設定
・「Dynamic partitioning keys」で利用する値のキーと JQ表現を指定できるので、
追加しました。
追加したキーは、パスの指定でテンプレートとして利用可能です。
キー名:variousの値は そのまま値を採用し、
キー名:timeの値は、yyyy-mm-dd の部分だけを抽出します。
・「S3 bucket prefix」で、先程追加した jq表現のKey nameを指定し、dynamic partitionを
設定します。
result/!{partitionKeyFromQuery:various}/!{partitionKeyFromQuery:time}/
4. エラーログのパスを指定
「S3 bucket error output prefix」で、エラーログのパスを指定します。
エラーログには エラーとなったレコードそのものがbase64で保存されるようなので、エラーだらけだと
あっという間にS3を圧迫しそうなのが注意です。
(下記SSの「rawData」)
※ちなみに prefixの最後に / を付けないと、下記のように ディレクトリ名が結合されて出力されてしまいます。
(つまり、上の画像のパスはBadです)
保存して完了
その他 Retry duration など諸々設定し、「Save changes」で設定が反映されます。
出力結果
上記の設定の上、前述「input:ストリームより送られてくるデータ構造」を送信してみた結果、
「output:出力イメージ」の期待されるパスに、データが振り分けられて保存されていました。
・s3://(バケット名)/result/data_structure_1/2021-08-10/
・s3://(バケット名)/result/data_structure_2/2021-08-27/
後はGlue crawlerにてパーティションを付与すれば、Athena等で利用が可能になります。
ここまでで5分とかからず(うちjq表現の検討が3分くらい)実現ができてしまい、
あまりにラクすぎて頭がハゲそうです。
# Appendix
注意点
jqによるpartitioningのバックエンドにLambdaが使われている?
Firehoseのメトリクスを見ると「Records successfully processed by Lambda function」「Bytes successfully processed by Lambda function」が 追加されていました。
このデリバリーストリームは 従来からあった「Lambdaをデータ変換する」処理を使っていないので、
このメトリクスは おそらくDynamic partitioningのものと思われます。
(Lambda内でjqを動かして実現している?)
このLambdaについての資料が見つからなかったのですが、Service Quotaの設定によっては
同時実行数などの制約にしれっと引っかかりそうな気がします。
jq表現が切れる
下記のように先頭10文字を抽出するjq表現にて
.time[:10]
コロン(:)を入れると 保存後のコンソールの表示上からはコロン以降の表現が消えてしまうようなのですが、
設定としては正しくされている(期待されたpartitioningで書き出されている)ようです。
ただ、設定画面を再度開くとコロン以降が消えた状態で表示されているので、
このまま保存すると消えた状態で保存されてしまうかもしれないので、
管理上の思わぬ事故を防ぐために 下記の 正規表現での抽出に変更しました。
.time | match("[0-9]{4}-[0-9]{2}-[0-9]{2}").string
まとめ
・地味ながらも今まで悩みの種であったDynamic Partitioningが完全に手離れした状態となることで、
また1つETLの悩みが減るのではないでしょうか。
・次回は(需要があれば)引き続きこのDynamic Partitioningにおいて、
数万RPSで稼働している 当チームのトラッキングプロダクトのストリームにて、
本番稼働に耐え得るかの検証結果を記載していく予定です。
(Firehose自体の実績は言わずもがななので、不安はあまりありませんが..)
DMM データインフラ部では、AWSでのビッグデータ基盤関連の運用を行っており、
AWS新規サービスなどもいち早く検証・採用するプロダクト開発を行っています。
中途採用などもおこなっておりますので、興味のある方は、一度弊社HPなどから
カジュアル面談など、是非ご応募ください!