LoginSignup
1
0

More than 1 year has passed since last update.

Kinesis Data Firehoseを用いてJSTでパーティショニングする

Last updated at Posted at 2022-12-21

はじめに

オークファン開発部で新卒2年目として働いてる@isodaです
業務でKinesisを用いたapiの実装に携わり、そこで得た知見などを備忘録として残したいと思います

Kinesis Data Firehoseの設定

今回の題名にもなっているJSTでパーティショニングする方法ですが、デフォルトのパーティションだとUTCでパーティションされてしまう為、動的パーティショニングを設定します

  • ソース
    • Direct Put
  • 送信先
    • S3
  • 配信ストリーム名
    • test-stream
  • S3 バケット
    • test-bucket
  • 動的パーティショニング
    • 有効
  • JSON のインライン解析
    • 有効
  • 動的パーティショニングキー
    • キー名 | JQ 式
    • year | .ymd[:4]
    • month | .ymd[4:6]
    • day | .ymd[6:8]
  • S3 バケットプレフィックス
    • data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/
  • S3 バケットエラー出力プレフィックス
    • data/error-logs/
  • その他設定
    • デフォルトで設定、変えたい設定あればお好みで

aws-sdk-goを用いてデータをPUTする

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"math/rand"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/firehose"
)

const (
	dataStreamsName   = "test-stream"
	role_arn          = "arn:aws:iam::XXXXXXXXXXXXXXXX"
	ymd_layout        = "20060102"
	created_at_layout = "2006-01-02 15:04:05"
)

type Data struct {
	Ymd       string `json:"ymd"`
	Number    int    `json:"number"`
	CreatedAt string `json:"created_at"`
}

func main() {
	sess := session.Must(session.NewSession())
	firehoseService := firehose.New(sess, aws.NewConfig().WithRegion("ap-northeast-1").WithCredentials(stscreds.NewCredentials(sess,
		role_arn,
	)))
	jst, err := time.LoadLocation("Asia/Tokyo")
	if err != nil {
		panic(err)
	}
	records := []*firehose.Record{}
	for i := 1; i <= 10; i++ {
		rand.Seed(time.Now().UnixNano())
		data := Data{
			Ymd:   time.Now().In(jst).Format(ymd_layout),
			Number: rand.Intn(1000),
			CreatedAt: time.Now().In(jst).Format(created_at_layout),
		}
		b := new(bytes.Buffer)
		json.NewEncoder(b).Encode(data)
		record := &firehose.Record{Data: b.Bytes()}
		records = append(records, record)
	}
	recordsBatchInput := &firehose.PutRecordBatchInput{}
	recordsBatchInput = recordsBatchInput.SetDeliveryStreamName(dataStreamsName)
	recordsBatchInput = recordsBatchInput.SetRecords(records)
	resp, err := firehoseService.PutRecordBatch(recordsBatchInput)
	if err != nil {
		fmt.Printf("err: %v\n", err)
	} else {
		fmt.Printf("success: %v\n", resp)
	}
}

S3バケットを確認

/data/YYYY/MM/DD/file
にファイルが生成されていることを確認する
ファイルの中身は以下の様な感じ

{"ymd":"20221213","number":805,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":435,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":485,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":709,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":553,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":613,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":673,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":701,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":576,"created_at":"2022-12-13 12:37:11"}
{"ymd":"20221213","number":787,"created_at":"2022-12-13 12:37:11"}

Athenaで確認

データベース作成

CREATE DATABASE `test_db`;

テーブル作成

CREATE EXTERNAL TABLE IF NOT EXISTS `test_db`.`test_table` (
`number` int,
`created_at` timestamp
)
PARTITIONED BY (
 ymd string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1',
'ignore.malformed.json' = 'true'
) 
LOCATION 's3://test-bucket/data/'
TBLPROPERTIES (
 "projection.enabled" = "true",
 "projection.ymd.type" = "date",
 "projection.ymd.format" = "yyyy/MM/dd",
 "projection.ymd.range" = "2021/12/01,NOW",
 "projection.ymd.interval" = "1",
 "projection.ymd.interval.unit" = "DAYS",
 "storage.location.template" = "s3://test-bucket/data/${ymd}"
);

AtehnaでSELECTしてみる

SELECT * FROM "test_db"."test_table" WHERE ymd = '2022/12/13';

#	number	created_at	ymd
1	805	2022-12-13 12:37:11.000	2022/12/13
2	435	2022-12-13 12:37:11.000	2022/12/13
3	485	2022-12-13 12:37:11.000	2022/12/13
4	709	2022-12-13 12:37:11.000	2022/12/13
5	553	2022-12-13 12:37:11.000	2022/12/13
6	613	2022-12-13 12:37:11.000	2022/12/13
7	673	2022-12-13 12:37:11.000	2022/12/13
8	701	2022-12-13 12:37:11.000	2022/12/13
9	576	2022-12-13 12:37:11.000	2022/12/13
10	787	2022-12-13 12:37:11.000	2022/12/13

ちゃんとデータが読めていることが確認できました

まとめ

今回の実装を通して、初めて触ったKinesis Data FirehoseやAthenaについて少しわかる様になりました
SELECTを叩いてみると実行時間が遅いなーと感じて、調べてみたらAWS公式の記事でこんなものがありその中でも、
2.バケッティングでデータを分割する
4.ファイルサイズを最適化する
5.列指向データの作成を最適化する
が行えていなかったので、これを次に生かして行けたらいいなと思います

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0