まえがき
Data Engineering Design Patterns をぼちぼち読んでいって、感想を残していきます。
この書籍を読む背景や感想を残していく方針についてはこちらを参照してください。
3章の内容
- Unprocessable Records
- Pattern: Dead-Letter
- Duplicated Records
- Pattern: Windowed Deduplicator
- Late Data
- Pattern: Late Data Detector
- Pattern: Static Late Data Integrator
- Pattern: Dynamic Late Data Integrator
- Filtering
- Pattern: Filter Interceptor
- Fault Tolerance
- Pattern: Checkpointer
感想
Dead-Letter パターンを実現するマルチテーブルインサート
Dead-Letter パターンとは、データ変換中に処理不能なレコード(Dead Letter)が見つかったら、後で調査できるように別の場所に保存しておこうというデザインパターンです。保存された処理不能レコードは調査・修正後にリプレイ処理で本来の処理結果に書き戻すことも含まれます。
書籍内では、この正常処理済みデータと処理不能レコードに振り分ける処理を SparkSQL で実現するために、
- CASE 文を使って処理可能/不能を判断し、その結果をフラグ列として追加する。
- その結果を永続化する。
- 永続化した結果からフラグ列に基づいて正常データと処理不能レコードを別々に保存する。
という回りくどい例が載っています。処理可能/不能の判定を二重に行うことを回避するための方策です。
ただし、普通はマルチテーブルインサートを用います。以下は Snowflake での例です。
INSERT FIRST
WHEN (full_name is null or version is null) THEN -- バリデーション条件
INTO dead_letter_table
ELSE
INTO correct_data_table
SELECT * FROM devices_to_load
データの二重読み込みなしに正常データと処理不能データに分けることができるので、大量データを扱うときは便利ですね。
ただ、SQL の場合は書籍で指摘されているように、
- 予期できないエラーレコードを包括的に捕捉する try-catch のような仕組み
- Apache Flink の side output のような便利な仕組み
はありません(外部からのデータロードであれば不良レコードを別ファイルに保存するデータローダーは一般的ですが)。なので、厳密に処理不能レコードを判定する条件式を定義しておく必要がありますね。それがちょっと大変。
Dead-Letter パターンにおけるリプレイ処理
Dead-Letter パターンで処理不能レコードが見つかった場合、そのデータを無視するか、または修正してリプレイ処理を実行することになります。
後者の場合、リプレイ処理は手動で実行されることが多いのですが、パイプラインの処理が何段にも連なる場合は、リプレイ処理もその分だけ手動で実行する必要があります。この面倒な影響を書籍では Snowball backfilling(雪玉が転がるとだんだん大きくなることから)と呼んでいるのですが、大規模な DWH だと運用負荷を跳ね上げる要素になります。
それなら、通常のデータ処理にリプレイ処理を組み込んだらどうかという発想で、以下のような設計をしたことがあります。
- データ変換処理は以下の 2 つを読み込む。
- ソースデータ
- 処理不能レコード(修正済みのフラグが
trueになっているレコードのみ)
- 処理を行い、正常に処理できたデータは正常処理済みデータへ、処理できなかったデータは処理不能レコードへ分けて保存する。
これをすると、処理不能レコードを修正しておけば、次回のデータ変換処理時に自動的にリプレイされるという仕組みです。
- 次の処理実行までリプレイされない
- 処理不能レコードが多いと通常のデータ変換処理も遅くなる
というデメリットもありますが、運用負荷を最小にするという観点ではアリだったなと思っています。
(まぁ、多くのケースでは処理不能レコードは修正されずそのまま放置になることが多いので、そこまでやることは滅多にないですが…)
SQL での重複排除方法
これはもう昔話ですが、重複排除の処理を SQL で実現する際に
- DISTINCT を使う(通常の方法)
- GROUP BY を使う
の 2 パターンがあり、昔の Oracle 11g だと GROUP BY の方が速いという事象がありました。それを知ったタイミングは忙しかったので理由をちゃんと把握しなかったのですが、アルゴリズム上はその通りらしく、その際に実機で試した際にも確かに GROUP BY の方が速かったです。
その後しばらくして、いろんな DBMS で試したんですが、どれも性能は変わらなかったんですよね(Oracle 12c 以降も含む)。
あの性能差は何だったんだろうと、今でもたまに不思議に思っています。
その時刻は何の時刻?
書籍の Late Data の節に "EVENT AND PROCESSING TIME" というコラムがあります。要はレコードの時刻というけど、
- そのレコードが表すイベントの発生時刻
- レコードがパイプラインで処理された時刻
の 2 種類があるよねという話です。
IoT の領域になるともっと面倒で、以下のようなバリエーションがあることがあります。
- 測定が行われた時刻
- IoT データがデータ収集ゲートウェイを通過した時刻
- ストレージやキューなどに保存された時刻
- パイプラインで処理された時刻
古いセンサーだと時計を持っていないことがあるため、2 番目以降の時刻を扱わないといけないケースも少なくありません。
時刻と一言で言っても、何の時刻なのか都度確認したいですね。
Static Late Data Integrator = 期間洗い替え?
Static Late Data Integrator パターンは、データが N 日(固定、書籍ではルックバックウィンドウと呼んでいる)遅延して到着した場合でも処理する必要性がある場合、N日前 + 当日分のデータをすべて再処理(バックフィル)する方法で遅延データに対応しようとする方法です。
例えば、N = 3 の場合に、2025/11/09 のデータを処理する日に以下のデータが新しく到着したとします。
- 2025/11/09(通常データ)
- 2025/11/08(遅延データ)
- 2025/11/06(遅延データ)
- 2025/11/05(遅延データ)
この場合、ルックバックウィンドウは 2025/11/06 ~ 2025/11/09 なので、この範囲のデータを再処理します。バックフィルなので、この日に到着したデータだけではなく、過去に到着しているデータと合わせて再処理します。これは当日分処理とほぼ変わらないロジックになるはずです。
(ちなみに、2025/11/05 の遅延データはルックバックウィンドウの範囲外なので無視します。)
このような処理アプローチは私も昔からよく採用しており、範囲洗い替え or 期間洗い替えとよく呼んでいたんですが、Static Late Data Integrator と呼ぶんだとこの書籍で初めて知りました。(処理内容が推測できる命名かは若干気になりますが)
dbt での実現方法については以前に以下の記事の 2-1. と 2-3. で触れました。
この方法は以下のようなメリットがあるので多用しています。
- 何よりシンプル
- べき等性も保ちやすい
- N 日分洗い替えの処理量は多いが、Snowflake や BigQuery など最新の DWH だとそれほど問題にならない
3 点目の処理量がどうしても気になるケースでは、書籍上では次のパターンである Dynamic Late Data Integrator パターンでバックフィルが必要な範囲を絞ればよいと思います。
3つ以上の値をとりえるカラムに~_flag という名前を付けないで
Filter Interceptor パターンはエラーレコードを複数の条件でフィルタリングする場合に、どの条件で何件フィルタリングされたかを分かるようにする方法です。
単純にどの条件でフィルタされたかが分かるだけではなく、
- データの分布が変わったためフィルターされたのか
- バグが混入したためフィルターされたのか
という調査にも有用な情報だとしています。
言葉で説明するより書籍に載っている以下の例を見てもらった方が早いです。エラーレコードを特定する条件が 4 つあり、それに抵触する場合に status_flag へエラー内容を表す文字列を格納します。
SELECT
CASE
WHEN (type IS NOT NULL) IS FALSE THEN 'null_type'
WHEN (LEN(type) > 2) IS FALSE THEN 'short_type'
WHEN (full_name IS NOT NULL) IS FALSE THEN 'null_full_name'
WHEN (version IS NOT NULL) IS FALSE THEN 'null_version'
ELSE NULL
END AS status_flag,
type, full_name, version
FROM input
やっていることはシンプルですが、4 パターンの値(null を含む場合は 5 パターン)をとりえるカラムに対して status_flag という名前はいかがなものかという気がします。flag という区分語は 2 値をとりえるカラムに付けるべきものと思うのですが。error_reason とかの方が良いかなと思います。
カラムの命名の問題だけではなく、先頭の条件に合致した場合にそれより後の条件への合致をチェックできていないのもあまり好ましくはないかなと思っています。
やるなら以下のような方法が良いかなと思うのですが、どうでしょうか。
SELECT
CASE WHEN (type IS NULL) THEN true ELSE false END AS type_is_null,
CASE WHEN (LEN(type) <= 2) THEN true ELSE false END AS type_is_short,
CASE WHEN (full_name IS NULL) THEN true ELSE false END AS full_name_is_null,
CASE WHEN (version IS NULL) THEN true ELSE false END AS version_is_null,
type, full_name, version
FROM input