4
0

平穏なAirflow DAGsに突如として大量のゾンビが襲いかかってきた

Last updated at Posted at 2023-12-14

忙しい人のためのAirflowゾンビ撃退法

  • Cloud Composerのチューニングの話です。
  • Cloud Composerにはモニタリングという非常にありがたいものが存在します。そこでCPU、メモリが限界まで達していないか確認し、必要に応じてワークロードの構成を変更しましょう。
  • core_concurrency ( = タスクの同時実行数)を調整してもいいかもしれません。

はじめに

こんにちは!Intimate Merger(インティメート・マージャー)のOharaです。

弊社ではワークフロー管理ツールであるAirflowを活用しています。

AirflowはPythonで記述され、豊富なプラグインや柔軟なカスタマイズ性があります。DAG(Directed Acyclic Graph)と呼ばれるタスクの依存関係を定義するグラフ構造を使用して、複雑なワークフローを効果的に管理できます。
弊社ではGoogle CloudのCloud Composerを用いて、Airflowをセットアップ&運用しています。

12日目にAirflowの関連記事がありますので、そちらもぜひ!
Airflowスケジューリングマスターへの最初の一歩

本記事では、そんなAirflowを楽しく使っていた時に突如襲いかかってきたゾンビたちに、どう立ち向かい駆逐していったかを、後世に残していきたいと思います。

前提

当時のversionは以下です。

composer-2.0.24-airflow-2.2.5

また、ワークロードの構成は次の通りでした。

vCPU メモリ ストレージ
スケジューラ 1 vCPU 1.875 GB 1 GB 1
ワーカー 0.5 vCPU 1.875 GB 1 GB 1~3
ウェブサーバー 2 vCPU 3 GB 1 GB -

Cloud Composer、Airflowのバージョンアップに伴い、Airflow1.x系で動かしていたDagsをAirflow2.x系に移行している最中でした。

ゾンビ、第一波襲来

それは深夜2時のこと…悲劇は突然訪れる…

zombie_alert

ギャー😱

複数のタスクが Detected <TaskInstance: hogehoge> as zombie というエラー文とともに異常終了するという現象が起こりました。

原因調査

調査の結果、このzombieは、特定のDAGを追加してから急に発生し始めていることがわかりました。

tasks

その後、開発環境でDAGを実行し、どのくらいの負荷がかかるかを調べた結果、複数のtaskを同時に実行すると当然のことながらtaskの分だけメモリやCPUを大量消費していました。

このDAG自体は、最大24taskを同時に実行するような構成となっています。したがって、対応策として以下の2つが考えられます。

  • メモリやCPUを上げて、24taskの同時実行に耐えうるスペックにする
  • 同時に実行できるtaskの数を減らす

今回はtaskの数を調整するために celery.worker_concurrency に着目しました。

celery.worker_concurrency は、1つのワーカーがタスクキューから選択できるタスクの数を調整できます。

対処

celery.worker_concurrency はAirflow2.2.3以降では、「 3212 * worker_CPU8 * worker_memory の最小値」(Cloud Composer公式ドキュメント参照)と定義されているため、当時の構成ではdefault値は 6 となります。

Airflow公式ドキュメントには 16 とあるのですが、Cloud Composerが適切に設定してくれているようです。実際には airflow.cfg にAirflow構成の詳細があり、こちらを参照するといいでしょう。)

Cloud ComposerでAirflowを立ち上げると自動的に作成されるGoogle Cloud Storageのバケット内に、Airflow構成の詳細が記載された airflow.cfgファイルがあります。

当時対応した際は、参考にしていたこちらの記事

worker_concurrency = 6-8 * core数 または 6-8 * memory/3.75GB

が推奨値であると記載されていたため、メモリが1.875GBであったことを考慮し worker_concurrency4 に設定したところ無事ゾンビ化を抑えることができました。

ただ、DAGが今後増えることが予想されていたので、実際は以下のように変更しました。

【ワーカー】

  • vCPU 0.5 → 1
  • メモリ 1.875 → 3.75

【Airflow構成のオーバーライド】

  • celery.worker_concurrency default (6) → 8

推奨値が書いてあった記事は2020年のものなので、Cloud Composer1, Airflow1.x系に当てはまる内容かもしれません。今回はうまくいきましたが、あくまで参考程度にとどめておいた方が良いかと思います。

こうして、バランスをうまく調整することで大量発生したゾンビたちを駆逐することに成功しました。しかし、その平穏も長くは続かないのです…。

DAGのimportがされない?

ゾンビが消えてから数ヶ月経った後、今度はDAGのimportがタイムアウトするという現象に悩まされるようになりました。

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/pkg_resources/__init__.py", line 2086, in dist_factory
    lower.endswith('.dist-info') and
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /home/airflow/gcs/hogehoge.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.2.5/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.2.5/best-practices.html#reducing-dag-complexity, PID: 3849
import_error

Airflowの画面を開くと常にDAGのimportエラーが鎮座しています。DAGを実行できないのは困るということで、どうしてタイムアウトしてしまうのかを調査してみました。

原因調査

モニタリングを見てみると、DAGファイルの合計解析時間がどんどん伸び、スケジューラのCPU使用率がほぼ100%で張り付いている状態になっていました。

dag_parse scheduler_cpu

合計解析時間に5分はかかりすぎです。どうやらCPUが足りなさそうに見えています。

このときのスケジューラのスペックは以下の通りです。

vCPU メモリ ストレージ
スケジューラ 1 vCPU 1.875 GB 1 GB 1

対処

次のようにスペックを変更してみました。

【スケジューラ】

  • vCPU 1 → 4
  • メモリ 1.875 → 4 (CPUを上げると、一定数メモリも上げなければならない場合があります)

その結果、スケジューラのCPU使用率が60~70%に抑えられ、すべてのDAGファイルの合計解析時間も驚きの短さになりました。

scheduler_cpu_after dag_parse_after

CPUが上限に張り付く状態を避けることにより、importエラーもなくなり、快適なAirflowライフを取り戻したのでした。

モニタリングを見て、CPUやメモリが足りていなかったら適宜スペックを上げましょう。

忘れたころに訪れる、ゾンビ第二波襲来

そうです。奴らは忘れた頃にやってくるんです。

Detected <TaskInstance: hogehoge> as zombie

ゾンビというものは非常にやっかいですね。

原因調査

とりあえずモニタリングを見てみます。

worker_cpu

なんとなくワーカーのCPUが足りてない時間がありそうです。

実はこのとき、Airflow2.x系へのDAG移行が進み、合計DAG数が400を超えていました。DAGが増えつづけた結果、CPUが限界を迎えたようです。

公式ドキュメントにはDAGの規模に応じて推奨されるプリセットが記載されています。

推奨されるプリセット 合計 DAG DAG同時実行の最大数 task同時実行の最大数
50 15 18
250 60 100
1000 250 400

Airflowの立ち上げ時、小プリセットからスタートし状況に応じてCPUやメモリを上げていましたが、さすがに今のDAG数的には小プリセット+αの規模だと足りなさそうでした。

対処

そこで、ワーカーのみ中プリセットを参考にvCPUを変更しました。

【ワーカー】

  • vCPU 1 → 2

このときメモリは十分に足りていたので変更しておりません。

worker_cpu_after

スペックを上げることにより、再びゾンビを駆逐することに成功しました。
こうして、今では幸せなAirflowライフを謳歌しています。めでたしめでたし。

まとめ

Cloud Composerを使う際のチューニング体験談でした。

基本的にモニタリングを見てCPU、メモリが足りているかどうかを判断し都度スペックをあげていけば解決するかと思います。ですが、スペックを上げるとその分だけ料金もかかってしまうので、場合によっては他のパラメータで調整することも悪くない選択肢かなと思います。

また、詳しいトラブルシューティングは公式がドキュメントを用意してくださっています。Cloud Composer、Airflowのversionによって解決方法が異なるケースも多いので、なにかあったらそちらを見るとよいでしょう。

この記事が、深夜にゾンビに襲われた際の助けになれば幸いです。

あとがき

最後まで読んでくださり、ありがとうございました!

次回の記事もぜひお楽しみに!!

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0