GA4イベントテーブルのフルスキャンをやめ、コストを定量化したい
これまでは、データウェアハウス層構築のために、GA4からexportされる日次テーブルのフルスキャンを行っていました。
詳しくは以下の前回記事を参照ください。
ファーストステップとして冪等性だけを考慮し、何度実行しても結果が変わらない状態からスタートしました。
しかし、データ量の増加に合わせコストが増加する状態は好ましくありません。
そこで、線形的に増えていくコストを定量化することを目指しました。
改善対象のコード例です。(前回記事からの抜粋)
CREATE OR REPLACE TABLE `staging_{プロジェクト名}.page_view`
AS (
SELECT
...
FROM `{PROJECT_ID}.analytics_000000000.*` -- (NG) 全テーブルを総ナメ
)
累計日数分のデータを参照せずに、スキャン量を日数分にしたい
上記の方法で、仮に1日のスキャン容量を1GBとした場合を考えてみます。
1日目は1GBのスキャンを行い、2日目は、1日目の1GB と合わせ2GBのスキャンを行うことになります。
30日目 のスキャン量は30GBですが、30日間 の合計スキャン量は以下のようになります。
1GB + 2GB + 3GB + ... + 30GB = 465GB
2ヶ月目の合計スキャン量は、31GB + 32GB + ... + 60GB = 1365GB
となります。
これを 1日1GBのスキャンとし、30日間のデータであれば、以下の状態にしたいと考えました。
1GB + 1GB + ... + 1GB = 30GB
この方法であれば、2ヶ月目以降も 日数 * 1GB
の状態は変わりません。
GA4からのexportはデイリーで行われるように設定しているので、毎日、増えた1日分だけを参照して更新する方法で対応することでこれを達成したいと思います。
GA4からexoprtされた日次テーブルを取り込む
EXECUTE IMMEDIATE
を利用し、GA4からexportされた対象テーブルを動的に指定しています。
詳細は以下のドキュメントを参照ください。
データウェアハウス層構築時に参照した際に、1日分のデータ量を参照できるように日付でのパーティション分割を行っています。
event_dateのデータ型はexport時点では STRING
のため、CASTして DATE
に変換しています。
また、重複して更新されないように CASE
を利用した条件分岐を行っています。
このクエリ一つでいい感じにやるということも考えたのですが、ややこしくなりそうだったので、増加を想定する状態でない場合には ERROR
を利用して処理を停止させます。
問題が起きれば、都度対応としています。
config {
type: "operations",
schema: "source", -- データセット(データレイク層)
name: "ga4_events", -- テーブル名
hasOutput: true -- 依存関係用
}
-- 実行時の4日前を対象
-- 例)20230401テーブルは20230404まで更新されるため20230405に取り込む
DECLARE target_date DATE DEFAULT (
SELECT (CURRENT_DATE('+09') - 4)
);
-- sourceテーブルにある最新データの日付
DECLARE event_date_checkpoint DATE DEFAULT (
SELECT MAX(event_date) FROM ${self()}
);
CASE
WHEN (event_date_checkpoint = target_date) THEN
-- 既に登録されている日時のため処理を実行しない
WHEN ((event_date_checkpoint + 1) = target_date) THEN
EXECUTE IMMEDIATE FORMAT("""
INSERT INTO ${self()}
SELECT
CAST(event_date AS DATE FORMAT 'YYYYMMDD') AS event_date,
* EXCEPT(event_date)
FROM `analytics_000000000`.events_%s
""", FORMAT_DATE('%Y%m%d', target_date));
ELSE
SELECT ERROR('source.ga4_eventsテーブルの状態が正しくありません');
END CASE
エラー時には、元テーブルを作り直す
BQに以下のクエリを用意しておき、パイプライン処理が失敗した場合にはデータレイク層のテーブルを初期化できるようにしています。
-- 注意
-- フルスキャンを行うためクエリコストを確認すること
-- パイプラインがga4_eventsテーブルの状態により失敗した場合に実行する
CREATE OR REPLACE TABLE `source.ga4_events`
PARTITION BY event_date
CLUSTER BY event_name AS (
SELECT
CAST(event_date AS DATE FORMAT 'YYYYMMDD') AS event_date,
* EXCEPT(event_date)
FROM
`analytics_000000000.*`
WHERE CAST(event_date AS DATE FORMAT 'YYYYMMDD') <= (SELECT CURRENT_DATE('+09') - 4)
)
-- 注意
データレイク層のGA4イベントテーブルを参照し、増分更新を行う
type: incremental
として、増分更新を行います。
詳細は以下ドキュメントを参照ください。
SELECT MAX(event_date) FROM ${self()}
を checkpoint として、自身が保持している日付より新しいデータだけをスキャンして取り込みを行います。
また、ドキュメントの以下の記述を参考に pre_operations
ブロックで event_date_checkpoint 変数に保存しています。
When creating incremental tables from partitioned tables in BigQuery, some extra care needs to be taken to avoid full table scans, as BigQuery can't optimize where statements that are computed from inline select statements. To work around this, values used in the where clause should be moved to a pre_operations block and saved into a variable using BigQuery scripting.
(Google翻訳)
BigQuery でパーティション分割されたテーブルから増分テーブルを作成する場合、テーブル全体のスキャンを回避するために特別な注意を払う必要があります。これは、BigQuery がインラインの select ステートメントから計算される where ステートメントを最適化できないためです。 これを回避するには、where 句で使用される値を pre_operations ブロックに移動し、BigQuery スクリプトを使用して変数に保存する必要があります。
config {
type: "incremental",
schema: "staging", -- データセット(データウェアハウス層)
name: "page_view", -- テーブル名
}
pre_operations {
-- 2000-01-01は使用しないのでいつでも良い、未設定だと非増分時にコンパイルエラーになる
DECLARE event_date_checkpoint DATE DEFAULT (
${when(incremental(),
`SELECT MAX(event_date) FROM ${self()}`,
`SELECT DATE("2000-01-01")`)}
)
}
SELECT
event_name,
event_date, -- CAST済みのためデータ型はDATE
event_timestamp,
(SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'page_location') AS page_location,
(SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'page_referrer') AS page_referrer,
(SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') AS ga_session_id,
user_pseudo_id,
(SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'source') AS source,
device.category AS device_category,
device.mobile_model_name AS device_mobile_model_name,
device.operating_system_version AS device_operating_system_version,
device.web_info.browser AS device_web_info_browser
FROM
${ref('ga4_events')} -- データレイク層との依存関係を宣言
${when(incremental(),
`WHERE event_date > event_date_checkpoint AND event_name = 'page_view'`,
`WHERE event_name = 'page_view'`)}
まとめ
これまで1回で 数百GB ~ 数TB のスキャンが必要だった更新処理を、1回数十GB の定量のスキャン量に減らすことができました。
さらに、これまではスキャン量が大きく週一回しか行っていなかった更新処理を毎日行えるようになりました。
引き続き、データ分析基盤の改善に取り組んで行きたいと思います。
最後まで読んでいただきありがとうございました。