はじめに
オークファン開発部で新卒2年目として働いてる@isodaです
業務でKinesisを用いたapiの実装に携わり、そこで得た知見などを備忘録として残したいと思います
Kinesis Data Firehoseの設定
今回の題名にもなっているJSTでパーティショニングする方法ですが、デフォルトのパーティションだとUTCでパーティションされてしまう為、動的パーティショニングを設定します
- ソース
- Direct Put
- 送信先
- S3
- 配信ストリーム名
- test-stream
- S3 バケット
- test-bucket
- 動的パーティショニング
- 有効
- JSON のインライン解析
- 有効
- 動的パーティショニングキー
- キー名 | JQ 式
- year | .ymd[:4]
- month | .ymd[4:6]
- day | .ymd[6:8]
- S3 バケットプレフィックス
- data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/
- S3 バケットエラー出力プレフィックス
- data/error-logs/
- その他設定
- デフォルトで設定、変えたい設定あればお好みで
aws-sdk-goを用いてデータをPUTする
package main
import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
)
const (
dataStreamsName = "test-stream"
role_arn = "arn:aws:iam::XXXXXXXXXXXXXXXX"
ymd_layout = "20060102"
created_at_layout = "2006-01-02 15:04:05"
)
type Data struct {
Ymd string `json:"ymd"`
Number int `json:"number"`
CreatedAt string `json:"created_at"`
}
func main() {
sess := session.Must(session.NewSession())
firehoseService := firehose.New(sess, aws.NewConfig().WithRegion("ap-northeast-1").WithCredentials(stscreds.NewCredentials(sess,
role_arn,
)))
jst, err := time.LoadLocation("Asia/Tokyo")
if err != nil {
panic(err)
}
records := []*firehose.Record{}
for i := 1; i <= 10; i++ {
rand.Seed(time.Now().UnixNano())
data := Data{
Ymd: time.Now().In(jst).Format(ymd_layout),
Number: rand.Intn(1000),
CreatedAt: time.Now().In(jst).Format(created_at_layout),
}
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(data)
record := &firehose.Record{Data: b.Bytes()}
records = append(records, record)
}
recordsBatchInput := &firehose.PutRecordBatchInput{}
recordsBatchInput = recordsBatchInput.SetDeliveryStreamName(dataStreamsName)
recordsBatchInput = recordsBatchInput.SetRecords(records)
resp, err := firehoseService.PutRecordBatch(recordsBatchInput)
if err != nil {
fmt.Printf("err: %v\n", err)
} else {
fmt.Printf("success: %v\n", resp)
}
}
S3バケットを確認
/data/YYYY/MM/DD/file
にファイルが生成されていることを確認する
ファイルの中身は以下の様な感じ
{"ymd":"20221213","number":805,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":435,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":485,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":709,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":553,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":613,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":673,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":701,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":576,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":787,"created_at":"2022-12-13 12:37:11"}
Athenaで確認
データベース作成
CREATE DATABASE `test_db`;
テーブル作成
CREATE EXTERNAL TABLE IF NOT EXISTS `test_db`.`test_table` (
`number` int,
`created_at` timestamp
)
PARTITIONED BY (
ymd string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1',
'ignore.malformed.json' = 'true'
)
LOCATION 's3://test-bucket/data/'
TBLPROPERTIES (
"projection.enabled" = "true",
"projection.ymd.type" = "date",
"projection.ymd.format" = "yyyy/MM/dd",
"projection.ymd.range" = "2021/12/01,NOW",
"projection.ymd.interval" = "1",
"projection.ymd.interval.unit" = "DAYS",
"storage.location.template" = "s3://test-bucket/data/${ymd}"
);
AtehnaでSELECTしてみる
SELECT * FROM "test_db"."test_table" WHERE ymd = '2022/12/13';
# number created_at ymd
1 805 2022-12-13 12:37:11.000 2022/12/13
2 435 2022-12-13 12:37:11.000 2022/12/13
3 485 2022-12-13 12:37:11.000 2022/12/13
4 709 2022-12-13 12:37:11.000 2022/12/13
5 553 2022-12-13 12:37:11.000 2022/12/13
6 613 2022-12-13 12:37:11.000 2022/12/13
7 673 2022-12-13 12:37:11.000 2022/12/13
8 701 2022-12-13 12:37:11.000 2022/12/13
9 576 2022-12-13 12:37:11.000 2022/12/13
10 787 2022-12-13 12:37:11.000 2022/12/13
ちゃんとデータが読めていることが確認できました
まとめ
今回の実装を通して、初めて触ったKinesis Data FirehoseやAthenaについて少しわかる様になりました
SELECTを叩いてみると実行時間が遅いなーと感じて、調べてみたらAWS公式の記事でこんなものがありその中でも、
2.バケッティングでデータを分割する
4.ファイルサイズを最適化する
5.列指向データの作成を最適化する
が行えていなかったので、これを次に生かして行けたらいいなと思います