はじめに
公式ドキュメントを読んでDataformをキャッチアップしたときのメモです。
Dataformとは
データ アナリストが BigQuery でデータ変換を行う複雑な SQL ワークフローを開発、テスト、バージョン管理、スケジュール設定するためのサービスです。
Dataform を使用すると、データ統合の ELT(抽出、読み込み、変換)プロセスにおけるデータ変換を管理できます。Dataform では、ソースシステムから抽出されたデータを BigQuery に読み込むと、明確に定義されたテスト済みで一連のデータテーブルに変換できます。
引用: Dataform の概要
Dataformを使うメリット
データパイプラインによる効率化と柔軟性向上
-
分割されたコンポーネント化:
- 巨大なSQLファイルの代わりに、Dataformではコンポーネント化されたモデルを使用します。これにより、各データソースが明確になり、依存関係も明確化されます。これにより、大規模なデータパイプラインを管理しやすくなります。
-
ETLプロセスの自動化:
- データの抽出、変換、ロード(ETL)プロセスを自動化することで、手動でのタスク実行から解放され、時間を節約できます。これにより、データの品質や整合性を確保しながら、迅速にデータを処理することが可能です。
-
柔軟でシンプルなデータモデル定義:
- DataformではJavaScriptも併用してデータモデルを定義することで、柔軟性が高く、複雑なデータパイプラインもシンプルに記述できます。これにより、迅速かつ効率的にデータの加工や変換を行うことができます。
-
データリネージの可視化:
- Dataformでは、データの流れや変換プロセスを視覚的に追跡できるデータリネージ機能を提供しています。これにより、データの出所や変換履歴を簡単に確認できるため、データの信頼性と透明性が向上します。エラーや問題のトラブルシューティングが容易になり、データの管理が一層効率的になります。
フルマネージドの利便性とセキュリティ
-
インフラの管理不要:
- Dataformはサーバーレスで提供され、インフラの管理が不要です。これにより、利用料も無料でありながら、可視化やスケジューリング実行などの機能を提供します。
テストの自動化とバージョン管理の効果
-
テストのサポート:
- ユニットテストフレームワークを使用して、データ品質を自動化されたテストで保証することができます。これにより、データの正確性と信頼性を維持しながら、開発プロセスを迅速化できます。
-
バージョン管理の利便性:
- DataformはGitと連携することで、コードベースのバージョン管理と変更追跡が可能です。これにより、チームでの協力や運用が円滑に進み、過去の変更履歴を容易に確認できます。さらに、リリース管理やロールバックもシンプルに行えるため、開発サイクルの効率が向上します。
GCPとの深い統合で得られる利点
-
Google BigQueryとのシームレスな統合:
- DataformはGoogle BigQueryをはじめとするGoogle Cloud Platform(GCP)サービスとシームレスに統合されており、大規模なデータウェアハウス環境での使用が推奨されます。この統合により、データの処理速度やスケーラビリティが向上し、コスト効率も高まります。
-
セキュリティの強化:
- GCPのネイティブセキュリティ機能を活用することで、データの保護とコンプライアンスを確保できます。
-
他のGCPサービスとの統合:
- DataformはGoogle Cloud StorageやPub/Subなど、他のGCPサービスとも統合することができます。これにより、データの収集、保存、処理を一貫したワークフローで行うことができ、データエンジニアリングの効率が向上します。
利用手順例
前提、シチュエーション
- 既存のBigQueryテーブル(
dwh_customers
とdwh_orders
)がある前提とします。 - これらのソーステーブルを基にしたデータ集計を行い、最終的な出力テーブル**
dm_customer_orders
を作成します。出力テーブルはorder_date
**でパーティション化されます。 - アサーションファイルを使用して、データ品質を検証します。
最終的に下記を作成します
0. データを準備する
-
データセット作成
- dwh_sample
- dm_sample
-
テーブル作成・データ挿入
-- テーブルdwh_customersの作成 CREATE TABLE `project_id.dwh_sample.dwh_customers` ( customer_id INT64, customer_name STRING, customer_email STRING, created_at TIMESTAMP ); -- テーブルdwh_ordersの作成 CREATE TABLE `project_id.dwh_sample.dwh_orders` ( order_id INT64, customer_id INT64, order_amount FLOAT64, order_date TIMESTAMP ); -- データ挿入 INSERT INTO `project_id.dwh_sample.dwh_customers` (customer_id, customer_name, customer_email, created_at) VALUES (1, 'Alice', 'alice@example.com', '2023-01-01 00:00:00 UTC'), (2, 'Bob', 'bob@example.com', '2023-02-01 00:00:00 UTC'), (3, 'Charlie', 'charlie@example.com', '2023-03-01 00:00:00 UTC'); INSERT INTO `project_id.dwh_sample.dwh_orders` (order_id, customer_id, order_amount, order_date) VALUES (1, 1, 100.0, '2023-01-15 00:00:00 UTC'), (2, 2, 200.0, '2023-02-15 00:00:00 UTC'), (3, 1, 150.0, '2023-03-01 00:00:00 UTC'), (4, 3, 300.0, '2023-03-15 00:00:00 UTC');
1. コードを管理するためのリポジトリを作成する
- コンソール>BigQuery>Dataform>Create repository
- サービスアカウントを指定(デフォルトも指定可能)
2. 開発用のワークスペースを作成する
- リポジトリを選択>Create development workspace
- 開発者ごとのワークスペースを作成する
- Initialize workspace
- 初期設定ファイルが配置される
3.開発ワークスペースで SQL ワークフローを開発する
-
後続の手順で下記を作成していく
definitions/ sources/ dwh_customers.sqlx dwh_orders.sqlx intermediate/ output/ dm_customer_orders.sqlx dm_customer_orders_assertion.js extra/ includes/ dataform.json
-
sourcesディレクトリ
-
dwh_customers.sqlx
- SELECTなくても動作する
config { type: "declaration", database: "project_id", schema: "dwh_sample", name: "dwh_customers" } SELECT * FROM `project_id.dwh_sample.dwh_customers`
-
dwh_orders.sqlx
config { type: "declaration", database: "project_id", schema: "dwh_sample", name: "dwh_orders" } SELECT * FROM `project_id.dwh_sample.dwh_orders`
-
-
outputディレクトリ
-
dm_customer_orders.sqlx
- クエリの中でQUALIFYするか。ROW_NUMBERなど
config { type: "table", database: "project_id", schema: "dm_sample", name: "dm_customer_orders", description: "Aggregated customer orders with partitioning by last_order_date", columns: { customer_id: "Customer ID", customer_name: "Customer Name", total_amount: "Total Order Amount", last_order_date: "Last Order Date" }, tags: ["output"], uniqueKey: ["customer_id"], bigquery: { partitionBy: "DATE(last_order_date)" } } SELECT c.customer_id, c.customer_name, SUM(o.order_amount) AS total_amount, MAX(o.order_date) AS last_order_date FROM ${ref("dwh_customers")} AS c JOIN ${ref("dwh_orders")} AS o ON c.customer_id = o.customer_id GROUP BY c.customer_id, c.customer_name
-
dm_customer_orders_assertion.js
module.exports = { assertions: { unique_keys: ["customer_id"], not_null: ["customer_id", "customer_name", "total_amount", "last_order_date"], custom: [ { query: "SELECT COUNT(*) = 0 FROM dm_customer_orders WHERE total_amount <= 0", description: "Ensure total_amount is positive" }, { query: "SELECT COUNT(*) = 0 FROM dm_customer_orders WHERE last_order_date IS NULL", description: "Ensure last_order_date is not null" } ] } }
-
4. Dataform コアを SQL にコンパイルする
- コンソールで開発する場合、コーディングしながらコンパイルされている
- リアルタイムでコード検証がされてエラーチェックをすることができます
- COMPILED QUERIESタブで実行されるSQLを確認可能です
- Dataform>COMPILED GRAPH で確認できる
5. 依存関係ツリーを実行する
- XXX@gcp-sa-dataform.iam.gserviceaccount.com のアカウントで実行されるので、BigQueryの権限を付与しておく
- Dataform>START EXECUTIONで実行する
- tagごとに実行することも可能
- Dataform>EXECUTIONSで結果を確認できる
Tips
サンプルスクリプト
-
Dataform コアサンプル スクリプトが参考になります
- 増分テーブルの作成など
typeの使い分け
主なファイル
-
dataform.json→workflow_settings.yaml(Dataform コア 3.0から変更)
- Dataform ワークフロー設定を構成するwarehouse: bigquery defaultProject: dev-431902 defaultLocation: asia-northeast1 defaultDataset: dataform defaultAssertionDataset: dataform_assertions dataformCoreVersion: 3.0.0-beta.4 vars: env: production
-
includes/constants.js
- Dataform に含まれる include 変数と関数を再利用する
- 下記例はプロジェクトIDだが、よく使うカラムなどを定数としてexportしたりするconst PROJECT_ID = "my_project_name"; module.exports = { PROJECT_ID };
config { type: "table" } SELECT * FROM ${constants.PROJECT_ID}.my_schema_name.my_table_name
Gitリポジトリ紐付け
- サードパーティの Git リポジトリに接続する
- デフォルトブランチや直PUSH禁止設定など
開発環境
- 開発者ごとの環境を準備する
- ワークスペース コンパイル オーバーライドを作成する
- スキーマの接尾辞などで制御する。ワークスペースごとのスキーマが作成できる
環境管理: dev→stg→prdのフロー
-
コード ライフサイクルの管理
- 下記または、環境ごとの変数が複数ある場合はvarでenvを設定してconstで環境ごとの値を設定する、または環境ごとの設定ファイルを用意するなどがある
-
①環境ごとの設定ファイルを準備
- まず、dataform.jsonファイルで環境ごとのプロジェクトIDを変数として定義します。この変数を使用して、環境に応じてプロジェクトIDを切り替えます。
- または、varsではenvだけ指定しておいて、constとかでプロジェクト名をenvから作る
{ "defaultSchema": "your_dataset_name", "assertionSchema": "dataform_assertions", "warehouse": "bigquery", "defaultDatabase": "dev-project-id", // デフォルトは開発環境 "defaultLocation": "your_region", "vars": { "project_id": "dev-project-id" // 環境ごとのプロジェクトIDを変数で管理 } }
-
②SQLXファイルで変数を使用
- SQLXファイル内で、プロジェクトIDを変数として参照します。これにより、環境ごとに異なるプロジェクトIDを使用できます。
config { type: "view", schema: "${dataform.projectConfig.vars.project_id}.your_dataset_name" } SELECT * FROM `${dataform.projectConfig.vars.project_id}.your_table_name`
-
③開発環境での開発
- 開発者は、Dataformの開発ワークスペースを使用して開発を行います。開発が完了したら、Gitを使用してコードを管理し、PR(プルリクエスト)を作成してレビューと承認を受けます。
-
④本番環境での実行
- 本番環境では、Workflowsを使用してDataformを実行します。Workflowsから実行する際に、コンパイル変数を使用してプロジェクトIDを本番用に上書きします。
CLIを使用する場合
dataform run --vars '{"project_id":"prod-project-id"}' # 本番環境用のプロジェクトIDを指定
- Workflowsからの実行
- Workflowsから実行する際には、Dataform APIを使用してコンパイル変数を設定します。APIリクエストでcompilationResults.createを呼び出し、CodeCompilationConfigでプロジェクトIDを指定します。
{ "workspace": "projects/your_project_id/locations/your_location/repositories/your_repository/workspaces/your_workspace", "codeCompilationConfig": { "vars": { "project_id": "prod-project-id" // 本番環境用のプロジェクトID } } }
- 本番環境では、Workflowsを使用してDataformを実行します。Workflowsから実行する際に、コンパイル変数を使用してプロジェクトIDを本番用に上書きします。
includes/function.js
- 他、テーブルサフィックスの取得などができる
// includes/function.js
/**
* Returns the current timestamp in the format 'YYYY-MM-DD HH:MM:SS'.
*/
function getCurrentTimestamp() {
const date = new Date();
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
const hours = String(date.getHours()).padStart(2, '0');
const minutes = String(date.getMinutes()).padStart(2, '0');
const seconds = String(date.getSeconds()).padStart(2, '0');
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
}
/**
* Capitalizes the first letter of each word in a given string.
* @param {string} text - The text to be capitalized.
* @returns {string} - The capitalized text.
*/
function capitalizeWords(text) {
return text.replace(/\b\w/g, char => char.toUpperCase());
}
/**
* Safely retrieves a property from an object, returning a default value if the property is undefined.
* @param {object} obj - The object to retrieve the property from.
* @param {string} key - The key of the property to retrieve.
* @param {*} defaultValue - The default value to return if the property is undefined.
* @returns {*} - The value of the property or the default value.
*/
function getOrDefault(obj, key, defaultValue) {
return obj.hasOwnProperty(key) ? obj[key] : defaultValue;
}
/**
* Generates a random integer between min and max (inclusive).
* @param {number} min - The minimum value.
* @param {number} max - The maximum value.
* @returns {number} - The random integer.
*/
function getRandomInt(min, max) {
return Math.floor(Math.random() * (max - min + 1)) + min;
}
/**
* Generates a unique identifier (UUID v4).
* @returns {string} - The generated UUID.
*/
function generateUUID() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
const r = (Math.random() * 16) | 0;
const v = c === 'x' ? r : (r & 0x3) | 0x8;
return v.toString(16);
});
}
module.exports = {
getCurrentTimestamp,
capitalizeWords,
getOrDefault,
getRandomInt,
generateUUID,
};
// my_table.sqlx
const { getCurrentTimestamp, capitalizeWords } = require('./includes/function.js');
config {
type: "table",
description: capitalizeWords("this table stores user data"),
}
select
user_id,
user_name,
created_at,
'${getCurrentTimestamp()}' as generated_at
from
source("raw_data", "users")
冪等性担保のためのデータ削除
- pre_operationsを使用して、冪等性を担保するためのデータ削除を行う例
// tables/my_table.sqlx
// 変数設定
const { target_date } = require('./includes/function.js');
// テーブルの設定
config {
type: "table",
description: "This table stores daily transaction data.",
pre_operations: [
`
DELETE FROM ${self()}
WHERE date = '${target_date()}'
`
]
}
// 新しいデータの挿入クエリ
select
user_id,
transaction_amount,
transaction_type,
date
from
source('raw_data', 'transactions')
where
date = '${target_date()}'
// includes/function.js
/**
* Returns the target date to be used in the pre_operations and the main query.
* You can set this dynamically based on your needs.
* @returns {string} - The target date in the format 'YYYY-MM-DD'.
*/
function target_date() {
// You can set this to any dynamic or static date as per your requirements
return "2024-08-09"; // Example static date, or use logic to determine date
}
module.exports = {
getCurrentTimestamp,
capitalizeWords,
getOrDefault,
getRandomInt,
generateUUID,
target_date, // Add this function to the module exports
};
Assertionのプロパティ
rowConditions
uniqueKeyCondition
dataFreshnessConditions
globalAssertionParams
fieldConditions
nonNullConditions
customSql
expression
assertionDependencies