Celery
Celeryはアプリケーションで非同期タスクを実行するためのフレームワーク
メッセージを送受信するためにメッセージブローカーという別のサービスが必要になる
メッセージブローカーにはRabbitMQやRedisなどがある
今回はRabbitMQを使う
RabbitMQ
# install
$ brew install rabbitmq
$ export PATH=$PATH:/usr/local/sbin
# start
$ rabbitmq-server
Configuring logger redirection
## ## RabbitMQ 3.8.18
## ##
########## Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
###### ##
########## Licensed under the MPL 2.0. Website: https://rabbitmq.com
Erlang: 24.0.3 [jit]
TLS Library: OpenSSL - OpenSSL 1.1.1k 25 Mar 2021
Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html
Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
Config file(s): (none)
Starting broker... completed with 6 plugins.
その他のコマンド
# run background
$ rabbitmq-server -detached
# stop
$ rabbitmqctl stop
# show status
$ rabbitmqctl status
サーバー起動中に以下のURLを開くとGUIで状況を確認できる
http://localhost:15672/
Postgresql
タスクの状態を保持するためにデータベースを用意する
# install
$ brew install postgresql
# start
$ pg_ctl -D /usr/local/var/postgres start
# path
$ export PGDATA=/usr/local/var/postgres
# create role
$ createuser -P admin
# create database
$ createdb demo_celery -O admin
# show databases
$ psql -l
# connect database
$ psql demo_celery
Workerを起動する
参考:https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html
インストール
$ pip install celery
$ pip install psycopg2 # postgresqlをpythonで使うために必要
Celeryインスタンスの作成とタスクの定義を行うtasks.pyを作成する
tasks.py
from celery import Celery
app = Celery(
'tasks',
backend='db+postgresql://admin:password@localhost:5432/demo_celery',
broker='amqp://guest:guest@localhost//'
)
@app.task
def add(x, y):
return x + y
postgresqlのURLの指定方法:https://docs.celeryproject.org/en/stable/userguide/configuration.html#database-url-examples
Workerを起動する
$ celery -A tasks worker --loglevel=INFO
celery@HOST v5.0.5 (singularity)
[config]
.> app: tasks:0x10f3aac40
.> transport: amqp://guest:**@localhost:5672//
.> results: postgresql://admin:**@localhost:5432/demo_celery
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2021-06-29 22:43:12,415: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-06-29 22:43:12,426: INFO/MainProcess] mingle: searching for neighbors
[2021-06-29 22:43:13,453: INFO/MainProcess] mingle: all alone
[2021-06-29 22:43:13,467: INFO/MainProcess] celery@HOST ready.
タスクを呼び出す
>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result.ready()
True
>>> result.get(timeout=1)
8
設定ファイル
celeryconfig.pyに設定を書き、app.config_from_objectでその設定を呼び出すことができる
celeryconfig.py
broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:password@localhost:5432/demo_celery'
tasks.py
from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y