Kinesis3兄弟
言わずも知れた3兄弟のうち、Firehose
を触ってみました。(自分用のメモです)
Firehoseって?
ストリーミングデータを確実にキャプチャおよび変換し、データレイク、データストア、および分析サービスに配信する、抽出、変換、ロード (ETL) サービス
とありますが、とりあえずEverything (It's you)=>parquet変換を試してみました。
(JSON=>parquet変換ですね)
data = {
"id": 1,
"name": "Everything(It's you)"
}
Firehose配信ストリーム作成
Source
Kinesis Data Streamsから取り込むか、直接Firehoseへ書き込むか。
今回はDirectPUTを試行。
Destination
出力先を色々選べますが、今回はS3を選択。
ストリーム名を設定
データ変換の指定
Record format converion
をEnabled
に。
Output format
の設定が表示されるので下図のようにApache Parquet
を指定。
データ変換プロキシの役割するAWS Glue databaseを指定
出力先S3バケットを指定
作成Done
Firehoseへデータを送信
put_record
にてEverything(It's you)
を送信。
import boto3
import json
client = boto3.client('firehose')
data = {
"id": 1,
"name": "Everything(It's you)"
}
response = client.put_record(
DeliveryStreamName='PUT-S3-from-kinoko',
Record={
'Data': f'{json.dumps(data)}\n'
}
)
print(response)
S3を見てみると
.parquet
の形式で格納されてた。
- デフォルトで5分間隔の配信頻度
PUTしてからしばらく経ってからS3に格納されましたが、以下の通りデフォルトのBuffer interval
が5分
となってますので、この値を良きに変更すれば良さそうです。
Amazon S3 へのデータ配信の頻度は、配信ストリームで設定した Amazon S3 の [Buffer size (バッファサイズ)] および [Buffer interval (バッファ間隔)] の値によって決まります。
まとめ
parquet形式に簡単に変換できたけど、この先どう扱っていくかはまだピンと来ていないので試してみたい。