本記事ではデータパイプラインにおいてべき等性(Idempotent)を保つための方法について、パターン分けして整理してみたいと思います。
1. はじめに
データパイプラインでは べき等性(Idempotent)と言われる「ある操作を何度実施しても同じ結果になる」という条件を満たすことが好ましいと言われています。
ただし、べき等性を満たすためには具体的にどうすれば良いかというのは結構難しいので、自分の頭の整理も兼ねてパターンに分けて考えてみたいと思います。
単純化のために操作を1回実行した結果と2回実行した場合を考えたいと思います。
同じ状態(上図の「初期状態」)から操作を1回実行した場合と2回実行した場合の結果の状態(それぞれ「状態X」「状態Y」)が同じであれば、べき等性を実現できていると本記事では考えます。
1回実行時の操作 (A) と2回実行した場合の2回目の操作 (C) は、正しい入力データでエラーなく処理していないとそもそも評価する意味がないので、そのように前提を置きます。
(A) (C) 以外の2回実行した場合の1回目の操作 (B) については、以下のパターンが考えられると思います。
- 正しい入力データでエラーなく操作が完了
- エラーが発生し途中で操作が中断
- 間違った入力データが与えられ、結果が不正(処理自体はエラーなく終了)
それぞれのパターンごとに、べき等性を実現するためにどのような注意が必要か検討してみたいと思います。
ちなみに、べき等性を保つ一番簡単な方法として、今までの出力結果をすべて破棄して、今までの入力データからすべてを計算しなおすという方法があります。これが一番シンプルで採用できるなら採用すべきですが、今回においては、処理時間や下流システムとの関係の都合でその方法は取れないことを前提とします。
結論だけ知りたい人は 3. に飛んでください。
2. パターンごとのべき等性実現のために必要なこと
(1) 正しい入力データでエラーなく操作が完了
a. 二重処理の回避
例として、入力で与えられたファイルを変換して出力する処理を考えてみます。
- 1回実行のパターンでは、ファイル①~③の3ファイルを処理
- 2回実行のパターンでは、1回目はファイル①~②の2ファイルを、2回目は追加されたファイル③を処理
(ファイル①~③は処理された結果、ファイルA~Cを生成するものとする)
2回実行のパターンで求められることは、2回目の処理ではファイル③のみ処理されて結果的に3ファイルのみ出力された状態にすることです(少なくとも見た目上は)。ファイル①~②を再度処理し、ファイルA~Bを二重に出力(5ファイルを出力)してはいけません。
1回目処理終了後に入力ファイル①~②を削除すれば二重処理は回避できます。ただし、
- これらのファイルを別処理でも利用したい
- アーカイブや監査などの都合で残しておきたい
- 後述する部分的な洗い替えのために残しておきたい
などのケースは出てくるので、ファイル削除には頼りづらい場合もあるかと思います。
これを回避するには、どの入力データが処理済みかを記録しておき、次回以降の処理実行時に処理済みファイルは除外することで二重処理を排除することが必要になります。
よくあるパターンとしては、以下が良くあるかと思います。
- 入力がファイルの場合、処理済みファイルのパスを DB テーブルに保存しておく。
- 入力がテーブルレコードの場合、レコードの更新日時などに注目してどのレコードまで処理したかを保存しておく。
二重処理の排除はファイルの DB へのロード時によく求められます。例えば、Snowflake でデータをロードする COPY コマンドでは、ロード済みのファイル+そのメタデータを内部で記録しておき、次回の COPY 実行時には自動で除外するという機能が備わっていたりします。
b. 追加ロジックと更新ロジックの不一致
a のケースは入力ファイル(データ)と出力ファイル(データ)が1対1のケースでしたが、そうでない場合もあります。この場合、2回実行のパターンでは更新処理が必要になります。
次のようなケースを考えてみます。
- sales_detail(売上詳細)テーブルを集計して sales_summary(売上サマリー)テーブルを作成する。
- sales_detail テーブルは追記オンリー(更新なし)、かつ sales_timestamp の順番でデータが追加される。
- キャンセルや変更の場合は赤黒処理される(seqがインクリメントされる)。
(例をシンプルにするためにかなり無理がある仕様ですが、そこは見逃してください)
この処理を以下の MERGE 文で実現しようとする発想はよくあると思います。
MERGE INTO sales_summary s
USING (
select
trunc(sales_timestamp) sales_date,
sum(sales_amount) total_sales_amount
from
sales_detail
where
sales_timestamp > (前回までで処理済みの sales_timestamp)
group by
sales_date
having
total_sales_amount > 0
) d
ON (s.sales_date = d.sales_date)
WHEN MATECHED
THEN UPDATE SET s.total_sales_amount = s.total_sales_amount + d.total_sales_amount
WHEN NOT MATCHED
THEN INSERT VALUES (d.sales_date, d.total_sales_amount)
;
実はこれは厳密にはべき等性を満たしていません。
- 1回でまとめて処理されるケースでは、出力は 0 件になります。
- 2回に分けて(1レコードずつ)処理されるケースでは、total_sales_amount = 0 の 1 件が生成されます。
かなり強引な例ですが、結果が変わってしまう根本の原因は MERGE 文における追加時(WHEN NOT MATCHED)と更新時(WHEN MATCHED)のロジックの整合性が取れていないことに起因しています(売上キャンセルなどになった場合の挙動の差)。
対策として
- 追加ロジックと更新ロジックが整合するよう注意深く設計する。
- 一定の範囲を部分に洗い替える(DELETE/INSERT)などを用いてそもそも更新ロジックを不要にする。
が考えられますが、前者は根性論に近いので、後者を個人的にはお勧めしたいです。後者の部分的に洗い替えるアプローチは以下で説明しています(2-1. と 2-3.)。
(2) エラーが発生し途中で操作が中断
一時的なネットワーク / サービスの瞬断やメモリ / ディスク容量不足などで不意にエラーが発生し処理が中断することは少なくありません(特にクラウド環境においては)。
ファイルシステム上でファイルを扱う処理に多いのですが、エラーが発生し処理が途中で中断すると、出力データの状態が中途半端になることがあります(図の左側)。
この状態で特に何のケアもなく2回目の処理を実行すると、出力結果に1回目の中途半端なデータが残ることになります(図の右側)。
これを防ぐには、処理の最初に中途半端に生成された出力データ(上の図ではファイルB)を削除するというロジックを入れる必要があります。「このデータは中途半端か否か」を判断するのは一つ考えどころになりますが、
- 2-(1)-a で述べた処理済みファイルの結果と出力先を突き合わせて、処理済みになっていない出力ファイルを削除する。
- 出力結果は一時フォルダに保存し、処理が完了した後に本来の出力フォルダに移動する。処理の最初では一時フォルダ内のファイルをすべて削除する。(移動処理がアトミック出ないのがリスクですが)
といったアプローチが考えられます。
そもそもトランザクションに対応した DB/DWH で処理を完結できれば、処理全体を1つのトランザクションとすることで中途半端な状態を回避できる(エラーになればロールバックされる)ので、考えることが減りますね。
また、ファイルベースの処理でも Amazon S3 のようなオブジェクトストレージを用いる場合、ファイル単位で整合性をチェックする仕組みもあるため、問題を回避するのも楽になっているような気はします。(複数ファイルにまたがった整合性に関してはまだ検討が必要ですが)
この節の内容は以下を参考にしています。(内容自体は次の (3) のパターンに近いのですが)
(3) 間違った入力データが与えられ、結果が不正
データパイプラインを運用していると、上流から間違った入力データを連携され、それをエラーなく処理してしまったというケースは度々出くわします。
間違った入力データで一度処理してしまっても、再度正しいデータを元に処理を再実行すれば正しい結果に戻る(最初から正しい入力データで処理した結果と同じになる)というのはべき等性に期待されることです。それを実現するためのポイントについて考えていきたいと思います。
大きくポイントは2つあると思います。
間違った入力データの特定
まず、どの入力データが間違ったデータから正しいデータに入れ替えられたかを識別する必要があります(図のケースではファイル②)。基本的に更新されたファイルは正しいデータに入れ替えられたと認識すれば良いはずです。そのためには、
- ファイルの更新日時
- ファイルサイズ
- ファイルのチェックサム値
などを1回目に記録しておき、2回目にその記録と実際の入力データを突き合わせて更新の有無をチェックするのが常套手段かと思います。(更新日時やサイズは更新されていても変わってないということもあるので、リスクがありますが)
ちなみに、前述した Snowflak の COPY コマンドは ETag で更新有無をチェックしていたと思います。
図では入力データがファイルのケースですが、入力データが DB テーブルのレコードの場合は更新日時を持たせておくと更新有無を判断できます。
間違った入力データに基づいた出力結果の特定
次に必要なことは、間違ったデータに基づく出力結果を消す必要があるため、その出力結果を特定する必要があります(図のケースではファイルB)。
図のケースでは入力ファイルと出力ファイルが1対1なのでファイル名などでマッピングを取れるようにしておけば楽なのですが、集計などを含む場合は検討が必要になります。
例えば、以下のような集計を含むケースを考えます。
sales_details テーブルの sales_id = 1003 のレコードが間違ったデータの場合、それに基づいて出力された結果は sales_summary テーブルの sales_date = 2024-04-27 に該当します。このデータを削除した上で 2024-04-27 の入力データを再度部分洗い替えで集計しなおせば、正しい結果が得られることになります。
(削除せずに、間違ったデータと正しいデータの差分を計算してそれを反映させるというアプローチもありますが、差分を取る処理は大変なので、部分洗い替えした方が良いかと思います。)
3. まとめ
以上をまとめると、べき等性を保つためには以下に気を付ける必要がありそうです。
- 入力データに対して、それが処理済みか判別できるように記録しておく。
- 処理済み入力データのリストを記録しておく
- 時系列にどこまで処理したかを記録しておく
- など
- その記録に基づいて、入力データの二重処理を回避するロジックを入れる。
- 入力データが一度処理された後に更新されたか判別できるようにしておく。
- 入力データ処理時に、入力データの更新日付や ETag などの情報を残しておく。
- 入力データそのものにも更新日付などの情報を付与しておく。(特に入力データがレコードの場合)
- など
- もし処理済み入力データでも更新されている場合は、再度処理するロジックを入れる。
- 処理方法に関しては、追加と更新のロジックの不整合が起こりづらい部分洗い替えを検討する。
- 処理がエラーなどで途中で中断することを想定する。
- 処理全体を1トランザクションとして、エラーで中断しても処理途中のデータが残らないようにする。
- 処理開始時に前回の処理で中途半端にのこっているデータをクリーンアップする。
- など
ここまで整理しましたがおそらく網羅的ではないと思うので、「こういうポイントも気を付けた方がよい」という意見があれば、コメントしてもらえると幸いです。