この記事は「BigQuery Advent Calendar 2024」の18日目の記事です。
はじめに
こんにちは、hoppy(@its_my_hoppy)といいます。
この記事では、最近自分が行ったGCSのcsvデータをBigQueryに自動で同期するといったことをどのようにして実装したか書いてみようと思います。
実装の要件
実装の要件は下記の通りとなっています
- csvはGCSに毎日アップロードされる
- アップロードされる内容はその日の更新分のみ
- csvのファイル名は毎日同じファイル名でアップされる
- 更新分を毎日BigQueryのtableにも反映させたい
- tableはupsert(insert or update)で、あるカラムの値がすでにある場合は行を更新してない場合は行を挿入する
実装
csvを下記と仮定して話を進めていこうと思います。
id(int) | hoge(str) | creatad_at(date) |
---|---|---|
1 | fuga | 2024-01-01 |
2 | foo | 2024-01-02 |
GCSからBigQueryにデータを流す
GCS上にあるcsvデータをBigQueryに流すのはdata transferを用いて行いました。
ここでのステップとしては、
- 流したいcsvデータと同様のschemaの空のtableをBigQuery側に作成する
- data transferで送信スケジュールを設定する
となっています。
空のtableを作成する際は下記DDLをBigQueryのコンソールで実行すれば作成できます。
CREATE TABLE dataset_name.table_name (
id INT64,
hoge STRING,
created_at DATE
);
ここでの注意点は下記の2点です。
- csv側のschemaと一致していること
- カラム名とデータ型が一致している
- ここで作成したtableはあくまでもGCSからのデータを一時的に保存するためのtable
- BigQuery上で更新されたものをすべて保持しておくtableは別で作成するので混同しないような命名にしておく
data transferでの送信スケジュールの設定は下記のように設定しました。
ここでの注意点は下記の3点です。
- Write preferenceをMIRRORにする
- MIRRORは更新のたびにdestination table(ここでは上で作成したもの)を上書きしてくれます
- APPENDは上書きせずレコードを追加をしてくれるが今回やりたい実装に適していないので利用していない
- Header rows to skipを1にする
- 基本的にcsvの1行目はカラム名が入っていると思うのでここは取り込まないようにする
- サービスアカウントに適切なroleを付与しておく
- 今回自分はSAを作成して実行をしているがmustではない
BigQueryに更新されたデータをtableにupsertしていく
ここまでで、GCSにアップロードされたcsvデータをBigQueryに送るとこまで完了しました。
次は、上記で作成した一時tableのデータを対象table(更新分をすべて蓄積しておくためのもの)にupsertしていく部分について説明していきます。
ここでのステップとしては、
- もしない場合は対象となるtableを作成する
- upsertするためのクエリを書く
- 書いたクエリをscheduled queryとして自動で動かすようにする
となっています。
tableを作成するのは上記と同様にDDLを実行して作成します。
命名が被らない、混同しないようにすれば問題ないと思います。
次にupsertをするためのクエリを書きます。
ここではidカラムをキーとして、対象tableに同じキーが存在していた場合は一時テーブルのデータで行を更新して、存在しない場合は一時テーブルのデータを挿入するといった挙動になるようにします。
クエリは以下となります。
MERGE
`project_name.dataset_name.対象table_name` AS T
USING
`project_name.dataset_name.一時table_name` AS S
ON
T.id= S.id -- キーの指定
WHEN MATCHED THEN -- キーが存在していたときの処理
UPDATE SET
T.hoge = S.hoge,
T.created_at = S.created_at,
WHEN NOT MATCHED THEN -- キーが存在していないときの処理
INSERT (
id,
hoge,
created_at,
)
VALUES (
S.id,
S.hoge,
S.created_at,
)
ここでの注意点は特にないのですが、data transferの部分でAPPENDを採用しなかったのはキーが重複しているとここでうまくクエリが動作しないためです。
クエリを作成したらコンソール上のスケジュールボタンからクエリの実行間隔と時間を設定したら完了です。
おわりに
初めてGCSからBigQueryにデータを転送する仕組みを作成しましたが、結構簡単にできました。
データ基盤でいうところのextractの部分にあたり、ユースケースによっては使えることが多いかなといった印象です。
今回はすべてGUI上で設定しましたが、時間がある時にterraformでコード管理できるようにしようかなと思っています。
ここまで読んでいただきありがとうございました!
ぜひX(@its_my_hoppy)のフォローもよろしくおねがいします!