1. はじめに
株式会社アイスタイル、データ分析システム部の今井です。
アイスタイルでは現在、データ分析基盤のReplaceプロジェクトとして、Embulkを利用したバッチ処理からDatastream/Airbyteを利用した取込みの改善を行っています。
この改善に伴い、過去のデータ履歴を保持するレイヤの構築が可能となりました(改善前も履歴保持をするテーブルはありましたが、限定的でした)。
大規模データの効率的な処理を目指し、dbtのincrementalモデルを採用しています。戦略面では、dbt標準のmergeではなく、BigQueryの特性を活かせるinsert_overwriteとcopy_partitionsオプションを導入しています。この構成により、履歴保持を行っている層でのクエリコストの最適化を実現することができました。
本記事では、dbtのincremental戦略の全体像を整理した上で、アイスタイルのスキャンコスト対策と、Airflowの運用と組み合わせた実際のコード設計について記述します。
2. dbt incremental の基本概念
dbtの Incrementalモデル とは、変換処理において「差分のみを更新対象とする」仕組みです。
デフォルトのテーブル構築では、実行のたびにデータを全入れ替えするため、規模の拡大に伴い処理時間とBigQueryのスキャンコストが膨れ上がります。
こうしたフルスキャンの非効率を排除し、大規模データを低コストかつ高速に処理するために導入されるのが Incrementalモデルです。
3. dbt incremental 戦略の種類一覧
dbtでは、抽出した差分データをターゲットテーブルへ反映する手法として、複数の incremental_strategy が用意されています。使用しているDWHやデータの特性・コスト要件に応じて、最適な戦略を選択することが重要です。
| strategy | 概要・仕組み | メリット | デメリット |
|---|---|---|---|
| merge | ユニークキーで照合し、既存なら更新、新規なら挿入する(Upsert)。 | Upsert(更新・挿入)が簡単に実現でき、ロジックがシンプルになる。 | ターゲットテーブルの全スキャンが発生しやすく、データ量に応じて課金が高額になる。 |
| append | 既存テーブルの末尾にデータを単純追加(INSERT)のみ行う。 | 重複排除等の処理がないため、負荷が非常に軽く最速で終わる。 | 何度も実行すると同じデータが増え続けるため、単純なログ収集以外では使いづらい。 |
| delete+insert | ユニークキーで既存データを削除(DELETE)した後、新データを挿入する。 |
MERGE 文が非対応、または最適化されていないDWHでも有用。 |
大きいテーブルだと非効率になりやすい |
| insert_overwrite | 変更対象のパーティション単位でデータを丸ごと差し替える。 | BigQueryの copy_partitions と併用することで、スキャン料金を抑えられる。 |
パーティション設計が必須。遅延データ等でパーティションがズレるとデータ消失のリスクがある。 |
| microbatch | イベント時間に基づいて処理対象を小分けにし、分割実行する。 | 失敗したバッチのみを独立して再実行(リトライ)できる。 |
event_time の設定が必要。バッチサイズのチューニング等の構成がやや複雑になる。 |
4. アイスタイルのクレンジング層で採用している insert_overwrite
アイスタイルのデータ分析基盤のクレンジング層では insert_overwrite を採用しています。mergeと比べてコストが安くなるのが主な理由です。
insert_overwrite + copy_partitions
今回のPJTでは、insert_overwrite戦略に加えて、dbt-bigqueryアダプタのオプションである copy_partitions: trueを採用しました。
insert_overwriteにより更新対象パーティションを差し替えます。さらにcopy_partitions: trueを有効にすることで、BigQueryのパーティションコピーを活用し、より効率的に反映できます。
{{
config(
materialized = "incremental",
incremental_strategy = "insert_overwrite",
partition_by = {
"field": "change_event_datetime_jst",
"data_type": "datetime",
"granularity": "day",
"copy_partitions": true
},
require_partition_filter = true
)
}}
5. incremental で事故る典型パターン
incrementalモデルは、正しく設定すれば強力ですが、一歩間違えると「データの不整合」を引き起こします。ここでは、戦略を問わず陥りやすい典型的な事故パターンを紹介します。
1. 削除イベントの取りこぼし
incrementalモデルは基本的に「新しく来たデータ」を積み上げるのが得意ですが、ソース側でデータが「物理削除」された場合、その変更を検知することはできません。
- 事故: ソースでレコードが削除されたが、incrementalモデル側には古いデータが残り続け、集計値が合わなくなる。
- 対策: DatastreamなどのCDCを利用し、「削除フラグ」や「削除イベント」そのものをログとして受け取って処理する、あるいは定期的にFull Refreshを実行して同期をとる必要がある
2. スキーマ変更による停止
運用中にソーステーブルにカラムが追加・削除された場合、incrementalモデルはデフォルトでは新しいカラムを取り込みません。
対策として「on_schema_change」設定を導入し、ソース側のカラム追加や削除をターゲットテーブルにどう同期するか定義することで、スキーマ変更によるエラーを防ぐことができます。
6. insert_overwriteとAirflow前提にした運用上の工夫
アイスタイルでは、パイプラインの管理にCloud Composerを採用しています。Datastreamで抽出されたrawデータを、Cloud Composer上のdbtジョブでBigQueryの各レイヤーへ加工・転送する流れです。
insert_overwrite戦略の強みは、パーティション単位でデータを安全に差し替えられる点にあります。この特性を活かすため、運用ではairflowから渡される変数をvar('ts')として活用し、抽出ロジックを以下のように共通化しています。
- _partitiontime によるスキャン範囲の絞り込み
- source_timestamp によるデータの特定
{% if is_incremental() %}
where
date(_partitiontime, "Asia/Tokyo") between date_sub(date("{{ var('ts') }}", "Asia/Tokyo"), interval 1 day) and date("{{ var('ts') }}", "Asia/Tokyo")
and date(timestamp_millis(r.datastream_metadata.source_timestamp), "Asia/Tokyo") = date("{{ var('ts') }}", "Asia/Tokyo")
{% endif %}
-
_partitiontimeによるスキャン範囲の絞り込み: _partitiontimeを利用し、物理的なスキャン範囲を「対象日の前後」に絞り込んでいます。これにより、日跨ぎのデータもしっかりとフィルタリングの対象に含められるようにしています -
source_timestampによるデータの特定: Datastreamのメタデータが持つ「ソースDBでの発生時刻」を参照し、特定の1日分(var('ts'))のデータのみを抽出します
この組み合わせにより、過去の特定日付にデータ不備が見つかった際も、Airflowからその日付でバックフィルするだけで、他の期間に影響を与えずに安全にデータを修復できる体制を構築しました。
7. まとめ
insert_overwrite の導入により、履歴保持レイヤにおける処理効率は劇的に向上しました。
一方で、履歴から最新状態を切り出す後続工程のスキャンコスト増大が次の課題です。今回の知見を活かし、最適なアーキテクチャの実現を目指していきたいと思います。
8.さいごに
今回ご紹介した取り組みは、アイスタイルが目指すデータ分析基盤刷新のほんの一部です。私たちは今後もデー分析タ基盤の進化に向けて前進していきたいと思います。
このような環境で、データ分析基盤の進化を共に取り組んでいただける方をお待ちしています。現在募集中のポジションは以下の通りです。カジュアル面談も実施しているので、「興味があるよ」という方は採用ページからご応募ください!
【istyle】プロジェクトマネージャー/大規模データ分析基盤領域
【istyle】テクニカルリーダー候補/データ基盤インフラ領域
【istyle】データモデリングリード候補/データ基盤モデリング領域