この記事は
データエンジニアのためのSaaS「trocco(トロッコ)」のアドベントカレンダー 2021
14日目の記事になります。
こんにちは。trocco開発メンバーのいとうです。
転送元のテーブルのデータ更新パターンに合わせて、
troccoでの転送設定の作り方と、DWH側のテーブル設計のTIPSを紹介します。
追加処理のみのテーブル
履歴テーブルのような
- 行が追加しかされない
- 追加されたあとは変更されない
ような特徴を持つテーブルであれば、
追加された行のみを転送することで効率よく転送できます。
転送元RDBMS系
転送元MySQLなどのRDB系の場合はtroccoの
**「差分転送方式」**を利用するのが便利です。
設定画面の補足にも書かれていますが、
増分データを判別するカラムは、created_at
カラムのような日付テーブル
もしくはidカラムのようなユニークキーを持つ場合は組み合わせて設定するのがおすすめです。
その他転送元SaaS系など
SaaS系の転送元であれば、カスタム変数で日付を指定する方法があります。
例えば転送元Marketoであれば、抽出する期間を指定することができますので
日付を実行時間に合わせて設定することで、増分のみを転送できます。
例えば、転送元Marketoのアクティビティログを取得したい場合は
以下のような設定ができます。
この設定で毎日実行すると、実行時の日付を元に前日0時〜当日の0時までの期間を動的に設定することができ、
BigQueryに転送することができるようになります。
更新処理を含むテーブル
多くの場合、追加のみとなるケースはほとんどなく
行を追加した後に変更がが行われる形式のテーブルがほとんだと思います。
その場合はtroccoのデータ転送設定に加えて、データマート自由記述モードでmergeクエリをおすすめします。
転送設定は、追加のみ設定とほぼ一緒ですが、
- 増分データを判別するカラムを
updated_at
カラムのようなデータの更新を補足できるカラムを指定するか - カスタム変数の指定する日時カラムを更新日時を意味するカラムに指定して期間を指定する
のいずれかで設定します。
転送先のテーブルは分析に利用するテーブルを直接指定せず、一時テーブルにWRITE_TRANCATE(洗い替え)します。
その後、本来利用するBigQueryテーブルに対して一時テーブルを元にmergeクエリを実行するデータマート設定を作成します。
クエリ例は以下の通りです。
-
sample_table
: 利用したいテーブル。転送元テーブル、もしくはオブジェクトと同一の名称にするのがおすすめ。 -
sample_table_daily_tmp
: 1回の転送データを持つ一時テーブル。↑のテーブルと同じテーブルスキーマにしておくこと。1日一回動くものとして daily という名称にしておきます。
MERGE `sample_mysql_database.sample_table` target USING `sample_mysql_database.sample_table_daily_tmp` tmp
ON(target.ID = tmp.ID)
WHEN MATCHED AND target.updated_at < tmp.updated_at THEN
-- IDが一致かつ、更新日時が増えている場合は行を更新する
UPDATE SET
-- 更新対象としたいカラムをすべて記述する
target.name = tmp.name
target.note = tmp.note
target.updated_at = tmp.updated_at
WHEN NOT MATCHED THEN
-- 不一致 = 新規行として判断し、追加する
INSERT ROW
詳しいMERGEクエリの記述方法はBigQueryの公式ドキュメントを参考にしてください。
削除処理を含むテーブル
削除処理については当たり前ですが、削除された行がどれなのかわかる必要があります。
都度洗い替えすれば確実ですが、データ量が多いと転送量も増えて効率が悪いです。
なので今回はID列が存在する場合は、ID列のみにしぼり取得、転送し
転送先のBigQueryテーブルに存在しない場合はdeleteするmergeクエリを書くことで対応できます。
※この場合はRDMS系以外だとオブジェクトの列を絞るなどの仕組みが存在しないと難しいです。
転送設定は対応するID列のみを抽出、一時テーブルに出力する転送設定を作成します。
データマートでは前項目同様自由記述モードでmergeクエリを書きます。
クエリ例は以下の通りです。
-
sample_table
: 前項目で紹介した例と同じテーブルを想定。 -
sample_table_daily_id_tmp
: 1回の転送データを持つ一時テーブル。↑のテーブルのID列のみを全件もってきます。
MERGE `sample_mysql_database.sample_table` target USING `sample_mysql_database.sample_table_daily_id_tmp` tmp
ON(target.ID = tmp.ID)
WHEN NOT MATCHED THEN
DELETE -- 一致しない = 行が消えたと判断し、BigQueryテーブルを削除する
また論理削除としてupdateするのも良いかもしれないです。
(アプリケーションのテーブル設計として論理削除フラグを持つテーブルを作るべきかという話はここではしません)
MERGE `sample_mysql_database.sample_table` target USING `sample_mysql_database.sample_table_daily_id_tmp` tmp
ON(target.ID = tmp.ID)
WHEN NOT MATCHED THEN
UPDATE SET
target.is_deleted = true
転送に失敗したとき
設定を誤ったり、外部サービスに起因するエラーにより
不要なデータが入ってしまったり、欠損してしまうことがあります。
極力手順を簡単にして再実行することで再度転送を行える形にしておく必要があります。
方法としては、
転送先はBigQueryテーブルをパーティションテーブル機能を活用します、
troccoでは 転送時刻カラム設定 で転送時の時刻を含めるカラムを追加しつつ
さらに、そのカラムをBigQueryのパーティションキーとするのがおすすめです。
こうしておくことで、仮に誤ってデータを送り込んでしまった場合に
実行時間カラムを条件にしてdelete文を書くことで削除したり
パーティションのlifetimeを指定することで、古いデータを削除しBigQueryの費用を節約が期待できます。
その他:古いデータを削除
データ活用・分析の仕組みがある程度できあがると
古いデータは不要になります。
最初から仕組みを作ることを考えるには削除する判断材料が少ないので難しいです。
そのため、まずはパーティションテーブルとして実行時間カラムを追加しておくのが良いです。
データが増えてきたあたりからデータを活用している期間が見えてくるはずなので、
その基準を元にパーティションの有効期限を設定し活用されないデータは削除してくのが良さそうです。
troccoの転送設定では、step2の画面よりパーティションテーブルの有効期限の設定ができます。
BigQueryパーティションの有効期限について詳しく知りたい方はこちらを確認ください。
- パーティションの有効期限を更新する
まとめ
転送元のテーブルやオブジェクトに合わせて、troccoでのデータ転送のパターンを紹介しました。
紹介したパターンで転送設定をつくり、ワークフローでつなげはたいていのデータの転送には対応できるかと思います。