4
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Airflow における CeleryExecutor

Posted at

はじめに

  • このエントリーでは Airflow の Executor のうち CeleryExecutor の概要について扱います
  • Airflow 1.10.2 の CeleryExecutor では当稿執筆現在依存性の問題が発生しています
    • Airflow では以下のように指定されています celery>=4.1.1, <4.2.0
    • Celery 4.1.1 では以下のように指定されています redis>=2.10.5, kombu >= 4.0.2
    • kombu の最新版である 4.4.0 では以下のように指定されています redis>=3.2.0
    • kombu にも事情があるようなので状況的にしかたないのかもしれません
    • この問題を回避するためには pip install apache-airflow[celery] した後に pip install 'kombu==4.3.0' してダウングレードする必要があるようです
    • 別件ですが、webserver が依存している tornado も非互換の変更があったようで pip install 'tornado>=5.1.1,<6' してダウングレードする必要があります
  • celery のドキュメントは v4.1.1 がなく v4.1.0 と v4.2.0 があるので v4.1.0 を参照します

Celery とは

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.

Celeryはメッセージを介して通信し、通常はブローカーを使用してクライアントとワーカーの間の仲介を行います。 クライアントがタスクを開始するためにメッセージをキューに追加すると、ブローカーはそのメッセージをワーカーに配信します。

Celery は broker としては以下が利用可能です

  • RabbitMQ
  • Redis
  • Amazon SQS
  • Zookeeper (Experimental)

Google Cloud Composer における CeleryExecutor

Redis を broker として採用しています

Scheduler, Redis, Worker の関係は以下のリンクにある図を見ていただくと一目瞭然です

Scheduler が Redis に message を送り、それが queue となって Worker が処理します

Celery の monitoring tool として Flower がありますが、Google Cloud Composer では通常の手順で構築した環境で Flower は起動していません

以下の手順で Worker 上に起動することで利用可能です

https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies?hl=ja#connecting_to_the_flower_web_interface
https://flower.readthedocs.io/en/latest/

CeleryExecutor の実行

各 Executor の実行時は execute_asyn メソッドが呼び出されますが、SeleryExecutor では内部的に Celery の task デコレーターを利用し、最終的に apply_async メソッドが呼び出されています

これにより message が queue に積まれます

https://github.com/apache/airflow/blob/1.10.2/airflow/executors/celery_executor.py#L46-L86
http://docs.celeryproject.org/en/v4.1.0/reference/celery.app.task.html?highlight=apply_async#celery.app.task.Task.apply_async

4
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?