はじめに
公式ドキュメントを元に、業務でDataformを使いました。
そこで得られたtipsを、ここにまとめておきます。
想定読者
- Dataformが何をするツールか知っている
- Dataformを使ったことがある
注意事項
Dataformには、いくつか種類があります。
ここで扱うのは、2023年1月現在のGCP版(パブリックプレビュー段階) です
ネットの海には、依存グラフやWeb UIなどが散見されますが、あれは(現在)使えません。
tips一覧
環境について
Dataformでは、まずリポジトリとワークスペースを作ります。
これが何を指しているのか、最初分かりにくいと思います。
DataformとGitHubの連携時、それぞれは以下に相当します。
- リポジトリ→リポジトリ
- ワークスペース→ブランチ
これを見て、「作業用のワークスペースと別に、mainブランチに相当するワークスペースが必要!」
と思われるかもしれませんが、それは不要です。
なぜなら、Dataform側で管理していないブランチ(名のワークスペース)も、指定して実行することができるからです。
弊チームでの運用方法
リポジトリのデフォルトブランチをmainにした上で、「workspace」というワークスペースを1個だけを運用しています。
mainブランチをスケジュール実行しているため、作業用ブランチ(ワークスペース)だけで十分だからです。
mainブランチは、以下の方法で更新しています
- workspace(Dataform)でcommit -> push
- GitHubでPR作成 -> (レビュー) -> merge
また、Dataformからmainへの直pushを防止すべく、GitHubのmainブランチへ以下のBranch protection rule
を入れています
Require a pull request before merging
Do not allow bypassing the above settings
まとめると、Dataform(とBigQuery)以外に、以下プロダクトを使用しています。
- GitHub(Dataform連携用)
- workflows(スケジュール実行用)
- scheduler(スケジュール実行用)
エラー対応方法
Dataformを用いた開発を行っていると、エラーに遭遇することがあると思います。
エラー対応に一番有効だった方法は、エラーメッセージとエラー箇所を読むことです。
「なにを当たり前のことを…」と思われるかもしれません。
しかし、以下の理由で、その当たり前を強調しました。
- エラーがsqlx自体でなく、sqlxから生成されたsql文に対して発生することが多い
- ドキュメント通りに書いてもエラーになる場面が多い
ありがちなエラーをいくつか紹介します。
例1 incremental tableのWHERE重複
incremental tableにおいて、公式ドキュメントによると、最終行に以下のような行追加条件を設定する必要があるみたいです。
${when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
もし、上記構文の上にWHERE区があった場合、エラーになります。
なぜなら、生成されるSQL構文は以下のようになり、WHERE区が重複するからです
WHERE 1 = 1 -- 追加行
WHERE timestamp > (SELECT MAX(timestamp) FROM `test_project.dataform.test_table`)
結果、このようなエラーメッセージが表示されます
Syntax error: Expected ")" but got keyword WHERE at [78:1]: internal
対象の78行目に飛ぶと、すぐこの原因に気付けます。
すでにWHERE区が使われている場合は、バッククォート内WHEREをANDに変更すると解消しそうですね。
例2 requirePartitionFilterの入ったテーブルでパーティションの未設定
Dataformでは、パーティションフィルタの設定ができます。
WHERE区でパーティションカラムの指定を必須にするものです。
https://cloud.google.com/dataform/docs/partitions-clusters#set_a_partition_filter
これを用いて、以下のようなincremental tableを設定します。
dtが日付で、日次で積み上げるイメージです。
config {
type: "incremental",
bigquery: {
partitionBy: "dt",
requirePartitionFilter: true
}
}
-- 省略 --
${when(incremental(), `WHERE dt > (SELECT MAX(dt) FROM ${self()})`) }
結果、以下のようなエラーに遭遇します
Query error: Cannot query over table 'test_project.dataform.test_table' without a filter over column(s) 'dt' that can be used for partition elimination at [28:11]: internal
データを積み上げるかどうかの判定時、自身へのテーブルへクエリを叩きます。
その際(when構文のWHERE区)、パーティションが設定されてないのが原因みたいです。
以下のようなWHERE区の追加で、解決しました。
(日次で前日分のデータを取得するため、その日付が入る範囲を指定)
${ when(incremental(), `where dt > (SELECT MAX(dt) FROM ${self()} where dt > CURRENT_DATE(‘Asia/Tokyo’) - 2)`) }
incremental tableの公式ドキュメントに、パーティションフィルタのケースが無いので、紹介しました。
increment table運用方法
カラム変更など、既存データにも影響のある変更をする場合、「完全に更新して実行する」ボタンを押した方が良いです。
実行成功するのにデータが積み上がらなかったり、一生エラーが出たりします。
他ありがちなエラーは、最初に紹介したので省略します。
おわりに
数週間ほどDataformを触ってみて、得られたtipsを紹介しました。
「これは違う」「これはこうした方が良い」などのマサカリお待ちしております。