本記事では dbt の incremental モデルによる差分更新の実現方式をいくつか述べたいと思います。
ただし、あくまで概念コードであり、細かいケースや運用のしやすさは簡易のために除いていますので、その点はご了承ください。
1. 題材とする処理
日次バッチで sales_detail
(売上詳細)テーブルから sales_summary
(売上サマリー)テーブルを作成することを考えます。
集計方法については以下とします。
-
sales_datetime
を日付単位に切り捨てたもの(=sales_date
)とproduct_id
ごとに集計します。 -
total_sales_quantity
はsales_quantity
の合計とします。 -
last_update_datetime
はupdate_datetime
の最新 (max) とします。
いくつかの前提を置きます。
-
sales_detail
テーブルのレコードはsales_datetime
の順序通りに挿入されるわけではない。(いわゆる Late Arriving Data を許容する) -
sales_detail
テーブルのレコードの更新/物理削除は許容しない(これについては、どういう影響があるかは後述します)
2. 実装方法
2-1. 日付単位の洗い替え
dbt で実現するには一番オーソドックスな方法で、dbt の以下のドキュメントにもサンプルが示されている方法です。
ただし、このドキュメントのサンプルは Late Arriving Data を想定していないので、少し修正します。具体的には、過去に取り込んだ分の前日分までの Late Arriving Data を許容するために (select max(sales_date) from {{ this }}) - 1
としています(もし Late Arriving Data がより遡って発生する場合は - 1
の部分を調整するか、update_datetime
の値に基づいて動的に変更します)。
{{
config (
materialized = 'incremental',
unique_key = 'sales_date'
)
}}
select
cast(sales_datetime as date) sales_date,
product_id,
sum(sales_quantity) total_sales_quantity,
max(update_datetime) last_update_datetime
from
sales_detail
{% if is_incremental() %}
where
sales_datetime >=
cast((select max(sales_date) from {{ this }}) - 1 as timestamp)
{% endif %}
group by
sales_date,
product_id
この方法はとてもシンプルで概ね動作するのですが、以下のデメリットがあります。
-
sales_detail
テーブルのレコードが物理削除された結果として1日のデータが丸っと存在しなくなった場合、sales_summary
テーブルにはその削除が反映されない。- そんなことは滅多にないように思えますが、上流システムから間違ったデータが連携されてしまい、ある程度の範囲のデータを削除しないといけないなどイレギュラーな運用対応時に問題になることはあり得ます。
- Late Arriving Dataが実際にはごく少数でも日単位での洗い替えになる。
(余談ですが、sales_date
は別にユニークキーではないのに unique_key = 'sales_date'
という設定名なのはどうかと思うのですが…)
2-2. 追加データ分を加算
dbt を利用せずに merge 文で実現しようとした場合によくあるアプローチで、新しく追加されたデータに対して、
- 同じキーのレコードが
sales_summary
テーブルに存在しなければ、そのまま挿入 - 同じキーのレコードが
sales_summary
テーブルに存在すれば、該当レコードのtotal_sales_quantity
に追加データ分を加算する
というアプローチです。
ただ、dbt の incremental モデルは「存在しなければ挿入 / 存在すれば上書き更新」が基本となっており、上述のように挿入/更新時のロジックが異なる場合は少し手間を加える必要があります。アイディアはこちらもドキュメントに記載があるのですが、挿入分/更新分のデータを事前に CTE で作成しておくという方法で実現できます。
- まず、
diff
という CTE で新規追加レコードを抽出します。 -
diff
と対象テーブルを突き合わせて更新用データを表す CTEupdates
を作成します。- ここで加算のロジックを含めます。
- 初回実行時はこの部分はないので、全体を
{% if is_incremental() %}
で囲います。
- また、
diff
と太守テーブルを突き合わせて挿入用データを表す CTEinserts
を作成します。 - 最後に
updates
とinserts
を UNION ALL して返します。-
uniqeu_key
にはsales_date
とproduct_id
を文字列連結したもの(summary_key
)を指定します。(dbt はunique_key
で複数カラムを指定できません)
-
{{
config (
materialized = 'incremental',
unique_key = 'summary_key'
)
}}
with
diff as (
select
cast(sales_datetime as date) sales_date,
product_id,
sum(sales_quantity) total_sales_quantity,
max(update_datetime) last_update_datetime
from
sales_detail
{% if is_incremental() %}
where
update_datetime > (select max(last_update_datetime) from {{ this }})
{% endif %}
group by
sales_date,
product_id
),
{% if is_incremental() %}
updates as (
select
d.sales_date,
d.product_id,
d.total_sales_quantity + t.total_sales_quantity total_sales_quantity,
d.last_update_datetime
from
diff d
inner join
{{ this }} t
on (
d.sales_date = t.sales_date
and
d.product_id = t.product_id
)
),
{% endif %}
inserts as (
select
d.sales_date,
d.product_id,
d.total_sales_quantity,
d.last_update_datetime
from
diff d
{% if is_incremental() %}
where
not exists (
select
1
from
{{ this }} t
where
t.sales_date = d.sales_date
and
t.product_id = d.product_id
)
{% endif %}
)
select
sales_date,
product_id,
total_sales_quantity,
last_update_datetime,
(sales_date || '-' || product_id) summary_key
from
inserts
{% if is_incremental() %}
union all
select
sales_date,
product_id,
total_sales_quantity,
last_update_datetime,
(sales_date || '-' || product_id) summary_key
from
updates
{% endif %}
このアプローチは、更新すべきデータ量が少ない場合はほぼ挿入になるのでメリットがあります。一方で以下のようなデメリットがあります。
- 更新すべきデータが多いと処理性能が低下しやすい。
- レコード単位の更新のため、対象テーブルデータの圧縮率やクラスタリングファクターに悪影響を与えやすい。
-
sales_detail
テーブルのレコードの更新/物理削除には対応できない。 - コードが長い
2-3. 範囲洗い替え
sales_detail
テーブルのデータに依存せずに、直近 N 日のデータを洗い替えるというアプローチです。
これを実現するためには、処理の前段階として sales_summary
テーブルの直近 N 日のデータを削除する必要があるのですが、これをマクロで実現します。
具体的に以下のマクロを準備します。内容は is_incremental()
が true
の場合のみ、対象テーブルから直近 N 日のデータを削除するマクロです。今回は N = 2(つまり最新のその前日分)とするため max - 1 としています。
{% macro delete_recent_sales_summary() %}
{% if is_incremental() %}
delete from sales_summary
where
sales_date >=
(select max(sales_date) from sales_summary) - 1
{% endif %}
{% endmacro %}
このマクロを利用して範囲洗い替えを実現する incremental モデルは以下になります。
- pre_hook で先のマクロを処理開始前に呼び出す。
- unique_key は今回は不要(更新なし挿入のみ)
- モデルに含まれるクエリの実行時には既に
sales_summary
の直近 N 日のデータは消されてるので、その翌日分 (+1) 以降をフィルダー条件としています。
{{
config (
materialized = 'incremental',
pre_hook = '{{ delete_recent_sales_summary() }}'
)
}}
select
cast(sales_datetime as date) sales_date,
product_id,
sum(sales_quantity) total_sales_quantity,
max(update_datetime) last_update_datetime
from
sales_detail
{% if is_incremental() %}
where
sales_datetime >=
cast((select max(sales_date) from {{ this }}) + 1 as timestamp)
{% endif %}
group by
sales_date,
product_id
この方法は、以下のメリットがあります。
- 直近 N 日の範囲であれば、
sales_detail
テーブルのレコードの更新/物理削除にも対応できる。 - 範囲削除 & 挿入のみなので、対象テーブルデータの圧縮率やクラスタリングファクターの観点でも好ましい。
一方でデメリットは以下になります。
- 洗い替えするデータ量はここまで見てきた方法の中で一番大きい。
- dbt らしいやり方ではない?
この方法は以下の内容を参考にしています。
3. まとめ
今回、以下の差分更新の方法を dbt でどう実現するのかを試してみました。
- 日単位での洗い替え
- 追加データ分を加算
- 範囲洗い替え
個人的には、DWH の性能が向上していた近年では Late Arriving Data 対応やイレギュラーなレコード更新/削除の対応、誤データ混入からのリカバリなどをシンプルにしたいと思っているので、3番目の方法が好きです。(全件洗い替えが許容できればそれが一番ですが、自分が関わる案件はデータ保持期間が結構長いものが多いので)
ただし、dbt で実現するとマクロやフックを使う必要があるので、それが分かりやすいか?メンテしやすいか?というとあまり自信はないです。
このあたり、皆さんは実際どうされているのでしょうか?