3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

dbtAdvent Calendar 2024

Day 12

データソースの例外的な変更に対する dbt snapshot の挙動検証と復旧・検知方法の整理

Posted at

はじめに

dbt snapshot を使うと、プロダクト側の改修を行わずとも、データベースの履歴(スナップショット)を記録することができます。
参考)公式の紹介記事

スナップショットはSCDタイプ2という形式で記録されるため、日ごとにテーブル全体のコピーを取る選択肢と比べて、ストレージ費用やクエリ費用を大きく抑えることができます。
参考)WikipediaにあるSCDタイプ2の解説

一方で、履歴をプロダクトのデータベースではなく分析用のデータウェアハウスに蓄積するため、プロダクト側のエンジニアはデータ基盤側でテーブルの履歴が取られていることを知らない場合があります。そのため、データソースで例外的な変更が行われ、データ基盤にあるスナップショットがその変更に追従できず、履歴テーブルが壊れてしまうかもしれません。

また、dbt snapshot は変更履歴を同じテーブルに蓄積し続けるため、データソースのスキーマが大きく変わっても、古い履歴に対する後方互換性を保つ必要があります。
シャーディングテーブルのような、日毎の状態を異なるテーブルに保存する選択肢と比べると、dbt snapshot によるスナップショットはスキーマを維持する難易度が高いです。

そこで、データソースの例外的な変更に対して dbt snapshot がどこまで追従できるのかを検証し、復旧や検知方法を整理します。

設定

以下の操作では、データウェアハウスに BigQuery を使用します。
まず、スナップショットを取るためのデータソースを定義します。

create or replace table `ucchi_test` as (
  select 1 as user_id, "太郎" as user_name, 20 as user_age, current_timestamp() as updated_at
  union all
  select 2, "花子", 30, current_timestamp()
)

次に、dbt snapshot でスナップショットを取るための設定を定義します。

{% snapshot ucchi_test_snapshot %}
    {{
        config(
            strategy="timestamp",
            unique_key="user_id",
            updated_at="updated_at",
        )
    }}

    select *
    from `ucchi_test`
{% endsnapshot %}

dbt snapshot --select ucchi_test_snapshotコマンドを実行すると、以下のスナップショットが記録されます。
スクリーンショット 2024-12-09 0.14.48.png

データソースの変更

様々なパターンの変更に対して、dbt snapshot がどこまで追従できるのか確かめます。

値の更新

太郎さんの年齢を25歳に変更します。

update `ucchi_test`
set
  user_age = 25,
  updated_at = current_timestamp()
where user_id = 1

再度dbt snapshot --select ucchi_test_snapshotを実行すると、値の変更履歴が追加されました。
古い履歴の dbt_valid_toにはタイムスタンプが追加され、この履歴が現在有効でないことを表すようになりました。
スクリーンショット 2024-12-09 0.19.46.png

列の追加

性別を表すuser_sex列を追加します。

alter table `ucchi_test`
add column user_sex int64;

再度dbt snapshot --select ucchi_test_snapshotを実行すると、スナップショットにもuser_sex列が追加されました。
スクリーンショット 2024-12-09 0.23.37.png

列に値を追加

user_sex列に値を追加します。

update `ucchi_test`
set
  user_sex = 1,
  updated_at = current_timestamp()
where user_id = 1;

update `ucchi_test`
set
  user_sex = 2,
  updated_at = current_timestamp()
where user_id = 2;

再度dbt snapshot --select ucchi_test_snapshotを実行すると、user_sex列に値の入ったレコードが追加されました。
スクリーンショット 2024-12-09 0.26.46.png

列の改名

user_sex列をsexに改名します。

alter table `ucchi_test`
rename column user_sex to sex

再度dbt snapshot --select ucchi_test_snapshotを実行すると、古い列user_sexはそのままに、新しい列sexが追加されました。
dbt snapshot は、データソースの列名が変わった場合、改名する前の古い列は残した上で、改名した後の新しい列を追加するようです。
スクリーンショット 2024-12-09 0.30.59.png

しかし、改名した後の列sexの値が null になってしまっています。
dbt snapshot はスキーマ変更には追従しましたが、今回の設定ではupdated_atが更新されていないため、値の保存される列がuser_sexからsexに変わったことには追従できませんでした。

そこで、updated_at を更新してみます。

update `ucchi_test`
set
  updated_at = current_timestamp()
where true

再度dbt snapshot --select ucchi_test_snapshotを実行すると、値がuser_sex列からsex列に移動した履歴が追加されました。
スクリーンショット 2024-12-09 0.32.22.png

「列名が変わった」「updated_at 列の値は更新されない」という状況を検知・対応するには、以下の方法があります。

  • ① スナップショットの strategy をtimestampからcheckにする
  • ② 更新対象のSQL文で対象の列名を明示する
  • ③ スナップショットの最新の値と更新元のテーブルが一致することを確かめる

個人的には③で検知して手動対応するのがよいと考えています。①は dbt が推奨していませんし、②はデータソースの列追加に気付けない恐れがあるからです。
列名の変更があまりにも頻繁に起こる場合は、手動対応の手間を減らす観点から①を選ぶのもよいかもしれません。

列の型を変更

sex列の型を INT から STRING に変えてみます。

alter table `ucchi_test`
add column sex2 string;

update `ucchi_test`
set
  sex2 = cast(sex as string),
  updated_at = current_timestamp()
where true;

alter table `ucchi_test`
drop column sex;

alter table `ucchi_test`
rename column sex2 to sex;

sex列の種類が STRING に変わりました。

この状態で、再度dbt snapshotコマンドを実行するとエラーになってしまいます。

公式の記事に、このような場合の対処法が載っていました。以下の2つです。
https://discourse.getdbt.com/t/snapshots-when-column-data-type-changes/10452/2

  • ① スナップショットを保存する際に型を戻す。例えばsafe_cast(sex as int64) as sexとする
  • ② スナップショットテーブルの列の型を手動で変えておく

今回は②で対処します。

alter table `ucchi_test_snapshot`
rename column sex to sex_old;

alter table `ucchi_test_snapshot`
add column sex string;

update `ucchi_test_snapshot`
set sex = cast(sex_old as string)
where true;

alter table `ucchi_test_snapshot`
drop column sex_old;

dbt snapshotコマンドが通りました。スナップショットのsex列の種類が STRING に変わっていることが確認できます。
スクリーンショット 2024-12-09 1.05.27.png

古い列名に戻す

データソースのsex列をuser_sexに戻してみます。

alter table `ucchi_test`
add column user_sex int64;

update `ucchi_test`
set
  user_sex = cast(sex as int64),
  updated_at = current_timestamp()
where true;

alter table `ucchi_test`
drop column sex;

再度dbt snapshotコマンドを実行すると、元々あった列user_sexに再び値が記録されました。
スクリーンショット 2024-12-09 1.12.03.png

行を増やす

新しいユーザー「次郎さん」が増えたとします。

insert into `ucchi_test`
(user_id, user_name, user_age, user_sex, updated_at)
select 3, "次郎", 18, 1, current_timestamp()

再度dbt snapshotコマンドを実行すると、スナップショットにも次郎さんの履歴が記録されました。
スクリーンショット 2024-12-09 1.14.20.png

行を減らす

次郎さんの行を削除します。

delete from `ucchi_test`
where user_id = 3;

再度dbt snapshotコマンドを実行すると、次郎さんのログのdbt_valid_toに値が入りました。つまり、この履歴は現在有効でないと判定されるようになりました。
スクリーンショット 2024-12-09 1.16.32.png

復旧

スナップショットは何かしらの理由で壊れてしまうことがあります。例えば、壊れたデータソースに対してスナップショットを取ってしまったケースや、スナップショットの仕組み自体がうまく機能しない場合などです。

そこで、履歴のロールバックと、新しいテーブルへの履歴の引き継ぎを行います。

履歴をロールバックする

以下の手順で、履歴をロールバックできます。

  • ロールバックしたいタイムスタンプより dbt_valid_from が後の行を削除
  • ロールバックしたいタイムスタンプより dbt_valid_to が後の行における dbt_valid_to 列の値を null に置換する

例えば、同期元のテーブルが空になってしまったとします。

delete from `ucchi_test`
where true;

dbt snapshotコマンドを実行すると、dbt_valid_toに値が入ります。すべての履歴が現在有効でないことになってしまいました。
スクリーンショット 2024-12-09 1.26.35.png

異常が発生したタイムスタンプ(2024-12-08 16:25:52.849929 UTC)の前まで履歴をロールバックさせます。

-- 異常発生後に挿入された行を削除
delete from `ucchi_test_snapshot`
where dbt_valid_from >= timestamp("2024-12-08 16:25:52.849929 UTC");

-- 異常発生後に無効となった行を有効化
update `ucchi_test_snapshot`
set
  dbt_valid_to = null
where dbt_valid_to >= timestamp("2024-12-08 16:25:52.849929 UTC");

異常が発生する直前の履歴が再び有効になりました。履歴をロールバックできました。
スクリーンショット 2024-12-09 1.35.45.png

最後に、データソースに値を戻し、正常化させます。

insert into `ucchi_test`
(user_id, user_name, user_age, user_sex, updated_at)
select 1, "太郎", 25, 1, timestamp("2024-12-08 16:10:12.283461 UTC")
union all
select 2, "花子", 30, 2, timestamp("2024-12-08 16:10:12.283461 UTC")

この状態でdbt snapshotを実行しても、スナップショットに変化はありません。

履歴を新しいテーブルに引き継ぐ

スナップショットの向き先をucchi_test_snapshot2に変えてみます。

{% snapshot ucchi_test_snapshot2 %}
    {{
        config(
            strategy="timestamp",
            unique_key="user_id",
            updated_at="updated_at",
        )
    }}

    select *
    from ucchi_test
{% endsnapshot %}

dbt snapshotを実行すると、新しいスナップショットテーブルucchi_test_snapshot2が作成されます。
スクリーンショット 2024-12-09 11.29.16.png

このテーブルに、過去のスナップショット(ucchi_test_snapshot)を引き継がせます。

まず、古いスナップショットには過去存在した列sexがあるので、新しいスナップショットにも列を追加しておきます。

alter table `ucchi_test_snapshot2`
add column sex string

移行手順は以下です。

  • 履歴の結合
    • 過去のスナップショットから、特定のタイムスタンプまでの値を切り出す
      • dbt_valid_to が null のものは、特定のタイムスタンプで埋める
      • dbt_valid_to が特定のタイムスタンプより後のものも、特定のタイムスタンプで埋める
    • 新しいスナップショットの dbt_valid_from を特定のタイムスタンプにする
  • 結合前後のログを統合
    • 更新前後で値が全く変わっておらず、前の dbt_valid_to が後の dbt_valid_from であるログを発見する
    • 前の dbt_vaid_to を null にし、後のログを削除する
  • 新しいスナップショットを空にし、統合したログを挿入する

移行スクリプトを書きます。

-- 突合済みの履歴を作成
create temp table `rows_to_insert` as (
with old_snapshot as (
  select
    user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at, dbt_valid_from,
    case
      when dbt_valid_to is null then timestamp("2024-12-09 10:00:00", "Japan")
      when dbt_valid_to > timestamp("2024-12-09 10:00:00", "Japan") then timestamp("2024-12-09 10:00:00", "Japan")
      else dbt_valid_to
    end as dbt_valid_to, sex,
  from `ucchi_test_snapshot`
  where dbt_valid_from <= timestamp("2024-12-09 10:00:00", "Japan")
)
,
new_snapshot as (
  select
    user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at,
    case
      when dbt_valid_from < timestamp("2024-12-09 10:00:00", "Japan") then timestamp("2024-12-09 10:00:00", "Japan")
      else dbt_valid_from
    end as dbt_valid_from,
    dbt_valid_to, sex,
  from `ucchi_test_snapshot2`
)
,
unioned as (
  select * from old_snapshot
  union all select * from new_snapshot
)
,
unnecessary_rows as (
  select
    concat(u1.user_id, u1.dbt_valid_from) as before, -- 前の履歴は dbt_valid_to を null にする
    concat(u2.user_id, u2.dbt_valid_from) as after -- 後ろの履歴は削除
  from unioned as u1
  inner join unioned as u2
    on u1.user_id = u2.user_id -- 同じPK
    and u1.user_name = u2.user_name -- 属性値が同じ
    and u1.user_age = u2.user_age
    and u1.dbt_valid_to = u2.dbt_valid_from -- 1つ後ろの履歴と結合
    and u2.dbt_valid_to is null -- 後ろの履歴が最新版
)
,
merged as (
  select
    user_id, user_name, user_age, updated_at, user_sex, unioned.dbt_scd_id, dbt_updated_at, dbt_valid_from,
    if(before.pk is not null, null, dbt_valid_to) as dbt_valid_to, 
    sex,
  from unioned
  left join (select before as pk from unnecessary_rows) as before on concat(unioned.user_id, unioned.dbt_valid_from) = before.pk
  left join (select after as pk from unnecessary_rows) as after on concat(unioned.user_id, unioned.dbt_valid_from) = after.pk
  where
    after.pk is null
)
select * from merged order by user_id, dbt_valid_from
);

-- 新しいスナップショットから全ての行を削除
delete from `ucchi_test_snapshot2`
where true;

-- 突合済みの行を挿入
insert into `ucchi_test_snapshot2`(
  user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to, sex
)
select
  user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to, sex
from
  `rows_to_insert`;

履歴の引き継ぎが完了しました。現在有効でない古いログが追加されていることがわかります。
スクリーンショット 2024-12-09 15.09.13.png

検知

スナップショットテーブルが壊れていないことを定常的に観測することはとても重要です。
そこで、いくつかの基本的な項目について、異常がないか調べるクエリを書きます。

クエリの実行結果が1行でも存在していれば、異常な行があることになります。

PKが一意になっているか

select
  dbt_scd_id, count(*) as count_log
from
  `ucchi_test_snapshot2`
group by
  dbt_scd_id
having
  count_log > 1 -- PKに対する重複を検知

有効期間に隙間や重複がないか

with ordered_periods as (
  select
    user_id,
    dbt_valid_from,
    dbt_valid_to,
    lead(dbt_valid_from) over (partition by user_id order by dbt_valid_from) as next_valid_from
  from
    `ucchi_test_snapshot2`
)
select *
from ordered_periods
where
  dbt_valid_to != next_valid_from -- 有効期間同士の間の隙間や重複を検知

結論

dbt snapshot は、「列名は変わるが値のupdated_atはそのまま」という場合を除いて、データソースの変更に追従できることがわかりました。
また、障害時には復旧作業が行えることを確認しました。スナップショットの履歴をロールバックしたり、古い履歴を新しい履歴に引き継いだりすることができました。

おわりに

dbt snapshot は思ったより例外的な変更にちゃんと追従してくれました。データウェアハウス上で仕組みが完結しているため、最悪手動でテーブルを整えればなんとかなるのもいいですね。
社内分析用途でテーブルの変更履歴が欲しい場合は、選択肢の1つとして覚えておくとよいでしょう。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?