クリスマスも近いので、Airflowのwebserverの話をしましょう。
言いたいこと
- 読み込みに時間がかかるDAGがあると
- そのDAGが読み込めないかもよ
- Airflowの管理画面にアクセス出来ないかもよ
- タスク数だけなら割と多くても大丈夫だよ
- Componserのデフォルト設定でも、千程度はいける?
- 未来は明るいよ
Airflowを構成するモジュール
今回の主役のwebserverは、Airflowのモジュールの一つです。
他のモジュールには、
- scheduler
- DAG/Taskのスケジューリング・状態の管理
- worker
- タスクインスタンスを実際に実行
- database
- TaskInstance・DagRun・xcom・Variableなどの保持
- executor
- スケジュールされたタスクをworkerに配分
などがあります。
詳しくは、Astronomerさんの記事がわかりやすいです。
webserverとは
webserverは、管理画面(下図)・CLIコマンド(の一部)・APIなどを担当するモジュールで、
- DAG/task instanceなどの状況把握
- DAG/task instanceの再実行や、ステータスの変更
などの処理を受け付けます。
内部的には、Flask+Gunicornの構成となっており、画面からのエンドポイントはここらへんに定義されています。
(図はAirflow公式ページより)
読み込みに時間がかかるDAG問題
webserverは、リクエストを受け付けるだけではなく、DAGファイルを定期的に読み込みます。
その結果、DAGファイルの読み込みに時間がかかると、
- 該当のDAGが読み込めない
- webserverへのアクセスが重くなる
- アクセスすら出来ない(エラーページ表示)
ことがあり、
などでも注意喚起?されています。
読み込みに時間がかかる?
DAGの読み込みに時間がかかるのと、DAGRunの実行に時間がかかるのは、混同しそうですが別の話で、今回問題にしているのは前者です。
例を出すと、これは読み込みに時間がかかるDAGで、
sleep(10000000)
start = DummyOperator(task_id='start')
これはDAGRunの実行に時間がかかるDAGです。
def hoge():
sleep(1000000)
slow_task = PythonOperator(
task_id='query_' + str(i),
python_callable=hoge,
)
タスク数が多かったり、タスクの外で外部にアクセスしていると、読み込みが遅くなる可能性があります。
webserverがDAGをパースする流れ
細かい流れが気になる人向けに:
- webserver起動時に、定期的に(※)[子プロセス(gunicorn worker)を再起動]
(https://github.com/apache/airflow/blob/4a344f13d26ecbb627bb9968895b290bfd86e4da/airflow/cli/commands/webserver_command.py#L146)するように設定 - エンドポイントのファイルのロード時に[DagBagオブジェクトが作られる]
(https://github.com/apache/airflow/blob/cb8b2a1dc64c3ea6ba445893c65c6c953dfb476a/airflow/www/views.py#L92) - DagBagオブジェクトが作られる中で、DAGファイルがパースされる
※ 具体的にはworker_refresh_interval秒
タスク数との関係
Cloud Composer(Airflow 1.10.2)・BigQueryOperatorのみのDAGで試したところ:
- DAGあたりに1000タスクくらいでは、デフォルト設定でも表示が出来る
- DAGあたりに3000タスクくらいになると、デフォルト設定では表示ができなくなる
- webserverが重くてもSchedulerやWorkerは動きます(Stackdriverで確認)
- タイムアウト(worker_refresh_interval)を伸ばしたり、読み込みを非同期(async_dagbag_loader)にするといける
なお、 Graph ViewやTree Viewだけが重い時は、default_dag_run_display_numberを変えるといいらしいです。
明るい未来の話
この「読み込みに時間がかかるDAG」に関しては、改善がいくつか提案されています。
Cloud Composerでは、webserver上のDAGの読み込みを非同期にするオプションが実装されており、Airflow1.10.4にも移植されています。
まだドラフトですが、AIP-24 DAG Persistence in DB using JSON for Airflow Webserver and (optional) Schedulerという提案は、より大幅な変更で、
- webserverでのDAGのパースはやめる
- schedulerがDAGをパースし、シリアライズ結果をDBに入れる
- webserverは、その結果を使う
オプションを提案しています。
(webserverが状態を持っているのが、そもそも良くないよねという話もあるらしい)
Cloud Compserのwebserver
Cloud Composerのwebserverに関してのメモです:
- テナントプロジェクト(顧客管理外のプロジェクト)・CPU2コアのGAE Flexibleで動く
- スペックは変えられない…
- GKEクラスタの方にwebserverを移すことが出来る
- Airflow設定変えてもダメなら、試す価値はあるかも
ちなみに、Astronomer.ioの方はvCPU・メモリのサイズを変えられます。