実現したいこと
BigQuery上に蓄積したデータを使って何かしらの集計データをテーブルに格納する時に
Dataformでテーブルに入れるデータの集計ロジックをコードで管理しつつ、定期更新できるようにします。
Dataform上にもワークフローを定義してスケジューリングする機能が提供されていますが、BigQueryのデータを更新するスケジューラーが分散すると管理しづらいため、NateeではETLツールとして使っているDigdagで更新する方式を取ることにしました。
DataformをAPI経由で使う時に、画面の操作からは直感的に理解しづらいポイントがあったので備忘録がてらまとめます。
Dataformの基本的な使い方については触れません。
実装
Dataform
クエリの結果をテーブルに格納したい場合は、下記のような記述でDataformに登録します。
config {
type: "table",
schema: "sample",
description: "Qiita用のサンプルテーブル",
tags: ["daily"],
columns: {
id: "autoincrement",
name: "名前",
created_at: "作成日",
}
}
SELECT
id,
name,
created_at
FROM
${ref(qiita_example)}
WHERE
weight > 100
ポイントはtagsの"daily"です。
日時で更新したいテーブル全てにdailyタグをつけておくことで、日時更新対象をタグベースで判定できます。
Digdag(呼び出し側)
上記のテーブル更新を実行するためにDigdagからpythonでAPIをコールしてワークフローを実行します。
(Digdagである必要はないですし、Cloud Composerを使うとより容易に実現できます)
Dataform上でワークフローを実行するために必要な手順は下記の4つです。
- APIの有効化とサービスアカウントに権限追加
- 必要なライブラリのインストール
- sqlxファイルのコンパイル
- コンパイル結果を使ってワークフローを実行
APIの有効化と権限追加
GCPを利用している人ならわかると思いますが、DataformのAPIを有効化し
下記ページを参考に適切な権限を付与しましょう。
https://cloud.google.com/dataform/docs/access-control?hl=ja#predefined_roles
ライブラリインストール
Pythonライブラリが提供されているのでそれを使います
pip install google-cloud-dataform
コンパイル
画面上では自動的にやってくれるのであまり意識することがないですが、APIだと明示的に実行する必要があります。
自分はdeployというworkspaceを作っていて、mainブランチにマージしたコードをdeployワークスペースから反映するフローにしています。
(まだ自動化はできていないので、ここもGithub Actions経由でできるようにしていきたい)
そのため、deployワークスペースを参照すれば最新コードが存在しているという前提で下記のような処理でコンパイルします。
import google.cloud.dataform as dataform
#...略
client = dataform.DataformClient()
parent = f"projects/{project}/locations/{location}/repositories/{repository}"
compilation_result = dataform.CompilationResult(
name="deploy workspace compilation result",
workspace=f"{parent}/workspaces/deploy"
)
request = dataform.CreateCompilationResultRequest(parent=parent, compilation_result=compilation_result)
compilation_result = client.create_compilation_result(request=request)
dataform.CompilationResult の引数に特定workspaceを指定することでそのワークスペースに反映されているコードの内容からコンパイル結果を得ることができます。
git_commitishという引数にブランチ名を渡すことも可能ですが、運用のフロー的にワークスペースを利用しています。
CompilationResultRequestを作成してcreate_compilation_resultに渡すことでコンパイル結果を取得できます。
ワークフローの実行
次にコンパイル結果を使ったワークフローの実行です。
Dataformの画面からBigQueryに適用する際には裏でワークフローが実行されているのでAPI経由でそれを再現します。
dailyタグが含まれるものを全て実行するワークフローの起動は下記のコードで実現できます。
invocation_config = dataform.WorkflowInvocation.InvocationConfig(included_tags=["daily"])
wf_invocation = dataform.WorkflowInvocation(
name="daily workflow invocation",
compilation_result=compilation_result.name,
invocation_config=invocation_config
)
wf_invocation_request = dataform.CreateWorkflowInvocationRequest(parent=parent, workflow_invocation=wf_invocation)
result = client.create_workflow_invocation(request=wf_invocation_request)
最後に
上記の手順はあまり調べても出てこないので、ライブラリの中のコメントを参考に実装しました。
特に日本語の例が少ないので今後Dataformがより盛り上がっていくと嬉しいですね!