まえがき
データエンジニアリングにおけるデザインパターンを扱った書籍 "Data Engineering Design Patterns" の第 6 章 Data Flow Design Patterns の感想を残していきます。
この章は、ユーザーや他のシステムに提供するデータセットを作成するために必要な「複数の」処理ステップをうまく調整して実行するにはどうしたらよいかという問題を扱っています。
ざっくりいうと、
- ジョブ管理
- オーケストレーション
といわれる領域に関わるデザインパターンです。
2~5 章の感想については以下になります。この書籍を読む背景や感想を残していく方針について知りたい場合は、最初の記事を参照してください。
紹介されているデザインパターン
以下が本章で紹介されているデザインパターンです。(概要は私が自分の理解の元、言い直しています)
| セクション | デザインパターン | 概要 |
|---|---|---|
| Sequence | Local Sequencer | 一連の処理を保守性や運用容易性の観点から分割した上で、順次実行する。(1 つの開発チーム内のパイプラインが対象) |
| Isolated Sequencer | 開発チームが異なる2つのパイプラインを連携させる。 | |
| Fan-In | Aligned Fan-In | 複数の先行タスクがすべて成功した後に、後続のタスクを実行する。 |
| Unaligned Fan-In | 複数の先行タスクのすべてが成功しなくとも、後続のタスクを実行する。 | |
| Fan-Out | Parallel Split | 2 つ以上の後続タスクを並列的に実行する。 |
| Exclusive Choice | 2 つの後続タスクのうち、どちらか一方のみを実行する。 | |
| Orchestration | Single Runner | パイプラインの同時実行を 1 に制限する。 |
| Concurrent Runner | パイプラインの並行実行を許容する。 |
感想
前提として、私のこの記事の中では、以下で用語を統一します。
- パイプライン:一連の必要な複数の処理の集まり
- タスク:パイプラインに含まれる 1 つ 1 つの実行単位
- 先行タスク / 後続タスク:依存関係があるタスク(先行タスクの完了が後続タスク開始の条件)
タスクの単位(Local Sequencer パターン)
Local Sequencer パターンは、名前の通りローカル(1 つの開発チーム内のパイプライン内部)で処理を順次実行していくことを指しています。ただ、このパターンの本質は上に記載した通り、「保守性や運用容易性を保つために一連の処理を含むパイプラインをどのように複数のタスクに分けるか」という点にあります。
この課題感は私も昔からずっと気になっていて、以下のような記事を書いたこともあります。
以下では、この記事では触れていなかった書籍で紹介されていた観点を 2 つ説明したいと思います。
命名
以前の記事で、処理の内容/役割/目的が異なる処理は 1 つのタスクにまとめたくないが、主観的な判断にならざるを得ないこともある旨を書きました。
判断難しいケースが残るのは避けられないですが、書籍では判断基準の 1 つとしてタスクに対する「命名」を挙げています。
- タスクに適切な名前を付けるのに難しさを感じた
- タスクに付けた名前が長すぎると感じた
といったケースでは、1 つのタスクに多くの処理を詰めすぎなのではないかと、書籍では提案しています。
特に 1 番目のケースでは自分の経験上、その処理が本質的に何をしているのかの整理の仕方を間違えていて、まとめすぎというより分割するところがズレているケースが多いので、確かに判断基準の 1 つとして有用だなと思いました。
オーケストレーターの機能
Airflow などでは SQL Operator や HTTP Operator が提供されており、SQL 文や REST API リクエストなどの単独実行は容易にタスクとして定義可能です。そうであれば、SQL 文ごと、REST API リクエストごとに 1 つずつタスクを定義していけば、オーケストレーターにおける実装を楽にできるよねという考え方です。
この観点は持ち合わせていなかったのですが、実装量の削減という意味では確かにあり得る基準かと思います。
ただ、
- SQL 文 ① だけのタスク
- SQL 文 ② と 別の処理 ③ からなるタスク(トランザクションやバックフィルなどの観点で 1 つにまとめる必要ありとする)
の 2 つのタスクがあった場合に、1 番目は SQL Operator で、2 番目は異なる Operator で(例えば Bash Operator によるスクリプト呼び出しなど)というのも統一感が欠けてよろしくない気もするので、あまり優先度が高い判断基準ではない気もします。
話は変わりますが、書籍では上の Airflow などオーケストレーターの機能に依存した判断について触れていますが、それから呼び出されるデータ処理側の機能もタスクの単位に影響を及ぼすことはあるかなと思いました。
例えば、ある ETL ツールだと、DB ① からデータを抽出し、別の DB ② に抽出結果を反映するという一連の処理が簡単に書けるものがあります。これを別々にすると、抽出結果を一時的にファイルなどに書き出すなどの処理が必要になるため、データ抽出とデータ反映は 1 つのタスクにまとめた方が良いという考え方もあり得ると思いました。(もちろんこれだけで決定はできないですが)
パイプラインの間の連携方法(Isolated Sequencer パターン)
Isolated Sequencer パターンを利用するユースケースの例として、書籍では
- データ準備チームが開発したパイプライン
- データ可視化チームが開発したパイプライン
の 2 つを順番に実行する場合、その連携(以下の図における破線の部分であるタスク ② とタスク X の連携)をどのように実現するかを扱っています。
実現のパターンとしては一般的に以下があります。
| # | 方法 | 概要 |
|---|---|---|
| 1 | Readiness Marker パターン | タスク ② が正常に完了したら、ストレージに完了を表すマーカーを保存する。タスク X はマーカーを監視し、マーカーが見つかったら処理を開始する。 |
| 2 | トリガー | タスク ② がタスク X をキックする |
| 3 | スケジュール設定 | タスク ② が完了する時刻を見越して、タスク X の開始時刻を設定する。 |
| 4 | 上位パイプライン制御 | 上位のパイプラインがデータ準備チームのパイプラインを実行し、次にデータ可視化チームのパイプラインを実行する。 |
1 番目と 2 番目の方法が書籍では紹介されていますが、先に書籍では触れられていない 3 番目と 4 番目の方法について触れます。
3 番目の方法は危険です。タスク ② の処理時間が想定より長くなると、タスク ② が完了する前にタスク X が開始される可能性があります。(最近もチーム間調整の結果としてこの手法を採用せざるを得なかったけど、やっぱりトラブった…)
4 番目の方法はパイプラインの入れ子を作りやすい JP1 などでたまに見る方法です。その場合、上位のパイプラインはシステム間の調停を担う上位のチームが管理するという役割分担が前提にある気がします。Airflow だと SubDag は非推奨になっているので、あまり採用しないかと思います。
書籍で紹介されている 1 番目と 2 番目の方法に話を戻すと、書籍では基本的に 1 番目を推奨しているように感じます(断言はしていない)。それは 1 番目の方法が 2 番目に比べると疎結合であるからです。
2 番目の方法はより密結合であるため、例えば以下のような問題が起こりえると書籍では紹介しています。
- 後続側のデータ可視化チーム側でタスクやパイプラインの名前を変えてしまうと動かなくなる
- 後続側のデータ可視化チームのパイプラインが先行側のデータ準備チーム作成のデータを不要とした場合でも、依存関係の解除はデータ準備チーム側の対応が必要
- 2 つのパイプラインの実行頻度/スケジュールなどは同一である必要がある(データ準備パイプラインを 1 時間に一回実行し、データ可視化パイプラインを 3 時間に一回実行するといったことができない)
ただし、1 番目の方法はマーカーの管理(読み取ったマーカーの履歴管理や削除、バックフィル時の扱いなど)や依存関係が分かりづらくなるなどの問題もあります。書籍でもデータを共有しないときには、トリガーのアプローチが有効としています。
まぁ、どちらにせよ、結局はチーム間でちゃんとコミュニケーションしろと警告されているわけですが。
先行タスクが失敗しても後続タスクを実行すべきケース(Unaligned Fan-In パターン)
Unaligned Fan-In パターンは複数の先行タスクのすべてが正常終了しなくとも、1 つの後続タスクを実行するというパターンです。
このパターンは先行タスクが複数あることを前提にしていますが、先行タスクが 1 つでもこのような制御をすることがあります。例えば以下のようなケースがあります。
- 先行 / 後続タスクの関係が実行の順番のみを規定しており、データ上の依存関係がない場合
- データ上の依存関係はあるが、先行タスクが失敗しても後続タスクの実行を許容する場合
- 先行タスクの作成するデータが不完全でも、ユーザー提供などを優先して後続タスクを実行したい場合
- DB においてテーブル A の統計情報収集 ⇒ テーブル A をインプットとする処理(統計情報がなくても後続処理自体は実行可能)
この後続タスクを実行するための先行タスクの成否条件を専用のオーケストレーションツールは柔軟に設定できることが多いです。
- 先行タスクがすべて成功した場合にのみ後続タスクを実行
- 先行タスクが 1 つでも成功した場合に後続タスクを実行
- (先行タスクが N 個以上成功した場合に後続タスクを実行)
- 先行タスクがすべてエラーになっても後続タスクを実行
一方、私の観測範囲だと、ETL ツール付属やクラウドベンダーのオーケストレーション機能だと、あまり柔軟に設定できないことが少なくないですね。(専用ツールでもできないのもありますが…)
Concurrent Runner パターンは本当に採用する?(Concurrent Runner パターン)
Concurrent Runner パターンは同一のパイプラインの同時実行を許容するパターンです。
これが有用なケースとして、書籍では以下の 2 ケースが紹介されています。
- 定期的に実行するパイプラインにおいて、あるタイミングの処理が次の処理の開始時刻までに完了しなくとも、次の処理を並行して実行することで遅延を回復できる
- 通常のスケジュール実行とバックフィル実行を同時に実行することで、バックフィル完了を早くすることができる
ここで触れておきたいのは 1 番目のケースなのですが、具体例を出すと、下の図でいえば、1 時開始の処理が 2 時までに完了しなくても、2 時開始の処理をスケジュール通り実行するようなケースです。
こうすることで、処理の遅延を早く解消することを目指しているのですが、実際に採用するかというとなかなか踏み切れないケースが多い気もします。書籍でも懸念点として、
- リソースの枯渇
- 並行処理の副作用(同時に動かしたことで壊れる)
が挙げられているのですが、それ以外にも以下のような理由から個人的にはなかなか踏み切れないかなと思います。
- 処理時間伸長の理由が分からない段階で、同じ処理を並行実行することの危うさ
- 並行処理してしまうと、本来の処理は単独ではどれだけ掛かるか分かりづらくなること
- 実行管理やログ管理の複雑さ
- 同時実行のためのリソース増強(Snowflake でいえば仮想ウェアハウスのサイズアップやマルチクラスター有効化など)
実際、どれぐらい採用されているものなのでしょうか。
Parallel と Concurrent の違い(Parallel Split パターン/Concurrent Runner パターン)
同時に複数の処理を行うという意味を持つ Parallel と Concurrent ですが、書籍のこの章では両方とも出てきます。
- Parallel Split パターンでは "Parallel"
- Concurrent Runner パターンでは "Concurrent"
一般に以下のような区別がされるようですが、文脈による部分も多く、いまいち区別に自信がありません。
| 観点 | Parallel | Concurrent |
|---|---|---|
| 日本語 | 並列 | 並行 |
| 物理/論理 | 物理的(本当に同時実行) | 論理的(コンテキストスイッチによる交互実行 OK) |
| 目的 | 処理時間の短縮 | 同時実行によるリソース効率化・応答時間短縮 |
Parallel Split は目的の 1 つとして複数タスク同時実行による全体の完了時間の短縮が目的、Concurrent Runner のユースケースの 1 つとして遅延時の次処理の開始時刻前倒しなので、上の観点に照らし合わせれば確かにあってそうな気もするのですが。
ワーカープール方式(Aligned Fan-In パターン/Parallel Split パターン)
複数タスクを同時に実行する方法として、大きく以下の 2 つのアプローチがあると思っています。
- ワークフロー型(タスクの先行/後続の関係を明示的に指定して実行)
- ワーカープール型(複数のワーカープロセスをプールしておき、タスクをそれぞれに動的に割り当てていく)
書籍では Airflow を中心に述べているので、前者がメインですね。このアプローチは、Aligned Fan-In パターンでは Scheduling skew、Parallel Split パターンでは Blocked execution と言及されているのですが、タスクを並列実行する場合の処理時間の偏りは問題になりやすいポイントかと思います。
一方、ワーカープールの場合はあるワーカーの処理が遅延していても、別のワーカーが別のタスクを巻き取るので、この偏りをある程度軽減することが可能です。
上の図の例では、ワーカー (B) に割り当てられたタスク (2) の処理時間が伸びているのですが、その他のワーカーであるワーカー (A)/(C) が他のタスクを動的に巻き取ることで、全体の処理時間が延びることを軽減します。
(例えば、Parallel Split で 3 つに分岐させ、それぞれに (1) & (2)、(3) & (4)、(5) と事前にタスクを割り振ると、全体の処理時間は伸びる)
一般には、複数のタスクを 1 つのキューに入れておき、ワーカーはキューからタスクを 1 つ取得して処理、それが完了したら再びキューからタスクを取得して処理を繰り返すことになります。(キューが空になったら終了)
特に大量のタスクを捌く必要があるシステムでは、ワークフロー型とワーカープール型を組み合わせて実現していくことが多いかなと思います。



