はじめに
dbt snapshot を使うと、プロダクト側の改修を行わずとも、データベースの履歴(スナップショット)を記録することができます。
参考)公式の紹介記事
スナップショットはSCDタイプ2という形式で記録されるため、日ごとにテーブル全体のコピーを取る選択肢と比べて、ストレージ費用やクエリ費用を大きく抑えることができます。
参考)WikipediaにあるSCDタイプ2の解説
一方で、履歴をプロダクトのデータベースではなく分析用のデータウェアハウスに蓄積するため、プロダクト側のエンジニアはデータ基盤側でテーブルの履歴が取られていることを知らない場合があります。そのため、データソースで例外的な変更が行われ、データ基盤にあるスナップショットがその変更に追従できず、履歴テーブルが壊れてしまうかもしれません。
また、dbt snapshot は変更履歴を同じテーブルに蓄積し続けるため、データソースのスキーマが大きく変わっても、古い履歴に対する後方互換性を保つ必要があります。
シャーディングテーブルのような、日毎の状態を異なるテーブルに保存する選択肢と比べると、dbt snapshot によるスナップショットはスキーマを維持する難易度が高いです。
そこで、データソースの例外的な変更に対して dbt snapshot がどこまで追従できるのかを検証し、復旧や検知方法を整理します。
設定
以下の操作では、データウェアハウスに BigQuery を使用します。
まず、スナップショットを取るためのデータソースを定義します。
create or replace table `ucchi_test` as (
select 1 as user_id, "太郎" as user_name, 20 as user_age, current_timestamp() as updated_at
union all
select 2, "花子", 30, current_timestamp()
)
次に、dbt snapshot でスナップショットを取るための設定を定義します。
{% snapshot ucchi_test_snapshot %}
{{
config(
strategy="timestamp",
unique_key="user_id",
updated_at="updated_at",
)
}}
select *
from `ucchi_test`
{% endsnapshot %}
dbt snapshot --select ucchi_test_snapshot
コマンドを実行すると、以下のスナップショットが記録されます。
データソースの変更
様々なパターンの変更に対して、dbt snapshot がどこまで追従できるのか確かめます。
値の更新
太郎さんの年齢を25歳に変更します。
update `ucchi_test`
set
user_age = 25,
updated_at = current_timestamp()
where user_id = 1
再度dbt snapshot --select ucchi_test_snapshot
を実行すると、値の変更履歴が追加されました。
古い履歴の dbt_valid_to
にはタイムスタンプが追加され、この履歴が現在有効でないことを表すようになりました。
列の追加
性別を表すuser_sex
列を追加します。
alter table `ucchi_test`
add column user_sex int64;
再度dbt snapshot --select ucchi_test_snapshot
を実行すると、スナップショットにもuser_sex
列が追加されました。
列に値を追加
user_sex
列に値を追加します。
update `ucchi_test`
set
user_sex = 1,
updated_at = current_timestamp()
where user_id = 1;
update `ucchi_test`
set
user_sex = 2,
updated_at = current_timestamp()
where user_id = 2;
再度dbt snapshot --select ucchi_test_snapshot
を実行すると、user_sex
列に値の入ったレコードが追加されました。
列の改名
user_sex
列をsex
に改名します。
alter table `ucchi_test`
rename column user_sex to sex
再度dbt snapshot --select ucchi_test_snapshot
を実行すると、古い列user_sex
はそのままに、新しい列sex
が追加されました。
dbt snapshot は、データソースの列名が変わった場合、改名する前の古い列は残した上で、改名した後の新しい列を追加するようです。
しかし、改名した後の列sex
の値が null になってしまっています。
dbt snapshot はスキーマ変更には追従しましたが、今回の設定ではupdated_at
が更新されていないため、値の保存される列がuser_sex
からsex
に変わったことには追従できませんでした。
そこで、updated_at
を更新してみます。
update `ucchi_test`
set
updated_at = current_timestamp()
where true
再度dbt snapshot --select ucchi_test_snapshot
を実行すると、値がuser_sex
列からsex
列に移動した履歴が追加されました。
「列名が変わった」「updated_at
列の値は更新されない」という状況を検知・対応するには、以下の方法があります。
- ① スナップショットの strategy を
timestamp
からcheck
にする - ② 更新対象のSQL文で対象の列名を明示する
- ③ スナップショットの最新の値と更新元のテーブルが一致することを確かめる
個人的には③で検知して手動対応するのがよいと考えています。①は dbt が推奨していませんし、②はデータソースの列追加に気付けない恐れがあるからです。
列名の変更があまりにも頻繁に起こる場合は、手動対応の手間を減らす観点から①を選ぶのもよいかもしれません。
列の型を変更
sex
列の型を INT から STRING に変えてみます。
alter table `ucchi_test`
add column sex2 string;
update `ucchi_test`
set
sex2 = cast(sex as string),
updated_at = current_timestamp()
where true;
alter table `ucchi_test`
drop column sex;
alter table `ucchi_test`
rename column sex2 to sex;
この状態で、再度dbt snapshot
コマンドを実行するとエラーになってしまいます。
公式の記事に、このような場合の対処法が載っていました。以下の2つです。
https://discourse.getdbt.com/t/snapshots-when-column-data-type-changes/10452/2
- ① スナップショットを保存する際に型を戻す。例えば
safe_cast(sex as int64) as sex
とする - ② スナップショットテーブルの列の型を手動で変えておく
今回は②で対処します。
alter table `ucchi_test_snapshot`
rename column sex to sex_old;
alter table `ucchi_test_snapshot`
add column sex string;
update `ucchi_test_snapshot`
set sex = cast(sex_old as string)
where true;
alter table `ucchi_test_snapshot`
drop column sex_old;
dbt snapshot
コマンドが通りました。スナップショットのsex
列の種類が STRING に変わっていることが確認できます。
古い列名に戻す
データソースのsex
列をuser_sex
に戻してみます。
alter table `ucchi_test`
add column user_sex int64;
update `ucchi_test`
set
user_sex = cast(sex as int64),
updated_at = current_timestamp()
where true;
alter table `ucchi_test`
drop column sex;
再度dbt snapshot
コマンドを実行すると、元々あった列user_sex
に再び値が記録されました。
行を増やす
新しいユーザー「次郎さん」が増えたとします。
insert into `ucchi_test`
(user_id, user_name, user_age, user_sex, updated_at)
select 3, "次郎", 18, 1, current_timestamp()
再度dbt snapshot
コマンドを実行すると、スナップショットにも次郎さんの履歴が記録されました。
行を減らす
次郎さんの行を削除します。
delete from `ucchi_test`
where user_id = 3;
再度dbt snapshot
コマンドを実行すると、次郎さんのログのdbt_valid_to
に値が入りました。つまり、この履歴は現在有効でないと判定されるようになりました。
復旧
スナップショットは何かしらの理由で壊れてしまうことがあります。例えば、壊れたデータソースに対してスナップショットを取ってしまったケースや、スナップショットの仕組み自体がうまく機能しない場合などです。
そこで、履歴のロールバックと、新しいテーブルへの履歴の引き継ぎを行います。
履歴をロールバックする
以下の手順で、履歴をロールバックできます。
- ロールバックしたいタイムスタンプより
dbt_valid_from
が後の行を削除 - ロールバックしたいタイムスタンプより
dbt_valid_to
が後の行におけるdbt_valid_to
列の値を null に置換する
例えば、同期元のテーブルが空になってしまったとします。
delete from `ucchi_test`
where true;
dbt snapshot
コマンドを実行すると、dbt_valid_to
に値が入ります。すべての履歴が現在有効でないことになってしまいました。
異常が発生したタイムスタンプ(2024-12-08 16:25:52.849929 UTC
)の前まで履歴をロールバックさせます。
-- 異常発生後に挿入された行を削除
delete from `ucchi_test_snapshot`
where dbt_valid_from >= timestamp("2024-12-08 16:25:52.849929 UTC");
-- 異常発生後に無効となった行を有効化
update `ucchi_test_snapshot`
set
dbt_valid_to = null
where dbt_valid_to >= timestamp("2024-12-08 16:25:52.849929 UTC");
異常が発生する直前の履歴が再び有効になりました。履歴をロールバックできました。
最後に、データソースに値を戻し、正常化させます。
insert into `ucchi_test`
(user_id, user_name, user_age, user_sex, updated_at)
select 1, "太郎", 25, 1, timestamp("2024-12-08 16:10:12.283461 UTC")
union all
select 2, "花子", 30, 2, timestamp("2024-12-08 16:10:12.283461 UTC")
この状態でdbt snapshot
を実行しても、スナップショットに変化はありません。
履歴を新しいテーブルに引き継ぐ
スナップショットの向き先をucchi_test_snapshot2
に変えてみます。
{% snapshot ucchi_test_snapshot2 %}
{{
config(
strategy="timestamp",
unique_key="user_id",
updated_at="updated_at",
)
}}
select *
from ucchi_test
{% endsnapshot %}
dbt snapshot
を実行すると、新しいスナップショットテーブルucchi_test_snapshot2
が作成されます。
このテーブルに、過去のスナップショット(ucchi_test_snapshot
)を引き継がせます。
まず、古いスナップショットには過去存在した列sex
があるので、新しいスナップショットにも列を追加しておきます。
alter table `ucchi_test_snapshot2`
add column sex string
移行手順は以下です。
- 履歴の結合
- 過去のスナップショットから、特定のタイムスタンプまでの値を切り出す
-
dbt_valid_to
が null のものは、特定のタイムスタンプで埋める -
dbt_valid_to
が特定のタイムスタンプより後のものも、特定のタイムスタンプで埋める
-
- 新しいスナップショットの
dbt_valid_from
を特定のタイムスタンプにする
- 過去のスナップショットから、特定のタイムスタンプまでの値を切り出す
- 結合前後のログを統合
- 更新前後で値が全く変わっておらず、前の
dbt_valid_to
が後のdbt_valid_from
であるログを発見する - 前の
dbt_vaid_to
を null にし、後のログを削除する
- 更新前後で値が全く変わっておらず、前の
- 新しいスナップショットを空にし、統合したログを挿入する
移行スクリプトを書きます。
-- 突合済みの履歴を作成
create temp table `rows_to_insert` as (
with old_snapshot as (
select
user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at, dbt_valid_from,
case
when dbt_valid_to is null then timestamp("2024-12-09 10:00:00", "Japan")
when dbt_valid_to > timestamp("2024-12-09 10:00:00", "Japan") then timestamp("2024-12-09 10:00:00", "Japan")
else dbt_valid_to
end as dbt_valid_to, sex,
from `ucchi_test_snapshot`
where dbt_valid_from <= timestamp("2024-12-09 10:00:00", "Japan")
)
,
new_snapshot as (
select
user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at,
case
when dbt_valid_from < timestamp("2024-12-09 10:00:00", "Japan") then timestamp("2024-12-09 10:00:00", "Japan")
else dbt_valid_from
end as dbt_valid_from,
dbt_valid_to, sex,
from `ucchi_test_snapshot2`
)
,
unioned as (
select * from old_snapshot
union all select * from new_snapshot
)
,
unnecessary_rows as (
select
concat(u1.user_id, u1.dbt_valid_from) as before, -- 前の履歴は dbt_valid_to を null にする
concat(u2.user_id, u2.dbt_valid_from) as after -- 後ろの履歴は削除
from unioned as u1
inner join unioned as u2
on u1.user_id = u2.user_id -- 同じPK
and u1.user_name = u2.user_name -- 属性値が同じ
and u1.user_age = u2.user_age
and u1.dbt_valid_to = u2.dbt_valid_from -- 1つ後ろの履歴と結合
and u2.dbt_valid_to is null -- 後ろの履歴が最新版
)
,
merged as (
select
user_id, user_name, user_age, updated_at, user_sex, unioned.dbt_scd_id, dbt_updated_at, dbt_valid_from,
if(before.pk is not null, null, dbt_valid_to) as dbt_valid_to,
sex,
from unioned
left join (select before as pk from unnecessary_rows) as before on concat(unioned.user_id, unioned.dbt_valid_from) = before.pk
left join (select after as pk from unnecessary_rows) as after on concat(unioned.user_id, unioned.dbt_valid_from) = after.pk
where
after.pk is null
)
select * from merged order by user_id, dbt_valid_from
);
-- 新しいスナップショットから全ての行を削除
delete from `ucchi_test_snapshot2`
where true;
-- 突合済みの行を挿入
insert into `ucchi_test_snapshot2`(
user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to, sex
)
select
user_id, user_name, user_age, updated_at, user_sex, dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to, sex
from
`rows_to_insert`;
履歴の引き継ぎが完了しました。現在有効でない古いログが追加されていることがわかります。
検知
スナップショットテーブルが壊れていないことを定常的に観測することはとても重要です。
そこで、いくつかの基本的な項目について、異常がないか調べるクエリを書きます。
クエリの実行結果が1行でも存在していれば、異常な行があることになります。
PKが一意になっているか
select
dbt_scd_id, count(*) as count_log
from
`ucchi_test_snapshot2`
group by
dbt_scd_id
having
count_log > 1 -- PKに対する重複を検知
有効期間に隙間や重複がないか
with ordered_periods as (
select
user_id,
dbt_valid_from,
dbt_valid_to,
lead(dbt_valid_from) over (partition by user_id order by dbt_valid_from) as next_valid_from
from
`ucchi_test_snapshot2`
)
select *
from ordered_periods
where
dbt_valid_to != next_valid_from -- 有効期間同士の間の隙間や重複を検知
結論
dbt snapshot は、「列名は変わるが値のupdated_at
はそのまま」という場合を除いて、データソースの変更に追従できることがわかりました。
また、障害時には復旧作業が行えることを確認しました。スナップショットの履歴をロールバックしたり、古い履歴を新しい履歴に引き継いだりすることができました。
おわりに
dbt snapshot は思ったより例外的な変更にちゃんと追従してくれました。データウェアハウス上で仕組みが完結しているため、最悪手動でテーブルを整えればなんとかなるのもいいですね。
社内分析用途でテーブルの変更履歴が欲しい場合は、選択肢の1つとして覚えておくとよいでしょう。