普段、Celeryは非同期(終了を待ち合わせない、結果を求めない)で使うことが主ですが、同期(終了を待ち合わせて、結果を求める)でワークフロー的な使い方もできます。
元々、Celeryは非同期タスクの分散コンピューティングに特化していますが、今回は DigdagのPython APIを使う と同じ題材で、ワークフローの検証をしてみます。
用語
公式ドキュメント を読む上で、一般的な用語に対する Celeryの用語は、以下になります。
- タスクのキュー:Brokers
- ワークフロー:[Canvas](Canvas: Designing Work-flows)
検証環境
ファイル構成
celery
├── Dockerfile
├── docker-compose.yml
└── proj
├── __init__.py
├── celery.py
├── celeryconfig.py
├── myapp
│ ├── __init__.py
│ └── workflow.py
└── tasks
├── __init__.py
└── func_base.py
docker関係
docker-compose.yml
version: '3'
services:
redis:
image: redis:latest
ports:
- "6379:6379"
celery:
build: .
depends_on:
- redis
volumes:
- ./proj:/home/celery/proj
- docker-composeで、ブローカーのredis 1台、Celery worker n台の環境を立てます。
- 本番ではブローカーのredisは、Amazon ElastiCacheのredisにしたり、インスタンス管理がいやであればAmazon SQSにしたりするのがよいかもしれません。
Dockerfile
FROM python:2
RUN apt-get update && apt-get -y install sudo && \
useradd -m celery && echo "celery:celery" | chpasswd && adduser celery sudo && \
pip install -U celery[redis]
COPY proj /home/celery/proj
USER celery
WORKDIR /home/celery/
ENV PYTHONPATH $PYTHONPATH:/home/celery/
CMD celery -A proj worker -l info
- Celery workerの定義です。
- 諸事情でPython2をベースにしていますが、新規プロジェクトであればPython3の方がいいです。
- CMD行のCelery workerは簡易的に立ててますが、本番で使うのであれば、デーモン化した方がいいです。
Pythonコード
proj/celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj')
app.config_from_object('proj.celeryconfig')
if __name__ == '__main__':
app.start()
- CeleryのApplicationの定義です。
proj/celeryconfig.py
broker_url = 'redis://celery_redis_1:6379/0'
result_backend = 'redis://celery_redis_1:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'
enable_utc = True
include = ['proj.tasks']
- Celeryの設定ファイルとして、外出しにしたものです。
- 本番であれば、ログ ファイルを定義した方がいいですが、今は標準出力にしています。
proj/tasks/init.py
from .func_base import run, split, subtask
proj/tasks/func_base.py
from __future__ import absolute_import
from proj.celery import app
@app.task
def split():
task_count = 10
return task_count
@app.task
def run(task_count):
for i in range(task_count):
subtask.delay(i)
@app.task
def subtask(index):
print("Processing " + str(index))
- CeleryのTasksの定義です。
- 公式ドキュメントでは簡易的に tasks.py で書かれていますが、実務的にモジュール分割しています。
- func_base.py という名前になっているのは、公式ドキュメントでは function baseで書かれていますが、実務では function 1個で済むことはなくて散らかるので、機能別に class baseで書くことが多いからです。これについては後述します。
- subtask.delay(i) の部分は、後続のタスクがなく、最後は非同期で投げっぱなしにしています。終了を待ち合わせて後続のタスクがある場合は、Groupsで並列処理としてまとめ上げる感じでしょうか。
/proj/myapp/workflow.py
from __future__ import absolute_import
from celery import chain
from proj.tasks import split, run
def main():
chain(split.s() | run.s())()
if __name__ == '__main__':
main()
- CeleryのCanvas(ワークフロー)の定義です。
- Chainsで直列処理、タスクのパイプライン的なことをしています。
- Digdagのようにyamlで書けないので、見通しが悪くなるのは残念ですが、分散特化とのトレードオフでしょうか。
実行
ビルドして起動します。
$ docker-compose build
$ docker-compose up -d
Creating celery_redis_1 ... done
Creating celery_redis_1 ...
Creating celery_celery_1 ... done
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------
celery_celery_1 /bin/sh -c celery -A proj ... Up
celery_redis_1 docker-entrypoint.sh redis ... Up 0.0.0.0:6379->6379/tcp
- redis 1台、Celery worker 1台が上がっています。
おもむろに、Celery workerを2台にスケールアップして、複数台構成にしてみます。
分散コンピューティングができるかを検証します。
$ docker-compose up -d --scale celery=2
celery_redis_1 is up-to-date
Starting celery_celery_1 ... done
Creating celery_celery_2 ... done
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------
celery_celery_1 /bin/sh -c celery -A proj ... Up
celery_celery_2 /bin/sh -c celery -A proj ... Up
celery_redis_1 docker-entrypoint.sh redis ... Up 0.0.0.0:6379->6379/tcp
- ブローカー(タスクキュー)のredis 1台に対して、Celery worker 2台がぶら下がる形となりました。
Celery worker 1台目に入って、Taskが登録されているか確認し、ワークフローを実行してみます。
$ docker exec -it celery_celery_1 /bin/bash
$ celery -A proj inspect registered
-> celery@558097538e09: OK
* proj.tasks.func_base.run
* proj.tasks.func_base.split
* proj.tasks.func_base.subtask
-> celery@449561107d02: OK
* proj.tasks.func_base.run
* proj.tasks.func_base.split
* proj.tasks.func_base.subtask
$ python proj/myapp/workflow.py
- 今回は Celery worker 1台目から起動していますが、本番では例えばバッチサーバ(またはWebサーバ/APIサーバ)と、Celery worker n台に、同じ projリポジトリをデプロイし、ブローカーのredisを介して繋がるイメージとなります。
- proj/myapp/ がバッチサーバ(またはWebサーバ/APIサーバ)向け
- proj/tasks/ がClelery workerサーバ向け
- または、Celery workerがバッチサーバというのもありです。
ログを確認します。
$ docker-compose logs celery
celery_2 | [2018-02-07 02:11:11,839: INFO/ForkPoolWorker-2] Task proj.tasks.func_base.split[78d0a4a5-2b20-4a97-8015-f3c824221193] succeeded in 0.0402305749885s: 10
celery_2 | [2018-02-07 02:11:11,884: WARNING/ForkPoolWorker-2] Processing 0
celery_2 | [2018-02-07 02:11:11,889: WARNING/ForkPoolWorker-1] Processing 2
celery_2 | [2018-02-07 02:11:11,910: WARNING/ForkPoolWorker-2] Processing 4
celery_2 | [2018-02-07 02:11:11,915: WARNING/ForkPoolWorker-1] Processing 8
celery_1 | [2018-02-07 02:11:11,886: WARNING/ForkPoolWorker-1] Processing 1
celery_1 | [2018-02-07 02:11:11,915: WARNING/ForkPoolWorker-2] Processing 3
celery_1 | [2018-02-07 02:11:11,916: WARNING/ForkPoolWorker-1] Processing 5
celery_1 | [2018-02-07 02:11:11,920: WARNING/ForkPoolWorker-2] Processing 6
celery_1 | [2018-02-07 02:11:11,921: WARNING/ForkPoolWorker-1] Processing 7
celery_1 | [2018-02-07 02:11:11,933: WARNING/ForkPoolWorker-2] Processing 9
- 一部省略しています。
2台に分散されて実行されていることが確認できました。
Celery 4のClass baseのTaskの書き方
func_base.py という名前になっているのは、公式ドキュメントでは function baseで書かれていますが、実務では function 1個で済むことはなくて散らかるので、機能別に class baseで書くことが多いからです。
と書きましたが、4.0 リリースノートを読むと、開発元ではClass baseの書き方は冷遇されている印象を受けました。
とは言え、Classの方が保守し易いので、紹介してみます。
やらせたいことは、Task.run()に記述します。
書き方は以下の2パターンあるようです。
1.公式ドキュメントのCustom task classesの書き方
proj/tasks/class_base_a.py
from __future__ import absolute_import
from proj.celery import app
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
このとおりに書いても動かないですね。
add = app.tasks[_AddTask.name]
の部分で celery.exceptions.NotRegistered
になります。
リリースノートの「The Task base class no longer automatically register tasks」が正しくて、ドキュメントが直ってないと見ました。
2.リリースノートWhat’s new in Celery 4.0の書き方
proj/tasks/class_base_b.py
from __future__ import absolute_import
from celery import Task
from proj.celery import app
class CustomTask(Task):
def run(self):
print('running')
CustomTask = app.register_task(CustomTask())
よって、init.py は以下のようになります。
proj/tasks/init.py
from .func_base import run, split, subtask
# from .class_base_a import add
from .class_base_b import CustomTask
呼び出し側
1台目に入って、Taskの登録状況を見てみます。
$ docker exec -it celery_celery_1 /bin/bash
$ celery -A proj inspect registered
-> celery@36c2ae29323c: OK
* proj.tasks.class_base_b.CustomTask
* proj.tasks.func_base.run
* proj.tasks.func_base.split
* proj.tasks.func_base.subtask
2.の書き方だと、今までと違う感じがしてきました。
$ python
>>> from proj.tasks import CustomTask
>>> custom_task = CustomTask() # 直接実行?
running
>>> CustomTask.delay() # Celeryに対する非同期起動
<AsyncResult: d30e5e34-1f78-4981-9315-01c2ebb7dc03>
>>> CustomTask.run() # Celeryを介さない直接実行
running
3.こちらで紹介されている書き方
proj/tasks/class_base_b.py
from __future__ import absolute_import
from celery import Task
from proj.celery import app
class CustomTask(Task):
def run(self):
print('running')
app.register_task(CustomTask())
こちらの方が、今までのClass的な使い方に近いと思いました。
$ python
>>> from proj.tasks import CustomTask
>>> custom_task = CustomTask()
>>> custom_task.delay() # Celeryに対する非同期起動
<AsyncResult: 7fee0dfa-621a-4934-9841-96cfaaee6ac5>
>>> custom_task.run() # Celeryを介さない直接実行
running
3.の書き方をすれば問題ないと思います。
普段使いは custom_task.delay()
で、unittest時やデバッガで止めたい時は custom_task.run()
を使えばよいでしょう。
4.追記、書き方4
書き方3は、呼び出し側で app にTaskが登録されていないことがわかりました。
from proj.tasks import CustomTask
custom_task = CustomTask() # appにTaskが登録されていない
あえて書くとすれば、都度 app.register_task() を行えば、classのインスタンスが返って来るのですが、何回も流れるのは、何か弊害がありそうな気がします。
from proj.tasks import CustomTask
custom_task = app.register_task(CustomTask()) # appにTaskが登録済でclassのインスタンスが返る
app.register_task() のソースコードを読むと、以下のように書いてあります。
This is here for compatibility with old Celery 1.0
style task classes, you should not need to use this for
new projects.
そこまで言われると、新規プロジェクトでは ここ とか ここ にかかれているように、Taskの継承は諦めて(継承元は object とし)、@app.task
でfunctionベースで書くのが良さそうです。
from proj.celery import app
class AddTask(object):
def run(self, x, y):
return x + y
@app.task()
def add_task(x, y):
return AddTask().run(x, y)
from .add_task import add_task
呼び出し側
from proj.tasks import add_task
add_task.delay(1, 2) # Celery経由の非同期実行の依頼
add_task.run(1, 2) # 直接実行。デバッガで止めたい時など。この run は AddTask.run() ではなく、app.task() の run です。
最後、ぐだぐだですみません。