はじめに
- このエントリーでは Airflow の Executor のうち CeleryExecutor の概要について扱います
- Airflow 1.10.2 の CeleryExecutor では当稿執筆現在依存性の問題が発生しています
- Airflow では以下のように指定されています
celery>=4.1.1, <4.2.0
- Celery 4.1.1 では以下のように指定されています
redis>=2.10.5
,kombu >= 4.0.2
- kombu の最新版である 4.4.0 では以下のように指定されています
redis>=3.2.0
- kombu にも事情があるようなので状況的にしかたないのかもしれません
- この問題を回避するためには
pip install apache-airflow[celery]
した後にpip install 'kombu==4.3.0'
してダウングレードする必要があるようです - 別件ですが、webserver が依存している tornado も非互換の変更があったようで
pip install 'tornado>=5.1.1,<6'
してダウングレードする必要があります
- Airflow では以下のように指定されています
- celery のドキュメントは v4.1.1 がなく v4.1.0 と v4.2.0 があるので v4.1.0 を参照します
Celery とは
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.
Celeryはメッセージを介して通信し、通常はブローカーを使用してクライアントとワーカーの間の仲介を行います。 クライアントがタスクを開始するためにメッセージをキューに追加すると、ブローカーはそのメッセージをワーカーに配信します。
Celery は broker としては以下が利用可能です
- RabbitMQ
- Redis
- Amazon SQS
- Zookeeper (Experimental)
Google Cloud Composer における CeleryExecutor
Redis を broker として採用しています
Scheduler, Redis, Worker の関係は以下のリンクにある図を見ていただくと一目瞭然です
Scheduler が Redis に message を送り、それが queue となって Worker が処理します
Celery の monitoring tool として Flower がありますが、Google Cloud Composer では通常の手順で構築した環境で Flower は起動していません
以下の手順で Worker 上に起動することで利用可能です
https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies?hl=ja#connecting_to_the_flower_web_interface
https://flower.readthedocs.io/en/latest/
CeleryExecutor の実行
各 Executor の実行時は execute_asyn メソッドが呼び出されますが、SeleryExecutor では内部的に Celery の task デコレーターを利用し、最終的に apply_async メソッドが呼び出されています
これにより message が queue に積まれます
https://github.com/apache/airflow/blob/1.10.2/airflow/executors/celery_executor.py#L46-L86
http://docs.celeryproject.org/en/v4.1.0/reference/celery.app.task.html?highlight=apply_async#celery.app.task.Task.apply_async