Google Cloudが買収し話題になったDataformを使ってみる。
https://cloud.google.com/blog/ja/products/data-analytics/welcoming-dataform-to-bigquery
BigQueryにデータ持ってきてる前提で、BigQuery内部のテーブル、データ項目、クエリの依存関係等を管理できるようだ。
ファイルの記述は基本的にSQLXという形式で記述する(JSONとSQLを混ぜたような感じ)
#前提
すでにGCPプロジェクトはある。
#始めてみる
##事前準備
Dataform用にBigQuery Adminロールのサービスアカウントを作り、JSONキーを発行する。
方法は割愛。
##Dataformのアカウントを作る
dataformにアクセスして、START NOWをクリック。グーグルアカウントで始められる。
BigQueryのデータセットのロケーションを選択し、事前に作ったサービスアカウントのJSONキーをアップする
#Dataformベストプラクティス
参考:https://docs.dataform.co/best-practices/start-your-dataform-project
##フォルダ構成を整える
どうやら以下3つのフォルダを作ってファイル管理することがベストプラクティスのようである
- Sources:(多分)テーブル情報とか正規化とか型とか色々クレンジングされてる状態にするためのファイル
- Staging:(多分)なるべく汎用化したテーブルを作るためのファイル
- Analytics(Reporting?):(多分)データマートを作るためのファイル
##ref()を使う
SQLのテーブル名は直接でなく、ref()を使って書くのが推奨らしい。
ref()使うことによって、依存関係エラーをリアルタイムに検知できるようだ。
config { type: "table" }
select * from ${ref("my_table")}
##declareでソースデータを定義する
ここでソースデータ定義しておくと、ref()で使えるようになる。
config {
type: "declaration",
database: "SNOWFLAKE_SAMPLE_DATA",
schema: "TPCH_SF1",
name: "CUSTOMER",
}
##タグでスケジュール管理する
SQLXにタグを記述することで、スケジュール実行を管理できるようである。
まずは以下2つのタグで始めてみるのが良いらしい。
- daily
- hourly
config { type: "table", tags: ["daily"] }
select * from ${ref("crm_data")}
##必要に応じてカスタムスキーマを使う
デフォルトではdataform.jsonでスキーマが定義されているが、SQLXファイルのconfigブロックでオーバーライドできる。
config {
type: "view",
schema: "staging",
tags: ["staging", "daily"],
description: "Cleaned version of the raw customer table."
}
select
c_custkey as customer_key,
c_name as full_name,
...
##アサーションを使用したテスト
データの品質を確認できる。通知を設定している場合、アサーションエラーとなった場合に検知できる。
以下の3つを設定可能。
- uniqueKey:一意確認
- nonNull:Nullでないこと確認
- rowConditions:カスタムの式。どこかの行でFalseになるとエラー
config {
type: "table",
assertions: {
nonNull: ["order_date", "order_key", "customer_key"],
uniqueKey: ["order_key"],
rowConditions: [
"total_parts >= 0"
]
}
}
select ...
##データ内容を記述する
コード内にデータ内容(テーブル内容、カラム内容)を記述することで、コラボレーションが容易になる。他のツールへのエクスポートも可能。
config {
type: "table",
description: "This table contains summary stats by date aggregated by country",
columns: {
order_date: "Date of the order",
order_id: "ID of the order",
customer_id: "ID of the customer in the CRM",
order_status: "Status of the order, from Shopify",
payment_status: "Status of payment, from Stripe",
payment_method: "Credit card of ACH",
item_count: "Number of items in that order",
amount: "Amount charged for that order, in US dollars using a floating FX rate"
}
}
select ...
##include マクロを使用して計算式を再利用する
CASE文等で固定値を使ってしまってる場合とか、事前に定義しておけばなにか変更が合った際にこのファイルを修正するだけでよくなる。
module.exports = (country) => {
return `
case
when ${country} in ('US', 'CA') then 'NA'
when ${country} in ('GB', 'FR', 'DE', 'IT', 'PL', 'SE') then 'EU'
when ${country} in ('AU') then ${country}
else 'Other'
end`;
}
config { type: "table"}
select
country as country,
${country_group("country")} as country_group,
...
#試しにフロー作ってみる
上記までのベストプラクティス情報を参考にしながら、以下のようなフローを作ってみる。
BQの天気パブリックデータを使う。SQLは以下の記事で使ってるものをベース。
https://qiita.com/yakamazu/items/5e29208536a1374a50cd
(サンプルが適切なのかわからないが、雰囲気は掴めるだろうということで。。)
- 対象テーブルを定義する⇒ definitions/source
- 条件、計算式を定義する ⇒ includes
- 対象年でデータ抽出 ⇒ definitions/staging
- 天気データを作成 ⇒ definitions/analytics
##フォルダを作成する
definitionsの下にsources, staging, analyticsのフォルダを作る
includesは最初からある
##対象テーブルを定義する
definitions/Sourcesに入力元となるBigQueryテーブルのdeclarationファイルを作る
出来たファイルを以下の内容で修正する。
config {
type: "declaration",
database: "bigquery-public-data",
schema: "ghcn_d",
name: "ghcnd_*",
}
同様に以下ファイルも作る
config {
type: "declaration",
database: "bigquery-public-data",
schema: "ghcn_d",
name: "ghcnd_stations",
}
##条件、計算式を定義する
includes/ に以下のファイルを作成する
###固定値を指定するファイル
const start_year = "'2015'";
const end_year = "'2020'";
const japan = "'JA'";
module.exports = {
start_year,
end_year,
japan,
};
###計算式のファイル
降雨量や最高気温等を計算する用のロジックのファイルを作成する
function element_value(element, kind, value){
return `
case ${element} when ${kind} then ${value} / 10 else 0 end
`;
}
module.exports = { element_value };
##対象年でデータ抽出
definitions/Stagingにテーブル作成する
テーブルは通常版と1日前版の2つ作る
config {
type: "table",
name: "ghcnd_org",
description: "天候データ",
columns:{
id: "id",
date: "日付",
element: "データの種類",
value: "値",
}
}
select
id,
date,
element,
value,
from
${ref("ghcnd_*")}
where
_TABLE_SUFFIX between ${constants.start_year} and ${constants.end_year}
右側にコンパイルされたSQLが出てくるので、BQのWEB UIに貼り付けて問題ないか確認できる
同様に1日前版も作る
config {
type: "table",
name: "ghcnd_1day_ago",
description: "天候データ(1日前)",
columns:{
id: "id",
date: "日付(1日前)",
element: "データの種類",
value: "値",
}
}
select
id,
date_add(date, interval -1 day) as date,
element,
value,
from
${ref("ghcnd_*")}
where
_TABLE_SUFFIX between ${constants.start_year} and ${constants.end_year}
##天気データを作成
stagingに作られたテーブルをベースにdefinitions/Analyticsに天気データを作成する
config {
type: "table",
description: "天候データ(完成版)",
columns:{
date: "日付",
name: "観測地点",
latitude: "緯度",
longitude: "経度",
prcp: "降雨量",
snow: "降雪量",
snwd: "積雪量",
tmax: "最高気温",
tmin: "最低気温",
tavg: "平均気温",
}
}
select
ghcnd.date,
stations.name,
stations.latitude,
stations.longitude,
max(${mapping.element_value("ghcnd.element", "'PRCP'", "ghcnd_1day_ago.value")}) as prcp, --降雨量(ミリメートル)
max(${mapping.element_value("ghcnd.element", "'SNOW'", "ghcnd_1day_ago.value")}) as snow, --降雪量(ミリメートル)
max(${mapping.element_value("ghcnd.element", "'SNWD'", "ghcnd_1day_ago.value")}) as snwd, --積雪量(ミリメートル)
max(${mapping.element_value("ghcnd.element", "'TMAX'", "ghcnd.value")}) as tmax, --最高気温
max(${mapping.element_value("ghcnd.element", "'TMIN'", "ghcnd.value")}) as tmin, --最低気温
max(${mapping.element_value("ghcnd.element", "'TAVG'", "ghcnd.value")}) as tavg, --平均気温
from
${ref("ghcnd_org")} as ghcnd
left outer join
${ref("ghcnd_1day_ago")} as ghcnd_1day_ago
on
ghcnd.id = ghcnd_1day_ago.id and
ghcnd.date = ghcnd_1day_ago.date and
ghcnd.element = ghcnd_1day_ago.element
inner join
${ref("ghcnd_stations")} as stations
on
ghcnd.id = stations.id and
substr(stations.id, 1, 2) = ${constants.japan} --日本だけに絞る
group by
1,2,3,4
##フローを見てみる
左上のメニューの「Dependency tree」から依存関係確認できる。
ref()を入力に使うだけで勝手に依存関係作ってくれるっぽい。
##実行してみる
###フローの実行
右上の「START NEW RUN」からフローを実行できる
実行は3タイプから選択できる
- プロジェクト全体実行
- タグを設定している場合は該当タグのプログラムのみ
- 実行プログラムの指定
今回はプロジェクト全体実行する
###実行状態の確認
フローを実行していると右上に「run in progress」が出てくる
###過去の実行ログを確認
左上のメニューの「Run logs」から確認可能
###実行結果
実行結果はデフォルト(dataform.jsonにて定義されている)では、dataformデータセットに作成される
configにdescriptionやcolumnsを設定しておけば、BQに説明として反映してくれるようだ
#感想
SQLも知らないうちにカオスなことになることが多いので、Dataformで管理するのは便利かもしれない。(githubとも連携できるらしい)
includeで計算ロジックも一元化できるのは使い勝手が良さそう。
Airflowからもキックできそうで、結局どのようにデータパイプラン管理するのが最適なのか悩ましい。。
#参考
すごいぞ Dataform
DataformでBigQueryのデータ変換を試してみた
無料のDataformでBigQueryにおけるデータ加工のDXを改善して幸せになろう