🧩 実務で困ったこと
視聴ログなどの生ログから、ユーザー毎の週次の集計値を出したい場面がよくあります。
最初は生ログに対して直接、こんなクエリを書いていました:
SELECT
TIMESTAMP_TRUNC(created_at, WEEK(MONDAY), "Asia/Tokyo") AS week,
user_id,
COUNT(1) AS cnt,
FROM
events
WHERE
created_at BETWEEN {{FROM}} AND {{TO}}
GROUP BY
week,
user_id
しかし実務では次のような要望があります:
- 週次の指標をデータマート化しておきたい(ダッシュボードでサクサク見たい)
- 一方で、今週の途中経過も知りたい(週の途中でも数字を見たい)
- 毎回「全期間を再集計」すると BigQuery のスキャン量がつらい
「過去分はデータマートを見たいが、今週分は生ログから最新値を取りたい」という要件をどうやって両立させるかが課題でした。
💭 考えたこと
- 週次データマート(
weekly_user_statsみたいなテーブル)を作る - ただし 週の途中はそこにまだデータがない
- じゃあ「過去週はデータマート、当週は生ログから集計」すれば良いのでは?
ここで重要になるのが、集計ロジックをどこに持つか という点です。生ログ集計とデータマート作成でロジックを二重管理になると、メンテ時に整合性が取れず破綻します。
そこで、集計ロジックを TVF(テーブル関数)に寄せることにしました。
🧠 ラムダアーキテクチャをざっくり
ここで出てくるのが「ラムダアーキテクチャ」の考え方です。
- バッチレイヤ:過去分をまとめて集計したデータマート(重いけど正確)
- スピードレイヤ:最新の生ログからオンデマンドで集計(軽い・即時)
- サービングレイヤ:両者をよしなに合体して、1つのビューとして提供
今回やりたいことは、
過去の完結した週 → バッチ(データマート)
今週の途中経過 → スピード(生ログから集計)
を BigQuery の TVF でうまくラップする、という話です。
🧮 最終的に採用した方法
やったことは大きく3段階です:
- 生ログを集計する TVF を作る(日次 or 週次)
- その TVF を使って 週次データマートテーブルを作る
- 過去週はデータマート+当週は TVF を UNION する TVF を作る
これで、クエリ側は「範囲を指定して TVF を叩くだけ」でラムダアーキテクチャっぽい挙動を実現できます。
1. 生ログを週次集計する TVF
まずは生ログに対する集計ロジックを テーブル関数に寄せます。
CREATE OR REPLACE TABLE FUNCTION `project.mart.raw_weekly_user_events_stats_tvf`(
__from TIMESTAMP,
__to TIMESTAMP
) AS
SELECT
TIMESTAMP_TRUNC(created_at, WEEK(MONDAY), "Asia/Tokyo") AS week,
user_id,
COUNT(1) AS cnt,
FROM
events
WHERE
created_at BETWEEN __from AND __to
GROUP BY
week,
user_id
;
ポイント:
- ここに「週次とは何か(週の開始曜日など)」の定義も押し込んでおく
- あとからロジックを直したくなった時も、TVF だけ直せば済む
2. TVF を使って週次データマートを作る
次に、上の TVF を使って 週次データマートテーブルを作ります。
(スケジュールクエリなどで、毎日 or 週1回実行)
-- スケジュールクエリの実行時刻
DECLARE __run_at TIMESTAMP DEFAULT @run_time;
-- 実行時点の前週分
-- 実行日が 2025-11-17 10:00:00 JST (Mon) なら
-- __from... 2025-11-10 00:00:00 JST (Mon)
-- __to... 2025-11-16 23:59:59 JST (Sun)
DECLARE __to TIMESTAMP DEFAULT TIMESTAMP_SUB(TIMESTAMP_TRUNC(__run_at, WEEK(MONDAY), "Asia/Tokyo"), INTERVAL 1 SECOND);
DECLARE __from TIMESTAMP DEFAULT TIMESTAMP_TRUNC(__to, WEEK(MONDAY), "Asia/Tokyo");
-- レコード投入単位を識別するためのタイムスタンプ
DECLARE __target_date DATE DEFAULT DATE(__from, "Asia/Tokyo");
-- 処理を冪等にするための事前処理
-- 通常運用時は空振りになるが、入れ直し時には後段の INSERT による投入対象を先に消しておくことでレコード重複を防ぐ
DELETE
FROM
`project.mart.raw_weekly_user_events_stats`
WHERE
target_date = __target_date
;
INSERT `project.mart.raw_weekly_user_events_stats`
SELECT
week,
user_id,
cnt,
__target_date AS target_date,
FROM
`project.dataset.raw_weekly_user_events_stats_tvf`(
__from,
__to
);
3. 「過去週=テーブル」「当週=TVF」を統合する TVF
最後に、呼び出し側が使う TVF を作ります。
ここがラムダアーキテクチャ的な “サービングレイヤ” になります。
CREATE OR REPLACE TABLE FUNCTION `project.mart.weekly_user_events_stats_tvf`(
__from TIMESTAMP,
__to TIMESTAMP
)
AS (
WITH
-- バッチレイヤ:集計済の期間はマートから
batch_part AS (
SELECT
week,
user_id,
cnt,
FROM
`project.mart.raw_weekly_user_events_stats`
WHERE
week BETWEEN TIMESTAMP_TRUNC(__from, WEEK(MONDAY), "Asia/Tokyo") AND __to
),
-- スピードレイヤ:当週分は生ログから TVF で集計
speed_part AS (
SELECT
week,
user_id,
cnt,
FROM
-- 当週分のみ集計したいので、from は「今週開始日時」で固定
-- __to が当週内なら当週分が返り、前週以前なら空振りになる
`project.mart.raw_weekly_user_events_stats_tvf`(
TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), WEEK(MONDAY), "Asia/Tokyo"),
__to
)
)
SELECT * FROM batch_part
UNION ALL
SELECT * FROM speed_part
);
これで、利用側は:
SELECT
*
FROM
`project.mart.weekly_user_events_stats_tvf`(
TIMESTAMP("2025-03-01 00:00:00", "Asia/Tokyo"),
CURRENT_TIMESTAMP()
);
のように TVF を呼び出すだけで、
- 過去の完結した週 → データマート
- 今週の途中分 → 生ログからオンデマンド集計
がよしなに統合された結果を取得できます。
🧠 この構成でよかったところ
- 集計ロジックが TVF に一本化される
→ データマート作成も、当週分のオンデマンド集計も同じロジックを使い回せる - クエリを書く側は
→ 「どこまでがバッチで、どこからが生ログか」を意識しなくてよい - ラムダアーキテクチャっぽさを BigQuery ネイティブで表現できる