はじめに
本記事はQualiArts Advent Calender 2025の14日目の記事になります。
バックエンドエンジニアの水村です。
今回は、Google Dataformのベストプラクティスに従ってリポジトリを作成し、BigQueryのログを加工・集計して出力するまでを実装してみました。
概要
Dataformとは、BigQuery 上で行うデータ変換のワークフローを、開発・テスト・バージョン管理・スケジュール実行まで含めて扱うためのサービスです。データ加工は主にSQLXで記述し、Dataform内で管理するか、必要であればGitHubのリポジトリに接続してバージョン管理を行うことができます。
今回は公式ドキュメントの「リポジトリのベスト プラクティス」のリポジトリ構成に従い、架空のゲームのBigQueryテーブルで管理しているログを加工・集計して日次KPI分析用のデータを作成します。
使用するログテーブル
今回想定する架空ゲームのログテーブルを説明します。あくまで今回用に作成したものなので、厳密性などは排除しています。
login
ユーザーの初回ログイン情報を取得するログです。
ユーザーがログインした端末のプラットフォーム(iOS, Androidなど)や、アプリのバージョンも同時に収集します。
CREATE TABLE `test-project-xxxxxx.test_dataset.login` (
timestamp TIMESTAMP NOT NULL,
user_id STRING NOT NULL,
platform STRING NOT NULL,
app_version STRING NOT NULL
)
PARTITION BY TIMESTAMP_TRUNC(timestamp, DAY);
purchase
ユーザーの課金ログです。
購入ごとに一意のpurchase_idを割り振り、購入物や値段を集計します。
CREATE TABLE `test-project-xxxxxx.test_dataset.purchase` (
timestamp TIMESTAMP NOT NULL,
purchase_id STRING NOT NULL,
user_id STRING NOT NULL,
price INTEGER NOT NULL,
item_id STRING NOT NULL
)
PARTITION BY TIMESTAMP_TRUNC(timestamp, DAY);
mission
ユーザーのミッションの進捗を取得するログです。
100を進捗(progress)の最大として、達成・未達成に関わらず進捗が更新されたら収集します。
CREATE TABLE `test-project-xxxxxx.test_dataset.mission` (
timestamp TIMESTAMP NOT NULL,
user_id STRING NOT NULL,
mission_id STRING NOT NULL,
progress INTEGER NOT NULL
)
PARTITION BY TIMESTAMP_TRUNC(timestamp, DAY);
リポジトリ構成
今回Dataformで実装したリポジトリ構成は以下となります。
.
├── workflow_settings.yaml
├── definitions/
│ ├── sources/
│ │ ├── declarations.js
│ │ ├── view_login.sqlx
│ │ ├── view_mission.sqlx
│ │ └── view_purchase.sqlx
│ ├── intermediate/
│ │ ├── stg_login.sqlx
│ │ ├── stg_mission.sqlx
│ │ ├── stg_purchase.sqlx
│ │ └── stg_user_daily.sqlx
│ └── outputs/
│ └── kpi_daily.sqlx
└── includes/
└── fn_intermediate.js
definitions/sources
データソース宣言と、フィルタなどの基本変換を行うサブディレクトリです。
基本的には取得したログテーブルごとにビューを作成し、基本のフィルタ・整形を行う処理をSQLXにより記述して保存します。
definitions/intermediate
definitions/sources から取得したデータから、大きめの変換・集計を行います。
基本的にはDataformのワークフロー内でしか使用せず、分析時のBIツールなど追加のプロセスやツールには公開されないテーブルを保存します。
definitions/outputs
通常は definitions/intermediate を参照し、分析用のダッシュボードやレポートで使用する予定の最終成果物を保存します。
includes
SQLXファイルから参照されるJavaScriptファイルを保存します。
サブディレクトリを跨いで共通で使いたい関数や定数などはこちらで管理します。
構成手順
ワークフローの設定
Google Dataformに新規のリポジトリ・開発ワークスペースを作成し、初期化すると、workflow_settings.yaml が作成されます。こちらにワークフローで使用するプロジェクトやデータセット・リージョンなどを記載します。
こちらのファイル内の内容はJavaScriptからでも参照可能です(後述)
少々分かりにくいのですが、defaultDatasetはDataformが作るテーブルの出力先で、vars.datasetはテスト用のログテーブルが入っているデータセットになります。
defaultProject: test-project-xxxxxx
defaultDataset: dataform
defaultAssertionDataset: dataform_assertions
dataformCoreVersion: 3.0.35
defaultLocation: asia-northeast1
vars:
dataset: test_dataset
sourcesでビューテーブルの作成
早速SQLXを記述してビューテーブルの作成をしたいのですが、その前にデータソースの宣言をしておきます。
この処理は必須ではないですが、Dataform外のBigQueryのテーブルを宣言しておくと、Dataform内の他テーブルと同様に参照・依存関係を解決でき、グラフにも現れ、依存先の一括実行にも効いてきます。
'use strict';
const project = dataform.projectConfig.defaultDatabase;
const dataset = dataform.projectConfig.vars.dataset;
const sources = ["login", "mission", "purchase"];
sources.forEach((name) => {
declare({
database: project,
schema: dataset,
name
})
});
宣言したら、それぞれのテーブルのビューを作成していきます。
上記の宣言をすることにより、ref() で参照することが可能です。
また、本記事では 日付の基準を JST(Asia/Tokyo) に統一するため、日次化は DATE(timestamp, "Asia/Tokyo") を使用します。
-- login
config { type: "view" }
SELECT timestamp, user_id, platform, app_version
FROM ${ref("login")}
WHERE DATE(timestamp, "Asia/Tokyo") <= CURRENT_DATE("Asia/Tokyo")
-- mission
config { type: "view" }
SELECT timestamp, user_id, mission_id, progress
FROM ${ref("mission")}
WHERE DATE(timestamp, "Asia/Tokyo") <= CURRENT_DATE("Asia/Tokyo")
-- purchase
config { type: "view" }
SELECT timestamp, user_id, purchase_id, price, item_id
FROM ${ref("purchase")}
WHERE DATE(timestamp, "Asia/Tokyo") <= CURRENT_DATE("Asia/Tokyo")
intermediateで増分更新・結合・集計を行う
各ビューについて増分テーブルの作成
基本的にログは増え続けるので、Dataform の incremental を使うと増分テーブルとして作成することが有用です。
増分テーブルは初回のみゼロから作成しますが、その後の実装では、構成した条件に従って新しい行のみを増分テーブルに挿入・統合されるため、毎回再作成することがなくなります。
config {
type: "incremental",
uniqueKey: ["timestamp", "user_id"],
partitionBy: "DATE(timestamp)",
}
SELECT timestamp, user_id, platform, app_version
FROM ${ref("view_login")}
WHERE timestamp > timestamp_checkpoint
pre_operations {
DECLARE timestamp_checkpoint DEFAULT (
${when(incremental(),
`${fn_intermediate.renderCheckpoint(self(), true)}`,
`${fn_intermediate.renderCheckpoint(self(), false)}`,
)}
);
}
config {
type: "incremental",
uniqueKey: ["timestamp", "user_id", "mission_id", "progress"],
partitionBy: "DATE(timestamp)",
}
SELECT timestamp, user_id, mission_id, progress
FROM ${ref("view_mission")}
WHERE timestamp > timestamp_checkpoint
pre_operations {
DECLARE timestamp_checkpoint DEFAULT (
${when(incremental(),
`${fn_intermediate.renderCheckpoint(self(), true)}`,
`${fn_intermediate.renderCheckpoint(self(), false)}`,
)}
);
}
config {
type: "incremental",
uniqueKey: ["purchase_id"],
partitionBy: "DATE(timestamp)",
}
SELECT timestamp, user_id, purchase_id, price, item_id
FROM ${ref("view_purchase")}
WHERE timestamp > timestamp_checkpoint
pre_operations {
DECLARE timestamp_checkpoint DEFAULT (
${when(incremental(),
`${fn_intermediate.renderCheckpoint(self(), true)}`,
`${fn_intermediate.renderCheckpoint(self(), false)}`,
)}
);
}
incremental() は、増分・非増分の場合の処理をそれぞれ記載することができます。今回はfn_intermediate.jsに処理を記載しております。
テーブルが存在しない場合は、self() で参照してもエラーになるので、INIT_TIMESTAMP でデフォルトのtimestampを定義しておいて、増分の場合は最大のtimestampから取得するようにします。
const INIT_TIMESTAMP = `TIMESTAMP("2025-11-01", "Asia/Tokyo")`;
exports.renderCheckpoint = (table, is_incr) => {
if (is_incr) {
return `SELECT IFNULL(MAX(timestamp), ${INIT_TIMESTAMP}) FROM ${table}`;
} else {
return `SELECT ${INIT_TIMESTAMP}`;
}
};
ユーザーの日次データ
上記の増分テーブルのデータをもとに、ユーザーの日次データを集計していきます。
いきなり日次KPIを増分テーブルのデータから作るよりも、まずユーザーごとの日次データとして集計することで、指標の追加・変更が扱いやすくなります。
config {
type: "incremental",
uniqueKey: ["date", "user_id"],
partitionBy: "date",
columns: {
date: "日付",
user_id: "ユーザーID",
login_count: "当日のログイン回数",
is_active: "当日ログインが1件以上あるか",
mission_event_count: "当日のミッション進捗イベント数",
mission_completed_events: "当日の完了イベント数",
purchase_count: "当日の購入回数",
revenue: "当日の売上"
}
}
WITH
-- 日次ログイン
login_daily AS (
SELECT
DATE(timestamp, "Asia/Tokyo") AS date,
user_id,
COUNT(*) AS login_count
FROM ${ref("stg_login")}
WHERE DATE(timestamp, "Asia/Tokyo") > date_checkpoint
GROUP BY 1, 2),
-- 日次ミッション
mission_daily AS (
SELECT
DATE(timestamp, "Asia/Tokyo") AS date,
user_id,
COUNT(*) AS mission_event_count,
COUNTIF(SAFE_CAST(progress AS int64) >= 100) AS mission_completed_events
FROM ${ref("stg_mission")}
WHERE DATE(timestamp, "Asia/Tokyo") > date_checkpoint
GROUP BY 1, 2),
-- 日次購入
purchase_daily AS (
SELECT
DATE(timestamp, "Asia/Tokyo") AS date,
user_id,
COUNT(*) AS purchase_count,
SUM(SAFE_CAST(price AS int64)) AS revenue
FROM ${ref("stg_purchase")}
WHERE DATE(timestamp, "Asia/Tokyo") > date_checkpoint
GROUP BY 1, 2)
SELECT
COALESCE(l.date, m.date, p.date) AS date,
COALESCE(l.user_id, m.user_id, p.user_id) AS user_id,
COALESCE(l.login_count, 0) AS login_count,
COALESCE(l.login_count, 0) > 0 AS is_active,
COALESCE(m.mission_event_count, 0) AS mission_event_count,
COALESCE(m.mission_completed_events, 0) AS mission_completed_events,
COALESCE(p.purchase_count, 0) AS purchase_count,
COALESCE(p.revenue, 0) AS revenue
FROM login_daily l
FULL OUTER JOIN mission_daily m
USING (date, user_id)
FULL OUTER JOIN purchase_daily p
USING (date, user_id)
pre_operations {
DECLARE
date_checkpoint DEFAULT (
${when(incremental(),
`${fn_intermediate.renderDateCheckpoint(self(), true)}`,
`${fn_intermediate.renderDateCheckpoint(self(), false)}`,
)}
);
}
少し長いので column で説明を記載していますが、このデータにより、3テーブルを統合した日次データを集計できます。
renderDateCheckpointは上記のrenderCheckpointのDate版です。
今回のログでは、アクティブユーザーを「その日ログインしたか」に定義し、DAUを集計します。
outputsでKPIデータを出力
最後に、stg_user_dailyを集計し、日次KPIテーブルとして出力します。
config {
type: "incremental",
uniqueKey: ["date"],
partitionBy: "date",
columns: {
date: "日付",
dau: "日次アクティブユーザー数",
login_events: "当日のログイン行数合計",
mission_event_count: "当日のミッション進捗イベント数",
mission_completed_events: "当日のミッション完了イベント数",
mission_completed_users: "当日ミッション完了したユーザー数。",
revenue: "当日の売上",
purchase_count: "当日の購入回数",
paying_users: "当日の課金ユーザー数",
arppu: "ARPPU = revenue / paying_users",
arpdau: "ARPDAU = revenue / dau"
}
}
SELECT
date,
COUNT(DISTINCT IF (is_active, user_id, NULL)) AS dau,
SUM(login_count) AS login_events,
SUM(mission_event_count) AS mission_event_count,
SUM(mission_completed_events) AS mission_completed_events,
COUNT(DISTINCT IF (mission_completed_events > 0, user_id, NULL)) AS mission_completed_users,
SUM(revenue) AS revenue,
SUM(purchase_count) AS purchase_count,
COUNT(DISTINCT IF (revenue > 0, user_id, NULL)) AS paying_users,
SAFE_DIVIDE(SUM(revenue), COUNT(DISTINCT IF (revenue > 0, user_id, NULL))) AS arppu,
SAFE_DIVIDE(SUM(revenue), COUNT(DISTINCT IF (is_active, user_id, NULL))) AS arpdau
FROM ${ref("stg_user_daily")}
WHERE date > date_checkpoint
GROUP BY date
pre_operations {
DECLARE date_checkpoint DEFAULT (
${when(incremental(),
`${fn_intermediate.renderDateCheckpoint(self(), true)}`,
`${fn_intermediate.renderDateCheckpoint(self(), false)}`,
)}
);
}
これで準備ができたので、最後に、BigQueryテーブルにデータを入れて出力を確認してみましょう。
実行結果
生データ
INSERT INTO `test-project-xxxxxx.test_dataset.login`
(timestamp, user_id, platform, app_version)
VALUES
-- user_001: 初日ログイン → 翌日もログイン
(TIMESTAMP('2025-12-01 01:10:00', 'Asia/Tokyo'), 'user_001', 'ios', '1.0.0'),
(TIMESTAMP('2025-12-02 02:05:00', 'Asia/Tokyo'), 'user_001', 'ios', '1.0.0'),
-- user_002: 初日だけログイン
(TIMESTAMP('2025-12-01 03:20:00', 'Asia/Tokyo'), 'user_002', 'android', '1.0.0'),
-- user_003: 初日 → 7日後に再訪(D7)
(TIMESTAMP('2025-12-01 10:00:00', 'Asia/Tokyo'), 'user_003', 'ios', '1.0.1'),
(TIMESTAMP('2025-12-08 09:30:00', 'Asia/Tokyo'), 'user_003', 'ios', '1.0.1'),
-- user_004: 後日新規流入
(TIMESTAMP('2025-12-03 14:45:00', 'Asia/Tokyo'), 'user_004', 'android', '1.0.1');
INSERT INTO `test-project-xxxxxx.test_dataset.mission`
(timestamp, user_id, mission_id, progress)
VALUES
-- user_001: ミッションを順調に進行
(TIMESTAMP('2025-12-01 01:20:00', 'Asia/Tokyo'), 'user_001', 'mission_tutorial_01', 50),
(TIMESTAMP('2025-12-01 01:40:00', 'Asia/Tokyo'), 'user_001', 'mission_tutorial_01', 100),
-- user_002: ミッション途中で離脱
(TIMESTAMP('2025-12-01 03:30:00', 'Asia/Tokyo'), 'user_002', 'mission_tutorial_01', 30),
-- user_003: 別ミッションを完了
(TIMESTAMP('2025-12-01 10:20:00', 'Asia/Tokyo'), 'user_003', 'mission_daily_01', 100),
-- user_004: 新規ミッション開始のみ
(TIMESTAMP('2025-12-03 15:00:00', 'Asia/Tokyo'), 'user_004', 'mission_tutorial_01', 10);
INSERT INTO `test-project-xxxxxx.test_dataset.purchase`
(timestamp, user_id, purchase_id, price, item_id)
VALUES
-- user_001: 初日に課金
(TIMESTAMP('2025-12-01 02:00:00', 'Asia/Tokyo'), 'user_001', 'purchase_001', 1200, 'gem_pack_small'),
-- user_002: 課金なし
-- user_003: 初日と7日後に課金
(TIMESTAMP('2025-12-01 11:00:00', 'Asia/Tokyo'), 'user_003', 'purchase_002', 980, 'starter_pack'),
(TIMESTAMP('2025-12-08 10:15:00', 'Asia/Tokyo'), 'user_003', 'purchase_003', 3000, 'gem_pack_large');
-- user_004: 課金なし
これらのデータをテーブルに入れて実行してみます。
ワークフローの流れはDataformの「Compiled graph」から確認できるので、より複雑な構造でも視覚的にわかりやすく確認できます。
結果
Dataformを実行し、結果をBigQueryで出力した結果です。
まとめ
今回は、Google Dataformのベストプラクティスに従ってリポジトリを作成し、BigQueryのログを加工・集計して出力するまでを実装してみました。
さらなるアプローチとしては、intermediate/outputsテーブルを文書化してテストしたり、outputsテーブルへの追加・変更があった場合に強い構成にするなどが考えられます。

