Help us understand the problem. What is going on with this article?

Airflowのwebserverと、読み込みに時間がかかるDAGの話

クリスマスも近いので、Airflowのwebserverの話をしましょう。

言いたいこと

  • 読み込みに時間がかかるDAGがあると
    • そのDAGが読み込めないかもよ
    • Airflowの管理画面にアクセス出来ないかもよ
  • タスク数だけなら割と多くても大丈夫だよ
    • Componserのデフォルト設定でも、千程度はいける?
  • 未来は明るいよ

Airflowを構成するモジュール

今回の主役のwebserverは、Airflowのモジュールの一つです。
他のモジュールには、

  • scheduler
    • DAG/Taskのスケジューリング・状態の管理
  • worker
    • タスクインスタンスを実際に実行
  • database
    • TaskInstance・DagRun・xcom・Variableなどの保持
  • executor
    • スケジュールされたタスクをworkerに配分

などがあります。
詳しくは、Astronomerさんの記事がわかりやすいです。

webserverとは

webserverは、管理画面(下図)・CLIコマンド(の一部)APIなどを担当するモジュールで、

  • DAG/task instanceなどの状況把握
  • DAG/task instanceの再実行や、ステータスの変更

などの処理を受け付けます。

内部的には、Flask+Gunicornの構成となっており、画面からのエンドポイントはここらへんに定義されています。

Airflow UI
(図はAirflow公式ページより)

読み込みに時間がかかるDAG問題

webserverは、リクエストを受け付けるだけではなく、DAGファイルを定期的に読み込みます。

その結果、DAGファイルの読み込みに時間がかかると、

  • 該当のDAGが読み込めない
  • webserverへのアクセスが重くなる
  • アクセスすら出来ない(エラーページ表示)

ことがあり、

などでも注意喚起?されています。

読み込みに時間がかかる?

DAGの読み込みに時間がかかるのと、DAGRunの実行に時間がかかるのは、混同しそうですが別の話で、今回問題にしているのは前者です。

例を出すと、これは読み込みに時間がかかるDAGで、

    sleep(10000000)
    start = DummyOperator(task_id='start')

これはDAGRunの実行に時間がかかるDAGです。

        def hoge():
            sleep(1000000)
        slow_task = PythonOperator(
            task_id='query_' + str(i),
            python_callable=hoge,
        )

タスク数が多かったり、タスクの外で外部にアクセスしていると、読み込みが遅くなる可能性があります。

webserverがDAGをパースする流れ

細かい流れが気になる人向けに:

  1. webserver起動時に、定期的に(※)子プロセス(gunicorn worker)を再起動するように設定
  2. エンドポイントのファイルのロード時にDagBagオブジェクトが作られる
  3. DagBagオブジェクトが作られる中で、DAGファイルがパースされる

※ 具体的にはworker_refresh_interval秒

タスク数との関係

Cloud Composer(Airflow 1.10.2)・BigQueryOperatorのみのDAGで試したところ:

  • DAGあたりに1000タスクくらいでは、デフォルト設定でも表示が出来る
  • DAGあたりに3000タスクくらいになると、デフォルト設定では表示ができなくなる
    • webserverが重くてもSchedulerやWorkerは動きます(Stackdriverで確認)
    • タイムアウト(worker_refresh_interval)を伸ばしたり、読み込みを非同期(async_dagbag_loader)にするといける

なお、 Graph ViewやTree Viewだけが重い時は、default_dag_run_display_numberを変えるといいらしいです。

明るい未来の話

この「読み込みに時間がかかるDAG」に関しては、改善がいくつか提案されています。

Cloud Composerでは、webserver上のDAGの読み込みを非同期にするオプションが実装されており、Airflow1.10.4にも移植されています

まだドラフトですが、AIP-24 DAG Persistence in DB using JSON for Airflow Webserver and (optional) Schedulerという提案は、より大幅な変更で、

  • webserverでのDAGのパースはやめる
  • schedulerがDAGをパースし、シリアライズ結果をDBに入れる
  • webserverは、その結果を使う

オプションを提案しています。
(webserverが状態を持っているのが、そもそも良くないよねという話もあるらしい

Cloud Compserのwebserver

Cloud Composerのwebserverに関してのメモです:

ちなみに、Astronomer.ioの方はvCPU・メモリのサイズを変えられます。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away