Help us understand the problem. What is going on with this article?

Airflow における CeleryExecutor

More than 1 year has passed since last update.

はじめに

  • このエントリーでは Airflow の Executor のうち CeleryExecutor の概要について扱います
  • Airflow 1.10.2 の CeleryExecutor では当稿執筆現在依存性の問題が発生しています
    • Airflow では以下のように指定されています celery>=4.1.1, <4.2.0
    • Celery 4.1.1 では以下のように指定されています redis>=2.10.5, kombu >= 4.0.2
    • kombu の最新版である 4.4.0 では以下のように指定されています redis>=3.2.0
    • kombu にも事情があるようなので状況的にしかたないのかもしれません
    • この問題を回避するためには pip install apache-airflow[celery] した後に pip install 'kombu==4.3.0' してダウングレードする必要があるようです
    • 別件ですが、webserver が依存している tornado も非互換の変更があったようで pip install 'tornado>=5.1.1,<6' してダウングレードする必要があります
  • celery のドキュメントは v4.1.1 がなく v4.1.0 と v4.2.0 があるので v4.1.0 を参照します

Celery とは

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.

Celeryはメッセージを介して通信し、通常はブローカーを使用してクライアントとワーカーの間の仲介を行います。 クライアントがタスクを開始するためにメッセージをキューに追加すると、ブローカーはそのメッセージをワーカーに配信します。

http://docs.celeryproject.org/en/v4.1.0/getting-started/introduction.html#what-s-a-task-queue

Celery は broker としては以下が利用可能です

  • RabbitMQ
  • Redis
  • Amazon SQS
  • Zookeeper (Experimental)

http://docs.celeryproject.org/en/v4.1.0/getting-started/brokers/index.html

Google Cloud Composer における CeleryExecutor

Redis を broker として採用しています

Scheduler, Redis, Worker の関係は以下のリンクにある図を見ていただくと一目瞭然です

Scheduler が Redis に message を送り、それが queue となって Worker が処理します

https://cloud.google.com/composer/docs/concepts/overview?hl=ja#architecture

Celery の monitoring tool として Flower がありますが、Google Cloud Composer では通常の手順で構築した環境で Flower は起動していません

以下の手順で Worker 上に起動することで利用可能です

https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies?hl=ja#connecting_to_the_flower_web_interface
https://flower.readthedocs.io/en/latest/

CeleryExecutor の実行

各 Executor の実行時は execute_asyn メソッドが呼び出されますが、SeleryExecutor では内部的に Celery の task デコレーターを利用し、最終的に apply_async メソッドが呼び出されています

これにより message が queue に積まれます

https://github.com/apache/airflow/blob/1.10.2/airflow/executors/celery_executor.py#L46-L86
http://docs.celeryproject.org/en/v4.1.0/reference/celery.app.task.html?highlight=apply_async#celery.app.task.Task.apply_async

kysnm
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away