はじめに
こんにちは、京セラコミュニケーションシステム 西田(@kccs_hiromi-nishida)です。
前編記事で、Dataformの概要とDataformを使うまでの準備について解説しました。
中編では、実際にDataformを使って簡単なデータパイプラインを作ってみようと思います!
できるだけわかりやすく解説しますので、最後までどうぞお付き合いください。
本記事は2023年8月ごろに作成しております。よって、引用している文章などはこの時点での最新となります。ご了承ください。
連載記事一覧
Dataformって何?便利そうだし調べてみた!(前編)
Dataformって何?便利そうだし調べてみた!(中編) ★本記事★
Dataformって何?便利そうだし調べてみた!(後編)
この記事の対象者
- Dataformの具体的な操作方法を知りたい方
- Dataformって実際にどんなことができるんだろうと気になっている方
今回作成するワークフローのイメージ
今回はBigQueryの一般公開データセットを使って、下図のイメージでワークフローを実装してみようと思います。
気象データと観測所データを結合し、そこに航空機定時到着データを合わせることで天候と航空機の遅延関係を確認するデータを作成するのが最終目標です。
※航空機定時到着データの最新が2012年のため、2012年のデータにて分析を行います。また日本の空港データがないため、アメリカのデータを用います。(アトランタ空港)
さっそく作ってみよう!
Dataformを使用するまでの準備は完了していることが前提となります。
準備がまだの方は前編記事をご確認ください。
前編 - 使うための準備をしよう
また、以降の画像は特別な記載のない限り、BigQuery-Dataform画面をキャプチャしたものとなります。
1. 事前作業
1-1. 不要なファイルの削除
前編で作成した開発ワークスペースを開き、自動作成されていた2つのSQLXファイルは不要なので削除しておきます。
SQLXファイルを選択すると、丸が3つつながったようなアイコンが表示されるので選択し、表示されたメニューの削除
を選択すれば削除できます。
1-2.ディレクトリの作成
データソーステーブルの定義ファイルを格納するディレクトリを作成します。
definitions
を選択し、ディレクトリを作成
を選択します。
表示された作成画面で、definitions/sources
と入力し、ディレクトリを作成
を選択します。
同じ手順で、definitions/tables
も作成しておきます。
2. データソーステーブルの定義
データソーステーブルは一番始めに必要になるテーブルで、先ほどのイメージ図で言うと一番左にある「気象データ」「観測所データ」「航空機定時到着データ」がデータソーステーブルとなります。
2-1. SQLXファイルの作成
データソーステーブルのSQLXファイルを作成します。
先ほど作成したsources
ディレクトリを右クリックし、ファイルを作成
を選択します。
以下の名前でファイル名をそれぞれ作成します。
(作成方法の解説は1の気象データで行います。2,3も同じように作成してください)
- 気象データ:ghcnd_2012.sqlx
- 観測所データ:ghcnd_stations.sqlx
- 航空機定時到着データ:airline_ontime_data_flight.sqlx
ファイル名を入力し、ファイルを作成
を選択します。
今回はBigQueryの一般公開データセットを利用します。以下の内容を貼り付けてください。
config {
type: "declaration",
database: "bigquery-public-data",
schema: "ghcn_d",
name: "ghcnd_2012"
}
オプション | 今回の設定値 | 説明 |
---|---|---|
type | declaration | データソースの場合はdeclaration固定 |
database | bigquery-public-data | BigQueryが属しているプロジェクトのID |
schema | ghcn_d | データソースが存在する BigQuery データセット名 |
name | ghcnd_2012 | データ ソースとして使用するテーブル or ビューの名前 |
ファイルが作成されました。
手順を繰り返し、残りのデータソースも作成してください。
config {
type: "declaration",
database: "bigquery-public-data",
schema: "ghcn_d",
name: "ghcnd_stations"
}
config {
type: "declaration",
database: "bigquery-samples",
schema: "airline_ontime_data",
name: "flights"
}
3. パーティションテーブルの定義
分析コスト節約のために作成することの多いパーティションテーブル。
これもDataFormで定義できます。
※パーティションテーブルに関しては以下の記事をチェックしてみてください!
BigQueryの料金体系をサクッと解説 -(おまけ)分析コストの削減
今回は先ほど作成した気象データ(ghcnd_2012.sqlx)のデータソースからパーティションテーブルを作成するためのSQLXを定義します。
ここでアメリカのデータに絞って取得するようにしておきます。
SQLXファイルの作成手順はSQLXファイルの作成と同じです。
※パーティションテーブルはtablesディレクトリの下に作成
config {
type: "table",
bigquery: {
partitionBy: "DATE_TRUNC(date, MONTH)"
}
}
SELECT
id,
date,
element,
value,
qflag
FROM
${ref("ghcnd_2012")}
WHERE
id like 'US%'
データソースの作成時と異なる点は、configブロックの設定内容とbodyブロックにSQLが定義されていることです。
オプション | 説明 |
---|---|
type | tableと記述 (参照:TableType) |
bigquery | BigQuery固有のウェアハウスオプション 今回はpartitionByでテーブルを分割するキーを指定 (参照:IBigQueryOptions) |
bodyブロックのSQLで重要なのは、${ref("ghcnd_2022")}
という箇所!
ref関数というのですが、この形式でテーブル参照を記述すると、Dataformが依存関係を定義してくれます。
※公式ドキュメント:ref を使用して依存関係を参照する
今回の場合、ghcd_2012 <- ghcd_2012_by_monthという依存関係が定義され、DAGでも確認できます。
航空機定時到着データもデータ量が多いので同じ手順でパーティションテーブルを作成しておきます。(クリックすると展開します)
航空機定時到着データのパーティションテーブル作成コード
config {
type: "table",
bigquery: {
partitionBy: "DATE_TRUNC(date_formatted, MONTH)"
}
}
SELECT
PARSE_DATE("%F", date) AS date_formatted,
departure_delay,
departure_airport,
arrival_airport,
FROM
${ref("flights")}
4. 目的のデータを取得するためのテーブルとViewを定義する
データソースとパーティションテーブルがそろったので、目的のデータを取得するためのテーブルとViewを定義していきましょう!
4-1. 気象データと観測所データのJoin表の定義
どの観測所で測定された気象データかがわかるように観測所データをJoinし、観測所名にATLANTAを含んでいるデータから降水量を取得します。
※データ量が多いので、半年ごとの(2012年1月~6月、7月~12月)テーブルを作成
config {
type: "table",
schema: "dataform",
name: "atlanta_weather_1to6",
description: "アトランタ1月~6月降水量",
columns: {
id: "id",
name: "観測所名",
date: "日",
prcp: "降水量"
}
}
SELECT
id,
name,
date,
MAX(prcp) AS prcp
FROM (
SELECT
wd.id,
s.name,
wd.date,
IF( wd.element = 'PRCP', wd.value / 10, NULL ) AS prcp
FROM
${ref("ghcnd_2012_by_month")} AS wd
JOIN
${ref("ghcnd_stations")} AS s
ON
wd.id = s.id
WHERE
wd.date BETWEEN "2012-01-01" AND "2012-06-30"
AND s.name LIKE "%ATLANTA%")
GROUP BY
id,name,date
ORDER BY
date
7月~12月も同様に作成します。(クリックすると展開します)
アトランタ7月~12月降水量のテーブル作成コード
config {
type: "table",
schema: "dataform",
name: "atlanta_weather_7to12",
description: "アトランタ7月~12月降水量",
columns: {
id: "id",
name: "観測所名",
date: "日",
prcp: "降水量"
}
}
SELECT
id,
name,
date,
MAX(prcp) AS prcp
FROM (
SELECT
wd.id,
s.name,
wd.date,
IF( wd.element = 'PRCP', wd.value / 10, NULL ) AS prcp
FROM
${ref("ghcnd_2012_by_month")} AS wd
JOIN
${ref("ghcnd_stations")} AS s
ON
wd.id = s.id
WHERE
wd.date BETWEEN "2012-07-01" AND "2012-12-31"
AND s.name LIKE "%ATLANTA%")
GROUP BY
id,name,date
ORDER BY
date
オプション | 説明 |
---|---|
type | tableと記述 (参照:TableType) |
schema | BigQueryのデータセット指定 |
name | table名を指定 |
description | tableの説明を記述 |
columns | tableの各列の説明を記述 |
4-2. 降水量と航空機の遅延を合わせて参照するViewの定義
4-1で作成したアトランタの降水量データと、航空機定時到着データのパーティションテーブルをJoinし、降水量と航空機の遅延に関係があるかを確認するViewを作成します。
今回作成するデータパイプラインの最終地点です!
このViewを定義することで、以下のデータを参照できるようになります。
- 出発空港がアトランタ空港
- 出発の遅れ時間
- アトランタ空港周辺の降水量
- 到着空港
- 対象期間は2012/1/1~2012/6/30と2012/7/1~2012/12/31
config {
type: "view",
schema: "dataform",
name: "relation_flight_weather_1to6",
description: "航空機の遅延と天候の関係を見るView(1月から6月)",
columns: {
date: "日",
name: "観測所名",
prcp: "降水量",
departure_delay: "出発の遅れ",
arrival_airport: "到着空港"
}
}
SELECT
wx.date,
wx.name,
wx.prcp,
f.departure_delay,
f.arrival_airport
FROM (
SELECT
STRING(date) AS date,
name,
prcp
FROM
${ref("atlanta_weather_1to6")}
WHERE
name like 'ATLANTA HARTSFIELD%'
AND qflag IS NULL) AS wx
JOIN
${ref("flight_by_month")} AS f
ON
STRING(f.date_formatted) = wx.date
WHERE
f.departure_airport = 'ATL' and
f.date_formatted BETWEEN "2012-01-01" AND "2012-06-30"
7月~12月も同様に作成します。(クリックすると展開します)
7月~12月のView
config {
type: "view",
schema: "dataform",
name: "relation_flight_weather_7to12",
description: "航空機の遅延と天候の関係を見るView(7月から12月)",
columns: {
date: "日",
name: "観測所名",
prcp: "降水量",
departure_delay: "出発の遅れ",
arrival_airport: "到着空港"
}
}
SELECT
wx.date,
wx.name,
wx.prcp,
f.departure_delay,
f.arrival_airport
FROM (
SELECT
STRING(date) AS date,
name,
prcp
FROM
${ref("atlanta_weather_7to12")}
WHERE
name like 'ATLANTA HARTSFIELD%'
AND qflag IS NULL) AS wx
JOIN
${ref("flight_by_month")} AS f
ON
STRING(f.date_formatted) = wx.date
WHERE
f.departure_airport = 'ATL' and
f.date_formatted BETWEEN "2012-07-01" AND "2012-12-31"
5. ワークフローの実行
では、最後にワークフローを実行し、定義通りにテーブルやViewができていることを確認します。
まず、DAGを確認してみましょう!
5-1. DAGの確認
Dataform画面の上部に、COMPILED GRAPH
というタブがあるので選択します。
このようにSQLXファイル内のref関数によって自動的に依存関係が定義されDAGが作成されていることがわかります。
5-2. 実行して結果を確認
画面上の実行を開始
を選択し、すべてのアクション
を選択します。
今回ははじめての実行なので、ALL ACTIONS
を選択し実行を開始
を選択してください。
実行結果はEXECUTIONS
から確認できます。
開始時間のリンクを選択すると詳細を確認でき、実行順から依存関係がきちんと管理されていることがわかります。
(たとえば、flight_by_monthやghcnd_2012_by_monthはそれ以降のTableやViewのInputとなるため、一番最初に作成が実行されていますよね)
実際にBigQueryのデータセットを表示し、今回Dataformで定義したとおりにテーブルやViewが作成されていることが確認できました。
6. まとめ
今回、Dataformで一連のワークフローを作成しました。
いかがでしたか?
クエリを記述する以外でそれほど難しい操作や記述はなかったな、意外と簡単だった!と思っていただけたでしょうか。
今回は一般公開データセットを使いましたが、自分で用意したデータセットを使い、Dataformでいろいろなデータ変換を試してみていただければと思います!
7. 次回予告
後編では以下のような内容を紹介する記事を作成する予定ですので、お楽しみに。
- ワークフローのテストを実施しよう
- ワークフローを定期実行させてみよう
※記事作成中に変更になる可能性はあります。ご了承ください!