Airflow における CeleryExecutor

  • このエントリーでは Airflow の Executor のうち CeleryExecutor の概要について扱います
  • Airflow 1.10.2 の CeleryExecutor では当稿執筆現在依存性の問題が発生しています
    • Airflow では以下のように指定されています celery>=4.1.1, <4.2.0
    • Celery 4.1.1 では以下のように指定されています redis>=2.10.5, kombu >= 4.0.2
    • kombu の最新版である 4.4.0 では以下のように指定されています redis>=3.2.0
    • kombu にも事情があるようなので状況的にしかたないのかもしれません
    • この問題を回避するためには pip install apache-airflow[celery] した後に pip install 'kombu==4.3.0' してダウングレードする必要があるようです
    • 別件ですが、webserver が依存している tornado も非互換の変更があったようで pip install 'tornado>=5.1.1,<6' してダウングレードする必要があります
  • celery のドキュメントは v4.1.1 がなく v4.1.0 と v4.2.0 があるので v4.1.0 を参照します

Celery とは

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.

Celeryはメッセージを介して通信し、通常はブローカーを使用してクライアントとワーカーの間の仲介を行います。 クライアントがタスクを開始するためにメッセージをキューに追加すると、ブローカーはそのメッセージをワーカーに配信します。

Celery は broker としては以下が利用可能です

  • RabbitMQ
  • Redis
  • Amazon SQS
  • Zookeeper (Experimental)

Google Cloud Composer における CeleryExecutor

Redis を broker として採用しています

Scheduler, Redis, Worker の関係は以下のリンクにある図を見ていただくと一目瞭然です

Scheduler が Redis に message を送り、それが queue となって Worker が処理します

Celery の monitoring tool として Flower がありますが、Google Cloud Composer では通常の手順で構築した環境で Flower は起動していません

以下の手順で Worker 上に起動することで利用可能です

CeleryExecutor の実行

各 Executor の実行時は execute_asyn メソッドが呼び出されますが、SeleryExecutor では内部的に Celery の task デコレーターを利用し、最終的に apply_async メソッドが呼び出されています

これにより message が queue に積まれます

