2
5

More than 3 years have passed since last update.

Celery

Celeryはアプリケーションで非同期タスクを実行するためのフレームワーク

メッセージを送受信するためにメッセージブローカーという別のサービスが必要になる
メッセージブローカーにはRabbitMQやRedisなどがある

今回はRabbitMQを使う

RabbitMQ

# install
$ brew install rabbitmq
$ export PATH=$PATH:/usr/local/sbin

# start
$ rabbitmq-server

Configuring logger redirection

  ##  ##      RabbitMQ 3.8.18
  ##  ##
  ##########  Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com

  Erlang:      24.0.3 [jit]
  TLS Library: OpenSSL - OpenSSL 1.1.1k  25 Mar 2021

  Doc guides:  https://rabbitmq.com/documentation.html
  Support:     https://rabbitmq.com/contact.html
  Tutorials:   https://rabbitmq.com/getstarted.html
  Monitoring:  https://rabbitmq.com/monitoring.html

  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
        /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

  Config file(s): (none)

  Starting broker... completed with 6 plugins.

その他のコマンド

# run background
$ rabbitmq-server -detached

# stop
$ rabbitmqctl stop

# show status
$ rabbitmqctl status

サーバー起動中に以下のURLを開くとGUIで状況を確認できる
http://localhost:15672/
スクリーンショット 2021-06-29 21.59.54.png

Postgresql

タスクの状態を保持するためにデータベースを用意する

# install
$ brew install postgresql

# start
$ pg_ctl -D /usr/local/var/postgres start

# path
$ export PGDATA=/usr/local/var/postgres

# create role
$ createuser -P admin

# create database
$ createdb demo_celery -O admin

# show databases
$ psql -l

# connect database
$ psql demo_celery

Workerを起動する

参考:https://docs.celeryproject.org/en/stable/getting-started/first-steps-with-celery.html

インストール

$ pip install celery
$ pip install psycopg2  # postgresqlをpythonで使うために必要

Celeryインスタンスの作成とタスクの定義を行うtasks.pyを作成する

tasks.py
from celery import Celery

app = Celery(
    'tasks',
    backend='db+postgresql://admin:password@localhost:5432/demo_celery',
    broker='amqp://guest:guest@localhost//'
)

@app.task
def add(x, y):
    return x + y

postgresqlのURLの指定方法:https://docs.celeryproject.org/en/stable/userguide/configuration.html#database-url-examples

Workerを起動する

$ celery -A tasks worker --loglevel=INFO

celery@HOST v5.0.5 (singularity)

[config]
.> app:         tasks:0x10f3aac40
.> transport:   amqp://guest:**@localhost:5672//
.> results:     postgresql://admin:**@localhost:5432/demo_celery
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2021-06-29 22:43:12,415: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-06-29 22:43:12,426: INFO/MainProcess] mingle: searching for neighbors
[2021-06-29 22:43:13,453: INFO/MainProcess] mingle: all alone
[2021-06-29 22:43:13,467: INFO/MainProcess] celery@HOST ready.

タスクを呼び出す

>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result.ready()
True
>>> result.get(timeout=1)
8

設定ファイル

celeryconfig.pyに設定を書き、app.config_from_objectでその設定を呼び出すことができる

celeryconfig.py
broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:password@localhost:5432/demo_celery'
tasks.py
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y
2
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
5