やりたいこと
- Kinesis Data Firehose(以下Firehose)の新機能、動的パーティショニングを試したい。
- ついでに、サブ機能である改行追加機能(new line delimiter)も試しておきたい。
環境
- データにはRoute 53 Resolverログを使う。
- Resolverログはスキーマがシンプルで、かつFirehoseに直接投入できるのでテストに好適。
- ほんとはCloudWatch Logsをデータソースとしてあれこれ試したいが、Logsを介したログはメタデータで覆われてる上にBASE64エンコード・GZ圧縮されてるので、前段に結局Lambdaが必要になる模様。今回はスルー。
- 検証結果比較のため、Firehoseデリバリーストリームは二つ作成する。
-
kfh-resolverlog-plain
... 素のデリバリーストリーム -
kfh-resolverlog-dynamic
... 動的パーティショニングを適用したデリバリーストリーム
-
- S3バケットは単一(
kfh-dynamic-test
)とし、デリバリーストリームごとにプレフィックスで分岐する。s3://kfh-dynamic-test/plain/
s3://kfh-dynamic-test/dynamic/
検証開始
1) 何はともあれS3
-
kfh-dynamic-test
を作成する。
% aws s3 mb s3://kfh-dynamic-test --region ap-northeast-1
make_bucket: kfh-dynamic-test
2) 素のFirehoseデリバリーストリームを作成
- Resolverログを受け取るための最初のデリバリーストリーム、
kfh-resolver-plain
を作成する。
ちょっと見ないうちに何だか随分UI変わったなあ。。
それはさておき、ソースをDirect PUT(Resolverログを直接受けるので、Kinesis Data Streamsは介在しない)、ターゲットをS3とする以外は、特に変更せず構成する。今回は検索まではやらないが、一応S3プレフィックスはGlueパーティションを使えるようHive形式に合わせておく。
なお、二つ目のデリバリーストリームとパスを分けるため、plain/という文字列を先頭に付けている。
3) Resolverログ①を構成
- 無変換のままFirehose→S3に出力するためのResolverログを構成する。
こちらも似たようなUIから、シンプルにFiehoseデリバリーストリームと対象VPCを選択して完了。
4) 動作確認
ただし、ファイルをダウンロードしてみると、確かにResolverログはJSONフォーマットで記録されているものの、全レコードが1行に結合されている。よく見かける、Firehoseの吐くログのパターンだ(Firehoseは改行を付与してくれない)。
これではAthenaで検索できそうもない。Firehose、まだこの仕様直ってないのか。。。
と思ったら、(検索までやらないとは言いつつ、やっぱり気になるので)Glue Crawlerで走査し、Athenaで検索してみると、意外に検索できた。AthenaのJSONL縛りはドキュメント上は変わっていないように見えるので、なぜ通るのか、正確なところは不明。
ちなみに、これから試す動的パーティショニングには今回、new line delimiterという心強そうな機能が追加されている。AthenaがJSONL以外も検索できるようになったという公式情報は見当たらない(できないとしている公開情報はこちらを始め複数ある)ので、予定通りこちらも試してみることにする。
5) 動的パーティショニング付きのFirehoseデリバリーストリームを作成
- Resolverログを受け取るための二つ目のデリバリーストリーム、
kfh-resolver-dynamic
を作成する。 - 今度は動的パーティショニングを有効にし、以下を設定する。
- new line delimiterを有効化。
- inline parsing for JSONを有効化し、以下の2フィールドをパーティションキーとして抽出する。
キー | JQ Expression |
---|---|
region | .region |
vpc_id | .vpc_id |
S3プレフィックスは以下のように、dynamic/を頭に付けた上で、パーティションキーを反映した構成にする。ちなみに、Apply dynamic partitioning keys
ボタンを押すと所定の書式でサクッとプレフィックスを設定してくれるが、Hive形式にはしてくれないので注意。
6) Resolverログ②を構成
- 上記のFirehoseを経由してS3に出力するための、二つ目のResolverログを構成する。Firehoseデリバリーストリームには
kfh-resolver-dynamic
を指定する。 - あと少し。...と思ったら、どうやら同じVPCを同じターゲットタイプ(ここではFirehose)に重複して送れないらしい。仕方ないので、別のVPCを選んで再作成する。
VPC のクエリログ記録を有効にできませんでした。
ResourceId: vpc-aXXXXXXX。InvalidRequestException: [RSLVR-01306] The resource is already associated with a query logging configuration that is sending query logs to the specified destination type. Trace Id: "1-614a0faf-5cb479bf683df6c352b488f2"。
7) 動作確認
- これで、S3バケットのdynamic/配下に、指定したパーティションキーをS3プレフィックスとしたResolverログ群が出来上がり、かつログエントリごとに改行が挟まれているはず。
- 再び待つこと5分。無事、dynamic/リージョン/VPCIDでS3上にログが生成された。
実用上は、これにタイムスタンプ(Firehoseへの出力時間ではなく、ログの発生時間)を組み合わせてもう少し深いパーティションにすることになると思うが、検証としては十分。
(2021/10/22追記)横着せず、確認してみた。以下のプレフィックス設定にすれば、ちゃんと /region=AAAA/vpc_id=BBBB/year=yyyy/month=MM/day=dd/hour=HH/ の形式で出力してくれました。
dynamic/region=!{partitionKeyFromQuery:region}/vpc_id=!{partitionKeyFromQuery:vpc_id}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
気になる改行コードはというと、ちゃんと各レコードの末尾に付与されていた。有償機能使わずとも、本来、こうあるべきだよな。。
最後に、Glue Crawlerで上記パーティションを読み取った上で、Athenaクエリーを実行してみる。
一点注意が必要だったのは、Hiveの制約で、列名とパーティション名が重複するとエラーになる点。Glueのスキーマ編集機能を使って、パーティション名をpk_regionやpk_vpc_idと変更すると、クエリーが通るようになった。
まとめ
- Firehoseの動的パーティショニングは、ログの構造さえ把握していれば設定も簡単で、改行の追加を含め、使い勝手のよい機能だと思う。ただし個人的には、任意のキーでのパーティションや改行追加の機能は、標準機能として(追加コストなしで)実装して欲しかった。
- ログがJSON形式でない場合、圧縮されている場合、エンコードされている場合などは、前段で変換Lambdaをかませることで対応可能のようだが、ログフォーマットを選べるなら素直にJSONで送るのが吉。
- ちょっと目を離した隙にAthenaも進化を遂げていた。