LoginSignup
3
1

More than 1 year has passed since last update.

DBからの更新日時に基づく差分データ抽出

Posted at

DWH などデータ分析基盤上のETL/ELT処理において、データソースからデータを抽出する際に、処理時間の短縮を目的としてデータソースから全量データではなく追加・更新されたデータ(以下、差分データ)のみを抽出する手法が良く使われます。

03.png

このデータソースから差分データのみを抽出する処理の実装手段は複数あるのですが、比較的よく使われる手段の1つとして、データソース側でレコードが追加・更新された際に現在のタイムスタンプを格納する「更新日時」という列に基づいて差分抽出を行うというアプローチがあります。

例えば、まずDWH側で過去にどこまで追加・更新されたレコードを取り込んだかを以下のような SQL 文で確認します。

DWH側
select
    max("更新日時")
from
    "ステージングテーブル"

その後に、データソース側でその日時以降に追加・更新されたレコードのみを以下の SQL 文で抽出します。

データソース側
select
    "購入ID",
    "ユーザーID",
    "購入商品ID",
    "購入数",
    "更新日時"
from
    "購入履歴"
where
    "更新日時" > [DWH側のクエリで取得した日時]

(この方法は dbt のincremental modelsに関するドキュメントの models/stg_events.sql を参考にしています)

この方法は差分データ抽出する際にデータソース側でデータ変更が発生していなければ概ね問題ないのですが(データソース側でレコードがDELETEされるケースを考慮しなければ)、データソース側でデータの追加・更新が発生している場合は、差分データを抽出し損ねる可能性があります。

例えば以下のようなケースです。

06.png

  1. トランザクション①が購入ID=101のレコードをINSERTするが、何かの原因(別の処理をする、ネットワークで少し待たされるなど)でコミットはすぐにはされない。
  2. トランザクション②が購入ID=102のレコードをINSERTし、すぐにコミットされる。
  3. 1回目の差分データ抽出の処理が動く。この時点で購入ID=102のレコードはコミット済みのため抽出されるが、購入ID=101のレコードは未コミットのため抽出されない。
  4. トランザクション①がコミットする。
  5. トランザクション③が購入ID=103のレコードをINSERTし、すぐにコミットされる。
  6. 2回目の差分データ抽出の処理が動く。この際、更新日時の基準となる日時は抽出済みの購入ID=102の更新日時となるため、それより前にINSERTされた購入ID=101のレコードは今回も抽出されない(購入ID=103のレコードはもちろん抽出される)。

抽出漏れが発生する根本原因はレコードの更新日時とコミットのタイミングがずれていることにあります。

そのため、先に示した差分データ抽出の方法は、データソース側でデータ抽出中にデータの追加・更新が発生する可能性がある場合は使いづらいです。

対策としては幾つかあり、例えば以下のような方法があります。

  • 案1:データ抽出の際にデータソース側のテーブルにロックを掛ける。
    • 最もラジカルな方法。ただ、これで困らないのであれば一番楽。
    • もちろん、データソース側のデータ追加・更新処理は待たせることになる。
  • 案2:[抽出済みレコードの更新日時の MAX] < "更新日時" < (current_timestamp - α)の条件で抽出する。
    • 更新日時のの抽出条件に現在時刻より少し前の時刻を上限とすることで、この範囲に未コミットのレコードが含まれていないことを保証する。
    • αはトランザクションの最大時間を指定する。
    • 毎晩 1:00 に差分抽出のバッチ処理を動かし、その中で前日 0:00 - 24:00 までという条件を指定するのも本質的には同じ。
    • αより時間が掛かるロングトランザクションがあると危険。
    • ごく直近に追加・更新されたレコードの抽出は次回に回される。
  • 案3:"更新日時" > [抽出済みレコードの更新日時の MAX] - β の条件で差分データ抽出する。
    • 上の例の購入ID=101のレコードを2回目で抽出できるよう、条件を少し緩める。
    • 前回抽出した差分データと今回抽出する差分データに重複が発生する可能性があるが、それは DWH 側で何とかする。
  • 案4:データの追加・更新処理が動いていないタイミングの過去断面からデータを抽出する。
    • 例えば、Oracle では v$transaction などを監視し、データ追加・削除が発生していない時刻を確認できたら、フラッシュバッククエリなどでその時刻の断面のデータから差分データ抽出を行う。
    • 案2と同様、ごく直近に追加・更新されたレコードの抽出は次回に回される。
  • 案5:Change Data Capture(CDC)を使う。
    • 使えるなら使いたい。

どの方法が適しているかは要件次第ですが、データ抽出時にデータ追加・更新がないかはまず確認しましょう。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1