まえがき
X で "Data Engineering Design Patterns" という書籍を知りました。
1 章にも記載があるのですが、データエンジニアリングにおいて求められる以下の観点を考慮したデザインパターン本であるらしいです。
- 障害管理
- バックフィル
- べき等性
- データの正確性
もともと私は以下のような課題感を持っていたので、この書籍に興味を持ちました。
ぼちぼち読み始めるとともに、Qiita に感想を残しておきたいと思います。
以下のようにユルくやっていきたいと思います。
- まとめではないですし、それを目指しません。書籍の中で扱っている話題について思ったことを脇道に逸れながらアレコレ書いていきます。
- 書籍全体を読まずに対象の章(今回は Chapter 2)まで読んでの記述になるので、後半の章に記載されている/されてないはあまり意識しない内容になっています。
- 本書のコード例は Spark 系が多そうです。ただ、私自身はあまり詳しくないので主に SQL/DWH の観点から感想を述べます。
- 続くとは限りません。
ちなみに、以下から登録すると無料で PDF を入手できます。(私はこれを知らずに楽天 Kobo で買ってしまった…)
2章の内容
以下が目次(パターンの一覧)です。
- Full Load
- Pattern: Full Loader
- Incremental Load
- Pattern: Incremental Loader
- Pattern: Change Data Capture
- Replication
- Pattern: Passthrough Replicator
- Pattern: Transformation Replicator
- Data Compaction
- Pattern: Compactor
- Data Readiness
- Pattern: Readiness Marker
- Event Driven
- Pattern: External Trigger
- Summary
感想
アトミック洗い替えのバリエーション
Full Loader パターンとしてすぐ思いつくのは DROP & INSERT(実際には CTAS) や TRUNCATE & INSERT ですが、この実装方法には以下の問題があると指摘されています。
-
DROPやTRUNCATEが実行されてINSERTが完了する前にユーザーがSELECTすると、結果が 0 件になる。 - 予期しないデータの問題が発生した場合、処理前の正常なデータが消えてしまう。
これを防ぐために書籍では以下のような方法を提案しています。(items へのフルロードを想定)
-- 1回目のフルロード
COPY INTO items_001 FROM ...;
CREATE OR REPLACE VIEW items AS SELECT * FROM items_001;
-- 2回目のロード
COPY INTO items_002 FROM ...;
CREATE OR REPLACE VIEW items AS SELECT * FROM items_002;
これの別パターンとして 2 章では触れられていないのですが、dbt などでよく使われるアトミック洗い替えと呼ばれる方法(Write-Audit-Publish パターンとも呼ぶ)や
COPY INTO items_tmp FROM ...;
ALTER TABLE items_tmp SWAP WITH items;
シノニム切り替えと呼ばれる方法があります(Oracle や MSSQL ではよく見るけど、Snowflake/BigQuery などではサポートされていない)。
-- 1回目のフルロード
COPY INTO items_001 FROM ...;
CREATE OR REPLACE SYNONYM items FOR items_001;
-- 2回目のロード
COPY INTO items_002 FROM ...;
CREATE OR REPLACE SYNONYM items FOR items_001;
ただ、最近の DWH であれば DELETE & INSERT でも問題ないんですよね。
例えば Snowflake では
- 全件
DELETEはメタデータ操作のみなのでTRUNCATE並みに速い。(参考) -
DELETEとINSERTを同一トランザクションに含めれば、処理途中でも参照ユーザーに 0 件を返すこともないし、エラーになってもロールバックすれば元のデータに戻る。 - タイムトラベル機能があるので、コミット後にデータの問題が見つかっても、以前のデータを参照できる。
(一部の DBMS では TRUNCATE をトランザクションの一部にできるので TRUNCATE & INSERT でも 1 番目と 2 番目は実現可能)
物理削除 vs 論理削除
Incremental Loader パターンの節に物理削除(Hard deletes)に関する記載があります。
増分データをロードする際にデータソースから更新日時を条件に増分データを抽出するという方法は良く採用されますが、データソース側で物理削除が発生する場合はちょっと難しいんですよね。(更新日時を条件にしても削除されたレコードは抽出できない)
これを回避するために、論理削除(Soft deletes)に頼ることができるよと書籍には書いてあるんですよね。
you can rely on soft deletes, where the producer, instead of physically removing the data, simply marks it as removed.
一方で、データモデリング界隈では「論理削除=削除フラグはアンチパターン」という雰囲気がありますね。元の主張は何も考えずにとりあえず削除フラグという考え方はダメだと言っているんですが、ねじ曲がって「気にせず物理削除しろ」と主張する人がいて、業務システムから DWH にデータを連携するデータエンジニアとしてはちょっと待て待て待てと思ったりします。
ちなみに、イミュータブルデータモデルであれば大歓迎です。
CDCとテーブル間整合性
増分ロードのもう1つのパターンとして Change Data Capture パターンが紹介されています。データソース DB のトランザクションログから変更データを抽出して DWH に適用する方法ですね。Fivetran や Airbyte、Qlik Replicate、最近では Snowflake Openflow などが該当します。
この方法はバッチジョブを実装せずに増分ロードを実現できるし、先に述べた物理削除の問題も解決できるので魅力的なのですが、実際にはいろいろ問題があります。
- テーブル間の整合性を保つことができない。
- 増分ロードできないパターンが稀によくある。(主キーの更新系が鬼門)
- データソース側で大量更新が発生すると、DWH 側への適用に大きな遅延が発生しやすい。
1 番目の問題は "Data Semantics" という節で触れられています。
具体的には、データソース側で以下のようなトランザクションが発生したとします。
BEGIN TRANSACTION;
INSERT INTO table_a VALUES (1);
INSERT INTO table_b VALUES (1);
COMMIT;
BEGIN TRANSACTION;
INSERT INTO table_a VALUES (2);
INSERT INTO table_b VALUES (2);
COMMIT;
ソースとなるテーブル table_a と table_b はどのタイミングでも一致するデータを保持するはずなのですが、CDC の多くのツールはこれらの変更を以下のようにテーブルごとに分割してターゲット側に適用しようとします。
INSERT INTO table_a VALUES (1);
INSERT INTO table_a VALUES (2);
INSERT INTO table_b VALUES (1);
INSERT INTO table_b VALUES (2);
1 ブロック目の処理が完了し、2 ブロック目の処理が未完了の状態で 2 つのテーブルを結合する SELECT を行うと、一致するレコードが見つからないという不整合が発生します。いずれ正しい状態にはなる(Eventual Consistency)のですが、CDC を採用する場合はこの問題を受け入れる必要があります。
ちなみに、CDC の中でも Oracle GoldenGate や Qlik Replicate ではこの問題に対応する設定があります。その代わりターゲットへの適用遅延は大きいですが。
PUSH と PULL
Passthrough Replicator パターンの中に以下のような記述があります。
you should implement the replication with the push approach instead of pull.
データを複製するときは、
- データソース側がデータをコピーしてターゲット側に送るべき(PUSH)
- ターゲット側がデータソース側からデータを抽出する(PULL)はすべきではない
ということを主張しています。もし、PULL を採用した際にターゲット側の該当処理にバグや問題があるとデータソース側に悪影響をあたる、PUSH ならデータソース側が頻度やスループットを制御できるということを理由に挙げています。
人によって賛否は分かれそうな主張ですが、
- PULL は簡単だけど問題が起きやすい
- PUSH は面倒だけど堅牢
という技術的な傾向はある気がします。まぁ、実際に PUSH か PULL かは技術的な側面より、ネットワーク的にどっち向きが開いているのかということと、データソースを持つ組織とターゲットを持つ組織のどちらが立場的に強いかという政治的な話に左右されることが多い気もしますが。
Readiness markerと遅延データ
Readiness Marker パターンとはデータ取り込みが完了したことをデータ利用者にどう知らせるかという話です。
- フラグファイルを配置する
- Delta Lake のコミットログに完了を書き込み
- パーティション作成(パーティションあり = データ取り込み完了)
などの方法が紹介されています。
ただし、データ取り込み完了を知らせた後に、遅延データが取り込まれるということもデータエンジニアリングの世界ではよくあることです。"Late Arriving Data" と言われ Kimball 本(The Data Warehouse Toolkit)などでも言及されている問題です。
2 章ではそういう問題もあると触れられているだけで、具体的には 3 章で扱うネタなのですが、対応するにはデータソースにデータを投入する人、データソース側、DWH 側、データ利用者側のそれぞれの組織で合意をとった方法で回さないといけないので、いつも頭が痛い問題なんですよね。
データ抽出から始めてほしかったな
そもそもとしてこの書籍は Data Ingestion から始まっていますが、取り込むべきデータを抽出する Data Extraction にもパターンがあるんじゃないかと思ったり。(CDC のように一体のものもありますが)
- 大量データを効率よく抽出するパターン
- 増分データを物理削除を含めて抽出するパターン
- 断続的にトランザクションが発生する中で、整合性を保った静止断面を抽出するパターン
など、本当は整理しておくべきだよなと思ったり。