12
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

dbtによる差分更新のいくつかのアプローチ

Last updated at Posted at 2022-12-25

本記事では dbt の incremental モデルによる差分更新の実現方式をいくつか述べたいと思います。
ただし、あくまで概念コードであり、細かいケースや運用のしやすさは簡易のために除いていますので、その点はご了承ください。

1. 題材とする処理

日次バッチで sales_detail(売上詳細)テーブルから sales_summary(売上サマリー)テーブルを作成することを考えます。

01.png

集計方法については以下とします。

  • sales_datetime を日付単位に切り捨てたもの(= sales_date)と product_id ごとに集計します。
  • total_sales_quantitysales_quantity の合計とします。
  • last_update_datetimeupdate_datetime の最新 (max) とします。

いくつかの前提を置きます。

  • sales_detail テーブルのレコードは sales_datetime の順序通りに挿入されるわけではない。(いわゆる Late Arriving Data を許容する)
  • sales_detail テーブルのレコードの更新/物理削除は許容しない(これについては、どういう影響があるかは後述します)

2. 実装方法

2-1. 日付単位の洗い替え

dbt で実現するには一番オーソドックスな方法で、dbt の以下のドキュメントにもサンプルが示されている方法です。

ただし、このドキュメントのサンプルは Late Arriving Data を想定していないので、少し修正します。具体的には、過去に取り込んだ分の前日分までの Late Arriving Data を許容するために (select max(sales_date) from {{ this }}) - 1 としています(もし Late Arriving Data がより遡って発生する場合は - 1 の部分を調整するか、update_datetime の値に基づいて動的に変更します)。

{{
    config (
        materialized = 'incremental',
        unique_key = 'sales_date'
    )
}}
select
    cast(sales_datetime as date) sales_date,
    product_id,
    sum(sales_quantity) total_sales_quantity,
    max(update_datetime) last_update_datetime
from
    sales_detail
{% if is_incremental() %}
where
    sales_datetime >=
        cast((select max(sales_date) from {{ this }}) - 1 as timestamp)
{% endif %}
group by
    sales_date,
    product_id

この方法はとてもシンプルで概ね動作するのですが、以下のデメリットがあります。

  • sales_detail テーブルのレコードが物理削除された結果として1日のデータが丸っと存在しなくなった場合、sales_summary テーブルにはその削除が反映されない。
    • そんなことは滅多にないように思えますが、上流システムから間違ったデータが連携されてしまい、ある程度の範囲のデータを削除しないといけないなどイレギュラーな運用対応時に問題になることはあり得ます。
  • Late Arriving Dataが実際にはごく少数でも日単位での洗い替えになる。

(余談ですが、sales_date は別にユニークキーではないのに unique_key = 'sales_date' という設定名なのはどうかと思うのですが…)

2-2. 追加データ分を加算

dbt を利用せずに merge 文で実現しようとした場合によくあるアプローチで、新しく追加されたデータに対して、

  • 同じキーのレコードが sales_summary テーブルに存在しなければ、そのまま挿入
  • 同じキーのレコードが sales_summary テーブルに存在すれば、該当レコードの total_sales_quantity に追加データ分を加算する

というアプローチです。

ただ、dbt の incremental モデルは「存在しなければ挿入 / 存在すれば上書き更新」が基本となっており、上述のように挿入/更新時のロジックが異なる場合は少し手間を加える必要があります。アイディアはこちらもドキュメントに記載があるのですが、挿入分/更新分のデータを事前に CTE で作成しておくという方法で実現できます。

  • まず、diff という CTE で新規追加レコードを抽出します。
  • diff と対象テーブルを突き合わせて更新用データを表す CTE updates を作成します。
    • ここで加算のロジックを含めます。
    • 初回実行時はこの部分はないので、全体を {% if is_incremental() %} で囲います。
  • また、diff と太守テーブルを突き合わせて挿入用データを表す CTE inserts を作成します。
  • 最後に updatesinserts を UNION ALL して返します。
    • uniqeu_key には sales_dateproduct_id を文字列連結したもの(summary_key)を指定します。(dbt は unique_key で複数カラムを指定できません)
{{
    config (
        materialized = 'incremental',
        unique_key = 'summary_key'
    )
}}

with
diff as (
    select
        cast(sales_datetime as date) sales_date,
        product_id,
        sum(sales_quantity) total_sales_quantity,
        max(update_datetime) last_update_datetime
    from
        sales_detail
{% if is_incremental() %}
    where
        update_datetime > (select max(last_update_datetime) from {{ this }})
{% endif %}
    group by
        sales_date,
    product_id
),

{% if is_incremental() %}
updates as (
    select
        d.sales_date,
        d.product_id,
    d.total_sales_quantity + t.total_sales_quantity total_sales_quantity,
    d.last_update_datetime
    from
        diff d
    inner join
        {{ this }} t
    on (
        d.sales_date = t.sales_date
    and
        d.product_id = t.product_id
    )
),
{% endif %}

inserts as (
    select
        d.sales_date,
        d.product_id,
    d.total_sales_quantity,
    d.last_update_datetime
    from
        diff d
{% if is_incremental() %}
    where
        not exists (
            select
            1
            from
                {{ this }} t
            where
            t.sales_date = d.sales_date
            and
            t.product_id = d.product_id
        )
{% endif %}
)

select
    sales_date,
    product_id,
    total_sales_quantity,
    last_update_datetime,
    (sales_date || '-' || product_id) summary_key
from
    inserts
{% if is_incremental() %}
union all
select
    sales_date,
    product_id,
    total_sales_quantity,
    last_update_datetime,
    (sales_date || '-' || product_id) summary_key
from
    updates
{% endif %}

このアプローチは、更新すべきデータ量が少ない場合はほぼ挿入になるのでメリットがあります。一方で以下のようなデメリットがあります。

  • 更新すべきデータが多いと処理性能が低下しやすい。
  • レコード単位の更新のため、対象テーブルデータの圧縮率やクラスタリングファクターに悪影響を与えやすい。
  • sales_detail テーブルのレコードの更新/物理削除には対応できない。
  • コードが長い

2-3. 範囲洗い替え

sales_detail テーブルのデータに依存せずに、直近 N 日のデータを洗い替えるというアプローチです。

これを実現するためには、処理の前段階として sales_summary テーブルの直近 N 日のデータを削除する必要があるのですが、これをマクロで実現します。

具体的に以下のマクロを準備します。内容は is_incremental()true の場合のみ、対象テーブルから直近 N 日のデータを削除するマクロです。今回は N = 2(つまり最新のその前日分)とするため max - 1 としています。

{% macro delete_recent_sales_summary() %}
    {% if is_incremental() %}
        delete from sales_summary
        where
            sales_date >= 
                (select max(sales_date) from sales_summary) - 1
    {% endif %}
{% endmacro %}

このマクロを利用して範囲洗い替えを実現する incremental モデルは以下になります。

  • pre_hook で先のマクロを処理開始前に呼び出す。
  • unique_key は今回は不要(更新なし挿入のみ)
  • モデルに含まれるクエリの実行時には既に sales_summary の直近 N 日のデータは消されてるので、その翌日分 (+1) 以降をフィルダー条件としています。
{{
    config (
        materialized = 'incremental',
        pre_hook = '{{ delete_recent_sales_summary() }}'
    )
}}
select
    cast(sales_datetime as date) sales_date,
    product_id,
    sum(sales_quantity) total_sales_quantity,
    max(update_datetime) last_update_datetime
from
    sales_detail
{% if is_incremental() %}
where
    sales_datetime >=
        cast((select max(sales_date) from {{ this }}) + 1 as timestamp)
{% endif %}
group by
    sales_date,
    product_id

この方法は、以下のメリットがあります。

  • 直近 N 日の範囲であれば、sales_detail テーブルのレコードの更新/物理削除にも対応できる。
  • 範囲削除 & 挿入のみなので、対象テーブルデータの圧縮率やクラスタリングファクターの観点でも好ましい。

一方でデメリットは以下になります。

  • 洗い替えするデータ量はここまで見てきた方法の中で一番大きい。
  • dbt らしいやり方ではない?

この方法は以下の内容を参考にしています。

3. まとめ

今回、以下の差分更新の方法を dbt でどう実現するのかを試してみました。

  • 日単位での洗い替え
  • 追加データ分を加算
  • 範囲洗い替え

個人的には、DWH の性能が向上していた近年では Late Arriving Data 対応やイレギュラーなレコード更新/削除の対応、誤データ混入からのリカバリなどをシンプルにしたいと思っているので、3番目の方法が好きです。(全件洗い替えが許容できればそれが一番ですが、自分が関わる案件はデータ保持期間が結構長いものが多いので)

ただし、dbt で実現するとマクロやフックを使う必要があるので、それが分かりやすいか?メンテしやすいか?というとあまり自信はないです。

このあたり、皆さんは実際どうされているのでしょうか?

12
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?