恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
13回目の今回はCeleryエグゼキューター(Celery Executor)。
バージョン2.3.3時点のものです。
Celeryエグゼキューター(Celery Executor)
CeleryExecutor
はワーカー・プロセス数をスケールアウトするための方法の1つです。使用するにはCeleryバックエンド(RabbitMQやRedisなど)をセットアップし、 airflow.cfg
のexecutorパラメータをCeleryExecutor
へと変更し、Celery関連の設定を追記する必要があります。
Celeryブローカーをセットアップするための情報については、これを徹底的に解説したCeleryのドキュメントを参照してください。
ワーカーを稼動させるためにいくつか必要不可欠な要件があります:
-
airflow
がインストールされており、CLIにパスが通っている必要があります。 - Airflowの構成(コンフィギュレーター)がクラスター間で同一である必要があります。
- ワーカーで実行されるオペレーターから、それらが依存するリソースにアクセスできなくてはなりません。例えば、
HiveOperator
を使う場合、Hive CLIがワーカー・ノード上で予め利用可能になっている必要があります。あるいはまたMySqlOperator
を使う場合、当該オペレーターが必要とするPythonライブラリがワーカー・ノード上の PYTHONPATH 上で利用可能になっている必要があります。 - ワーカーは
DAGS_FOLDER
にアクセスできなくてはなりません。そしてその内容も同期しておかなくてはなりません。一般的な手法としてはDAGS_FOLDER
をGitリポジトリで管理し、Chef、Puppet、Ansibleなど何かしらの構成管理ツールで同期させることがあります。あるいは、すべてのワーカー・ノードで共通のマウントポイントを利用してDAGファイルを共有する方法でもうまくいくでしょう。
ワーカーを起動するために、Airflowをセットアップし、workerサブコマンドを実行します:
airflow celery worker
ワーカーは起動すると速やかにタスクの取得を始めます。ワーカーを停止するには次のコマンドを実行します:
airflow celery stop
このコマンドはCeleryのドキュメントで推奨されている方法でCeleryのメイン・プロセスに対してSIGTERM
シグナルを送信して、ワーカーの停止を試みます。
Celery Flowerを実行することもできます。これはCelery上に構築されたWeb UIであり、ワーカー群の状態を監視するためのものです。Flowerサーバーを起動するためのショートカット・コマンドは:
airflow celery flower
flower
のPythonライブラリがインストールされている必要がある点に注意してください。Airflowが提供するCeleryバンドルをインストールするのがおすすめです:
pip install 'apache-airflow[celery]'
いくつかの注意点:
- データベースを使用してタスク結果を保存する構成になっていることを確認してください(Celeryにはタスク結果の保存方法として複数のオプションが存在します)。
-
[celery_broker_transport_options]
セクションの設定で長時間実行されるタスクのETA(タスクの実行日時)を上回る可視性タイムアウトが指定されていることを確認してください。 -
[worker_umask]
セクションの設定でワーカー群により新たに作成されるファイルのパーミンション設定が指定されていることを確認してください。 - タスク実行にともないリソースが消費されます。
worker_concurrency
(個々のワーカーがどれくらいのタスクを捌くかを決める)に基づき生成されるタスク数に対して十分なリソースをワーカー・ノード上に確保してください。 - キュー名称は256文字までに制限されています。しかし各ブローカー・バックエンドは独自の(より厳しい)制限を課す可能性があります。
PythonとAirflowがどのようにモジュールを管理しているかについて詳しくはモジュール管理をご覧ください。
アーキテクチャ
Airflowはいくつかのコンポーネントからなります:
- ワーカー群 - 割り当てられたタスクを実行します。
- スケジューラー - 実行するタスクをキューに追加する責任を負います。
- Web サーバー - このHTTPサーバーはDAG定義情報およびタスク・ステータス情報へのアクセスを提供します。
- データベース - タスク・ステータス、DAG定義、各種の変数、コネクション、その他の情報を保持します。
- Celery - キュー機能を提供します。
Celeryのキューは2つのコンポーネントからなります:
- ブローカー(Broker) - 実行するコマンドを管理します。
- 結果バックエンド(Result backend) - 完了したコマンドのステータスを管理します。
これらのコンポーネント群がさまざまな状況で相互にやり取りをします:
- [1] Webサーバー→ワーカー群 - タスク実行ログを取得します。
- [2] Webサーバー→DAGファイル群 - DAGの内容を参照します。
- [3] Webサーバー→データベース - タスク群のステータスを取得します。
- [4] ワーカー群→DAGファイル群 - DAGの内容を参照しタスクを実行します。
- [5] ワーカー群→データベース - 接続設定や各種の変数、XCOMの情報を参照・更新します。
- [6] ワーカー群→Celeryの結果バックエンド - タスクのステータスを保存します。
- [7] ワーカー群→Celeryのブローカー - 実行するコマンドの情報を保存します。
- [8] スケジューラー→DAGファイル群 -DAGの内容を参照しタスクを実行します。
- [9] スケジューラー→データベース - DAG Runとそこに含まれるタスク群の情報を保存します。
- [10] スケジューラー→Celery’の結果バックエンド - 完了したタスクのステータスを取得します。
- [11] スケジューラー→Celeryのブローカー - 実行すべきコマンドの情報を登録します。
タスク実行プロセス
タスク実行プロセスのシーケンス図です。
はじめに3つのプロセスが動いています:
- スケジューラー・プロセス(SchedulerProcess) - タスク群をCeleryExecutorを使って実行します。
- ワーカー・プロセス(WorkerProcess) - 新しいタスクがキューに追加されるのを監視します。
- ワーカー子プロセス(WorkerChildProcess) - 新しいタスクを待機します。
2つのデータベースも稼働しています:
- キュー・ブローカー(QueueBroker)
- 結果バックエンド(ResultBackend)
処理の過程で、2つのプロセスが新たに生成されます:
- ローカル・タスク・ジョブ・プロセス(LocalTaskJobProcess) - ローカル・タスク・ジョブ(LocalTaskJob)のコードを実行します。このジョブはロー・タスク・プロセスを監視しています。新しいプロセス群はTaskRunnerを使用して起動されます。
- ロー・タスク・プロセス(RawTaskProcess) - ユーザー・コード(例えば
execute()
が実行されるプロセスです。
これらが次のように協調してはたらきます:
- [1] スケジューラー・プロセス(SchedulerProcess) はタスク群を処理し実行すべきものがあればそれを キュー・ブローカー(QueueBroker) に送信します。
- [2] スケジューラー・プロセス(SchedulerProcess) は 結果バックエンド(ResultBackend) に対して定期的に問い合わせを行いタスクのステータスを確認します。
- [3] キュー・ブローカー(QueueBroker) はタスクの追加を検知すると、その情報をワーカー・プロセス(WorkerProcess)の1つに送信します。
- [4] ワーカー・プロセス(WorkerProcess) はタスクを ワーカー子プロセス(WorkerChildProcess) に対して1対1で割り当てます。
- [5] ワーカー子プロセス(WorkerChildProcess) は
execute_command()
関数などを使用して当該タスクを実行します。 このとき新しいプロセス、ローカル・タスク・ジョブ・プロセス(LocalTaskJobProcess)が生成されます。 - [6] ローカル・タスク・ジョブ・プロセス(LocalTaskJobProcess)のコードは
LocalTaskJob
クラスに記述さています。このコードはTaskRunnerを使って新しいプロセスを生成します。 - [7][8] 処理が終わると ロー・タスク・プロセス(RawTaskProcess) と ローカル・タスク・ジョブ・プロセス(LocalTaskJobProcess) は停止します。
- [10][12] ワーカー子プロセス(WorkerChildProcess) はメイン・プロセス、つまり ワーカー・プロセス(WorkerProcess) に対して、当該タスクの終了と次のタスクが実行可能である旨とを通知します。
- [11] ワーカー・プロセス(WorkerProcess) はステータス情報を 結果バックエンド(ResultBackend) に保存します。
- [13] この時点で **スケジューラー・プロセス(SchedulerProcess)が 結果バックエンド(ResultBackend) にステータスを問い合わせると、当該タスクのステータスの情報が得られます。
複数のキューを運用する
CeleryExecutorを使用する場合、各タスクを送信する相手となるCeleryのキューを指定することができます。BaseOperatorの属性queue
はそれぞれのタスクをどのキューに割り当てるかを決めるものです。デフォルトのキューはairflow.cfg
の[operators]
セクションの中のdefault_queue
で指定します。タスクに明示的にキューが指定されていない場合、このキューが使用されます。
ワーカーは複数のタスク・キューを参照することができます。ワーカーがスタートするとき(airflow celery worker
コマンドを使います)、キューの名称をカンマ区切り(空白文字なし)で指定することができます(例えば airflow celery worker -q spark,quark
みたいに)。ワーカーは紐付けられたキューのみを参照しタスクを取り出します。
リソース観点で(例えば、とても軽量なものであれば大量のタスクであっても1ワーカーで問題なく処理できるとか)、あるいは、環境の観点で(例えば、とても特殊な環境で、セキュリティを担保するために、特定のSparkクラスター内でワーカーを稼動させなくてはならないとか)、ワーカーに上記のような指定をしたいことがありえます。
こうしてエグゼキューターとその動作に必要な基盤構築の説明を読むと・・・
この手の製品をセットアップする上では当たり前のことなのかもしれませんが、そうした経験のない私としては「だいぶ面倒くさいな」と感じてしまいました。。Airflowのセットアップもさることながら、Celeryのセットアップ(これは別記事ですが)にも相当な知識が必要になるからです。
AWSがマネージドなAirflowを提供するのもなるほどとうなずける(その背景となるニーズになるほどとうなずける)、と。