Posted at

Airflow における CeleryExecutor


はじめに


  • このエントリーでは Airflow の Executor のうち CeleryExecutor の概要について扱います

  • Airflow 1.10.2 の CeleryExecutor では当稿執筆現在依存性の問題が発生しています


    • Airflow では以下のように指定されています celery>=4.1.1, <4.2.0

    • Celery 4.1.1 では以下のように指定されています redis>=2.10.5, kombu >= 4.0.2

    • kombu の最新版である 4.4.0 では以下のように指定されています redis>=3.2.0

    • kombu にも事情があるようなので状況的にしかたないのかもしれません

    • この問題を回避するためには pip install apache-airflow[celery] した後に pip install 'kombu==4.3.0' してダウングレードする必要があるようです

    • 別件ですが、webserver が依存している tornado も非互換の変更があったようで pip install 'tornado>=5.1.1,<6' してダウングレードする必要があります



  • celery のドキュメントは v4.1.1 がなく v4.1.0 と v4.2.0 があるので v4.1.0 を参照します


Celery とは


Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.


Celeryはメッセージを介して通信し、通常はブローカーを使用してクライアントとワーカーの間の仲介を行います。 クライアントがタスクを開始するためにメッセージをキューに追加すると、ブローカーはそのメッセージをワーカーに配信します。

http://docs.celeryproject.org/en/v4.1.0/getting-started/introduction.html#what-s-a-task-queue

Celery は broker としては以下が利用可能です


  • RabbitMQ

  • Redis

  • Amazon SQS

  • Zookeeper (Experimental)

http://docs.celeryproject.org/en/v4.1.0/getting-started/brokers/index.html


Google Cloud Composer における CeleryExecutor

Redis を broker として採用しています

Scheduler, Redis, Worker の関係は以下のリンクにある図を見ていただくと一目瞭然です

Scheduler が Redis に message を送り、それが queue となって Worker が処理します

https://cloud.google.com/composer/docs/concepts/overview?hl=ja#architecture

Celery の monitoring tool として Flower がありますが、Google Cloud Composer では通常の手順で構築した環境で Flower は起動していません

以下の手順で Worker 上に起動することで利用可能です

https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies?hl=ja#connecting_to_the_flower_web_interface

https://flower.readthedocs.io/en/latest/


CeleryExecutor の実行

各 Executor の実行時は execute_asyn メソッドが呼び出されますが、SeleryExecutor では内部的に Celery の task デコレーターを利用し、最終的に apply_async メソッドが呼び出されています

これにより message が queue に積まれます

https://github.com/apache/airflow/blob/1.10.2/airflow/executors/celery_executor.py#L46-L86

http://docs.celeryproject.org/en/v4.1.0/reference/celery.app.task.html?highlight=apply_async#celery.app.task.Task.apply_async