LoginSignup
41
25

More than 5 years have passed since last update.

Celeryでワークフローを扱う

Last updated at Posted at 2018-02-07

普段、Celeryは非同期(終了を待ち合わせない、結果を求めない)で使うことが主ですが、同期(終了を待ち合わせて、結果を求める)でワークフロー的な使い方もできます。

元々、Celeryは非同期タスクの分散コンピューティングに特化していますが、今回は DigdagのPython APIを使う と同じ題材で、ワークフローの検証をしてみます。

用語

公式ドキュメント を読む上で、一般的な用語に対する Celeryの用語は、以下になります。

検証環境

ファイル構成

celery
├── Dockerfile
├── docker-compose.yml
└── proj
    ├── __init__.py
    ├── celery.py
    ├── celeryconfig.py
    ├── myapp
    │   ├── __init__.py
    │   └── workflow.py
    └── tasks
        ├── __init__.py
        └── func_base.py

docker関係

docker-compose.yml

version: '3'
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
  celery:
    build: .
    depends_on:
      - redis
    volumes:
      - ./proj:/home/celery/proj
  • docker-composeで、ブローカーのredis 1台、Celery worker n台の環境を立てます。
  • 本番ではブローカーのredisは、Amazon ElastiCacheのredisにしたり、インスタンス管理がいやであればAmazon SQSにしたりするのがよいかもしれません。

Dockerfile

FROM python:2

RUN apt-get update && apt-get -y install sudo && \
    useradd -m celery && echo "celery:celery" | chpasswd && adduser celery sudo && \
    pip install -U celery[redis]

COPY proj /home/celery/proj
USER celery
WORKDIR /home/celery/
ENV PYTHONPATH $PYTHONPATH:/home/celery/

CMD celery -A proj worker -l info
  • Celery workerの定義です。
  • 諸事情でPython2をベースにしていますが、新規プロジェクトであればPython3の方がいいです。
  • CMD行のCelery workerは簡易的に立ててますが、本番で使うのであれば、デーモン化した方がいいです。

Pythonコード

proj/celery.py

from __future__ import absolute_import
from celery import Celery


app = Celery('proj')
app.config_from_object('proj.celeryconfig')

if __name__ == '__main__':
    app.start()
  • CeleryのApplicationの定義です。

proj/celeryconfig.py

broker_url = 'redis://celery_redis_1:6379/0'
result_backend = 'redis://celery_redis_1:6379/0'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'
enable_utc = True
include = ['proj.tasks']
  • Celeryの設定ファイルとして、外出しにしたものです。
  • 本番であれば、ログ ファイルを定義した方がいいですが、今は標準出力にしています。

proj/tasks/init.py

from .func_base import run, split, subtask

proj/tasks/func_base.py

from __future__ import absolute_import
from proj.celery import app


@app.task
def split():
    task_count = 10
    return task_count

@app.task
def run(task_count):
    for i in range(task_count):
        subtask.delay(i)

@app.task
def subtask(index):
    print("Processing " + str(index))
  • CeleryのTasksの定義です。
  • 公式ドキュメントでは簡易的に tasks.py で書かれていますが、実務的にモジュール分割しています。
  • func_base.py という名前になっているのは、公式ドキュメントでは function baseで書かれていますが、実務では function 1個で済むことはなくて散らかるので、機能別に class baseで書くことが多いからです。これについては後述します。
  • subtask.delay(i) の部分は、後続のタスクがなく、最後は非同期で投げっぱなしにしています。終了を待ち合わせて後続のタスクがある場合は、Groupsで並列処理としてまとめ上げる感じでしょうか。

/proj/myapp/workflow.py

from __future__ import absolute_import
from celery import chain
from proj.tasks import split, run


def main():
    chain(split.s() | run.s())()

if __name__ == '__main__':
    main()
  • CeleryのCanvas(ワークフロー)の定義です。
  • Chainsで直列処理、タスクのパイプライン的なことをしています。
  • Digdagのようにyamlで書けないので、見通しが悪くなるのは残念ですが、分散特化とのトレードオフでしょうか。

実行

ビルドして起動します。

$ docker-compose build

$ docker-compose up -d
Creating celery_redis_1  ... done
Creating celery_redis_1  ... 
Creating celery_celery_1 ... done

$ docker-compose ps
     Name                    Command               State           Ports         
---------------------------------------------------------------------------------
celery_celery_1   /bin/sh -c celery -A proj  ...   Up                            
celery_redis_1    docker-entrypoint.sh redis ...   Up      0.0.0.0:6379->6379/tcp
  • redis 1台、Celery worker 1台が上がっています。

おもむろに、Celery workerを2台にスケールアップして、複数台構成にしてみます。
分散コンピューティングができるかを検証します。

$ docker-compose up -d --scale celery=2
celery_redis_1 is up-to-date
Starting celery_celery_1 ... done
Creating celery_celery_2 ... done

$ docker-compose ps
     Name                    Command               State           Ports         
---------------------------------------------------------------------------------
celery_celery_1   /bin/sh -c celery -A proj  ...   Up                            
celery_celery_2   /bin/sh -c celery -A proj  ...   Up                            
celery_redis_1    docker-entrypoint.sh redis ...   Up      0.0.0.0:6379->6379/tcp
  • ブローカー(タスクキュー)のredis 1台に対して、Celery worker 2台がぶら下がる形となりました。

Celery worker 1台目に入って、Taskが登録されているか確認し、ワークフローを実行してみます。

$ docker exec -it celery_celery_1 /bin/bash

$ celery -A proj inspect registered
-> celery@558097538e09: OK
    * proj.tasks.func_base.run
    * proj.tasks.func_base.split
    * proj.tasks.func_base.subtask
-> celery@449561107d02: OK
    * proj.tasks.func_base.run
    * proj.tasks.func_base.split
    * proj.tasks.func_base.subtask

$ python proj/myapp/workflow.py
  • 今回は Celery worker 1台目から起動していますが、本番では例えばバッチサーバ(またはWebサーバ/APIサーバ)と、Celery worker n台に、同じ projリポジトリをデプロイし、ブローカーのredisを介して繋がるイメージとなります。
    • proj/myapp/ がバッチサーバ(またはWebサーバ/APIサーバ)向け
    • proj/tasks/ がClelery workerサーバ向け
  • または、Celery workerがバッチサーバというのもありです。

ログを確認します。

$ docker-compose logs celery
celery_2  | [2018-02-07 02:11:11,839: INFO/ForkPoolWorker-2] Task proj.tasks.func_base.split[78d0a4a5-2b20-4a97-8015-f3c824221193] succeeded in 0.0402305749885s: 10
celery_2  | [2018-02-07 02:11:11,884: WARNING/ForkPoolWorker-2] Processing 0
celery_2  | [2018-02-07 02:11:11,889: WARNING/ForkPoolWorker-1] Processing 2
celery_2  | [2018-02-07 02:11:11,910: WARNING/ForkPoolWorker-2] Processing 4
celery_2  | [2018-02-07 02:11:11,915: WARNING/ForkPoolWorker-1] Processing 8

celery_1  | [2018-02-07 02:11:11,886: WARNING/ForkPoolWorker-1] Processing 1
celery_1  | [2018-02-07 02:11:11,915: WARNING/ForkPoolWorker-2] Processing 3
celery_1  | [2018-02-07 02:11:11,916: WARNING/ForkPoolWorker-1] Processing 5
celery_1  | [2018-02-07 02:11:11,920: WARNING/ForkPoolWorker-2] Processing 6
celery_1  | [2018-02-07 02:11:11,921: WARNING/ForkPoolWorker-1] Processing 7
celery_1  | [2018-02-07 02:11:11,933: WARNING/ForkPoolWorker-2] Processing 9
  • 一部省略しています。

2台に分散されて実行されていることが確認できました。

Celery 4のClass baseのTaskの書き方

func_base.py という名前になっているのは、公式ドキュメントでは function baseで書かれていますが、実務では function 1個で済むことはなくて散らかるので、機能別に class baseで書くことが多いからです。

と書きましたが、4.0 リリースノートを読むと、開発元ではClass baseの書き方は冷遇されている印象を受けました。

とは言え、Classの方が保守し易いので、紹介してみます。
やらせたいことは、Task.run()に記述します。
書き方は以下の2パターンあるようです。

1.公式ドキュメントのCustom task classesの書き方

proj/tasks/class_base_a.py

from __future__ import absolute_import
from proj.celery import app


class _AddTask(app.Task):

    def run(self, x, y):
        return x + y
add = app.tasks[_AddTask.name]

このとおりに書いても動かないですね。
add = app.tasks[_AddTask.name] の部分で celery.exceptions.NotRegistered になります。
リリースノートの「The Task base class no longer automatically register tasks」が正しくて、ドキュメントが直ってないと見ました。

2.リリースノートWhat’s new in Celery 4.0の書き方

proj/tasks/class_base_b.py

from __future__ import absolute_import
from celery import Task
from proj.celery import app


class CustomTask(Task):
    def run(self):
        print('running')
CustomTask = app.register_task(CustomTask())

よって、init.py は以下のようになります。

proj/tasks/init.py

from .func_base import run, split, subtask
# from .class_base_a import add
from .class_base_b import CustomTask

呼び出し側

1台目に入って、Taskの登録状況を見てみます。

$ docker exec -it celery_celery_1 /bin/bash

$ celery -A proj inspect registered
-> celery@36c2ae29323c: OK
    * proj.tasks.class_base_b.CustomTask
    * proj.tasks.func_base.run
    * proj.tasks.func_base.split
    * proj.tasks.func_base.subtask

2.の書き方だと、今までと違う感じがしてきました。

$ python
>>> from proj.tasks import CustomTask

>>> custom_task = CustomTask()  # 直接実行?
running

>>> CustomTask.delay()  # Celeryに対する非同期起動
<AsyncResult: d30e5e34-1f78-4981-9315-01c2ebb7dc03>

>>> CustomTask.run()    # Celeryを介さない直接実行
running

3.こちらで紹介されている書き方

proj/tasks/class_base_b.py

from __future__ import absolute_import
from celery import Task
from proj.celery import app


class CustomTask(Task):
    def run(self):
        print('running')
app.register_task(CustomTask())

こちらの方が、今までのClass的な使い方に近いと思いました。

$ python
>>> from proj.tasks import CustomTask
>>> custom_task = CustomTask()

>>> custom_task.delay()  # Celeryに対する非同期起動
<AsyncResult: 7fee0dfa-621a-4934-9841-96cfaaee6ac5>

>>> custom_task.run()    # Celeryを介さない直接実行
running

3.の書き方をすれば問題ないと思います。
普段使いは custom_task.delay() で、unittest時やデバッガで止めたい時は custom_task.run() を使えばよいでしょう。

4.追記、書き方4

書き方3は、呼び出し側で app にTaskが登録されていないことがわかりました。

from proj.tasks import CustomTask

custom_task = CustomTask()  # appにTaskが登録されていない

あえて書くとすれば、都度 app.register_task() を行えば、classのインスタンスが返って来るのですが、何回も流れるのは、何か弊害がありそうな気がします。

from proj.tasks import CustomTask

custom_task = app.register_task(CustomTask())  # appにTaskが登録済でclassのインスタンスが返る

app.register_task() のソースコードを読むと、以下のように書いてあります。

This is here for compatibility with old Celery 1.0
style task classes, you should not need to use this for
new projects.

そこまで言われると、新規プロジェクトでは ここ とか ここ にかかれているように、Taskの継承は諦めて(継承元は object とし)、@app.task でfunctionベースで書くのが良さそうです。

proj/tasks/add_task.py
from proj.celery import app


class AddTask(object):
    def run(self, x, y):
        return x + y

@app.task()
def add_task(x, y):
    return AddTask().run(x, y)
proj/tasks/__init__.py
from .add_task import add_task

呼び出し側

from proj.tasks import add_task

add_task.delay(1, 2)  # Celery経由の非同期実行の依頼

add_task.run(1, 2)  # 直接実行。デバッガで止めたい時など。この run は AddTask.run() ではなく、app.task() の run です。

最後、ぐだぐだですみません。

41
25
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
41
25