3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kinesis Data Streams から Kinesis Data Firehose の Dynamic Partitioning を使って S3 に格納してみた

Last updated at Posted at 2022-09-20

はじめに

AWS でストリームデータを活用する一つの方法として、Kinesis Data Streams → Kinesis Data Firehose → S3 → Athena → QuickSight という組み合わせがあります。受け取ったストリームデータを S3 に格納し、Athena と QuickSight で可視化をする構成です。Athena では S3 上に格納された、JSON, CSV, Parquet といったデータをスキャンして集計のクエリーを発行できます。Athena の料金の考え方のひとつとして、スキャン料金があります。S3 上に格納されたデータをスキャンした容量によって料金が発生します。たとえば、毎日の単位でディレクトリを分けることで、特定の期間に限定したスキャンが出来るパーティションと呼ばれる機能があります。

利用のイメージはこんなかんじです。

image-20220920224448478.png

大事なポイントは、S3 にデータを格納するときに、パーティションを意識してディレクトリを区切ったうえで、データを格納しないといけない点があります。これを自分たちでプログラムを作成していくのは面倒なので、Kinesis Data Firehose に自動的にディレクトリを切ってくれる機能があります。

ただ、何も意識しないで利用すると、 Kinesis Data Firehose が受信した UTC 時刻を使って ディレクトリが自動的に作成されます。UTC 時刻なので、日本時間と時間差が生じることになります。具体例を見てみましょう。

image-20220920230258300.png

JSON データを Kinesis Data Streams, Kinesis Data Firehose を経由して S3 に格納している図を表現しています。日付に関して何も意識せず利用すると、UTC 時刻を使います。そのため、日本時刻の 2022-09-02 08:00 に受信したデータは、UTC 的に 2022-09-01 のディレクトリに格納されます。この問題点は、Athena をつかって SQL クエリーを実行しても、正確な時刻でデータを取得できない問題点があります。

これを解決するために、Kinesis Data Firehose の Dynamic Partitioning という機能を利用できます。

image-20220920230628246.png

Dynamic Partitioning を使うことで、受け取った JSON データの中身に記録されている日付を確認し、その日付に合わせたディレクトリに格納することができます。これによって、Athena の SQL クエリーの時刻指定も、より厳密な期間を指定しやすくなります。

なお、Dynamic Partitioning の機能は、通常の Kinesis Data Firehose の料金に加えて、追加料金が発生します。具体的なものは料金ページをご覧ください。

それでは、Dynamic Partitioning の設定方法を確かめていきましょう!

Kinesis Data Firehose を作成

Kinesis の画面に移動して、delivery stream を作成していきます。メニューに「Firehose」というキーワードは出てこないです。Kinesis Data Firehose のサービスで提供されている delivery stream を作っていく考え方ですね。

image-20220918113329754.png

データの入力元 Source の指定、出力先 Destination の指定を行います。今回は、入力元を Kinesis Data Streams にして、出力先を S3 にしていきます。

image-20220918131201074.png

データの変更や加工はしません

image-20220918131454829.png

以下のパラメータを入れます。

  • 出力先に S3 Bucket を指定
  • Dynamic partitioning を有効
  • New line delimiter を有効化
    • Kinesis Data Firehose では、S3 にデータを保存するときに、複数のデータをまとめて出力先に保存します。まとめる単位は、「一定時間ごと」もしくは「一定容量ごと」にまとめられます。New Line delimiter を有効にすると、複数のレコードの間に自動的に改行を入れてくれるため、基本的には有効化が良いと思います。(この機能を無効化すると、1行に複数のデータが入れられるので逆にやりにくい)

image-20220918140449168.png

New line delimiter について、具体的な例を上げます。無効化のときは、このように2つのレコードがまとまって1行となっています。

{"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com","contentsName":"すごいコンテンツ1"}{"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com2","contentsName":"すごいコンテンツ2"}

New line delimiter を 有効化したときには、複数のレコード間で自動的に改行されます。Athena などから利用する際には別々のレコードは改行されていることが期待されているので、New line delimiter は有効化の方が便利だと思います。

{"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com","contentsName":"すごいコンテンツ1"}
{"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com2","contentsName":"すごいコンテンツ2"}

次に、Dynamic Partitioning の動作を指定していくのですが、データのフォーマットを意識する必要があります。この記事の環境では、この Kinesis Data Firehose に流すデータのフォーマットは、次の形式になっています。

{"type":"clickEvent","date":"2022-09-17 20:43:33","url":"http://test.com","contentsName":"すごいコンテンツ"}

このフォーマットを基に、マネージメントコンソールにパーサーのための文字列を入力する必要があります。例えばこんなかんじです。

image-20220918133855448.png

Dynamic Partitioning の裏側では、Linux などで利用できる jq コマンドが使われています。jq コマンドは、JSON メッセージから抽出などが行える便利なパーサーコマンドです。上記の画像の JQ expression は、jq コマンドとして扱うための正規表現です。

この正規表現を、マネージメントコンソール上で動作確認するのは大変なので、ローカルの環境でまずは動作確認を行います。なお、jq コマンドで利用できる function などはこちらに記載されています

上記の Document を見ながら、このように jq コマンドとしてパーサーできることがわかります。

echo -n '{"type":"clickEvent","date":"2022-09-17 20:43:33","url":"http://test.com","contentsName":"すごいコンテンツ"}' | jq '.date | strptime("%Y-%m-%d %H:%M:%S")'

ポイントを整理すると、Kinesis Data Firehose に流れくる時刻のデータは "date":"2022-09-17 20:43:33" です。これを jq 上で Date 型に変換するために .date | strptime("%Y-%m-%d %H:%M:%S") を使っています。

実行例

> echo -n '{"type":"clickEvent","date":"2022-09-17 20:43:33","url":"http://test.com","contentsName":"すごいコンテ
ンツ"}' | jq '.date | strptime("%Y-%m-%d %H:%M:%S")'
[
  2022,
  8,
  17,
  20,
  43,
  33,
  6,
  259
]

この変換を踏まえて、JQ Expression を入力していきます。

  • year という変数に、年数を入れる
  • month という変数に、月数を入れる
  • day という変数に、日数を入れる
.date | strptime("%Y-%m-%d %H:%M:%S") | strftime("%Y")
.date | strptime("%Y-%m-%d %H:%M:%S") | strftime("%m")
.date | strptime("%Y-%m-%d %H:%M:%S") | strftime("%d")

image-20220918133855449.png

そして、実際に S3 Bucket に出力するディレクトリ構成を指定します。上記で指定した yearmonthday の変数を使って、S3 Prefix を指定します。partition_date=年-月-日/ の形式で、S3 に保存する形式です。これは、Athena の Partition Projection の形式を意識しています。

partition_date=!{partitionKeyFromQuery:year}-!{partitionKeyFromQuery:month}-!{partitionKeyFromQuery:day}/

image-20220918174834125.png

バッファーなどはデフォルトのままです。

image-20220918134344113.png

Create を押します。

image-20220918134354701.png

作成中となります。

image-20220918134449490.png

一定時間後、作成が完了されます。

image-20220918134519920.png

動作確認

データを流して、実際に S3 Bucket に正しいディレクトリ構成で出力されるか確認をしていきます。データの流し方の詳細は省略します。(JavaScript のプログラムから流します)

node clickcount.js

一定時間後、S3 にディレクトリが切られデータが格納されています!素晴らしい!

image-20220920233514441.png

このディレクトリの中身は、複数の JSON ファイルが格納されています。

image-20220920233536633.png

検証を通じてわかったこと

  • Kinesis Data Firehose には、**「Firehose がデータを受け取った UTC 時刻」**を基にプレフィックスを付与してくれる機能がある。

  • 上記の懸念を解決するために Dynamic Partitioning を活用可能

  • Kinesis Data Firehose で Dynamic Partitioning を有効化し、その中にある New line delimiter の機能を活用することで、複数のレコードを改行することが出来る。

    • New line delimiter について、具体的な例を上げます。無効化のときは、このように2つのレコードがまとまって1行となっています。

      {"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com2","contentsName":"すごいコンテンツ2"}{"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com","contentsName":"すごいコンテンツ"}
      

      有効化のときは改行されます。Athena などから利用する際には別々のレコードは改行されていることが期待されているので、New line delimiter は有効化の方が便利だと思います。

      {"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com2","contentsName":"すごいコンテンツ2"}
      {"type":"clickEvent","date":"2022-09-18 13:47:34","url":"http://test.com","contentsName":"すごいコンテンツ"}
      
  • Dynamic Partitioning の裏側で、jq コマンドが使われている。バージョンは 1.6

    • マネージメントコンソールにバージョンも書かれている
      image-20220920232615774.png
  • Dynamic Partitioning は、後から有効化することができない。Delivery Stream を作成したときに指定する必要がある。

  • Dynamic Partitioning は追加の料金が発生する

参考URL

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?