6
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Celery+RedisでPythonタスクの分散処理

Last updated at Posted at 2022-01-28

動機

djangoでWebアプリを作っているのですが、重たい処理をサーバ上で行う場合、処理が終わるまでWebページを開きっぱなしにしておく必要があって困るのです。ページを閉じると処理も中止されてしまいます。
そんな時には、非同期処理。ページを閉じても処理は継続されるし、後でページを開いて結果を確認することもできます。

djangoで非同期処理を行う場合、ネットで検索して一番情報が多いのがCeleryRedisを使う方法でした。しかし、色々な記事を読んでも、djangoへの組込み方は説明されているのですが、動作の仕組みや、タスクを別のホストで動かす方法(実際にはこれがしたかった)に言及されていなかったり、なんとなくしっくり来ませんでした。

そこで、とりあえずdjangoの話は置いておいて、Celery+RedisでPythonタスクをリモートホスト上で分散処理させる方法をまとめてみました。

環境

  • OS : macOS 12.1 Monterey
  • Python : 3.8.9 (macOS付属)
  • Docker 20.10.12 (Redisサーバを動かす)

インストール

Redis

まずは、Redisをインストールして動かします。
今回、RedisサーバはDocker上で動かして済ませてしまいました。Docker-Desktopでも、Docker-CLI + Docker-Engineでも構わないのですが、Dockerが既にインストールされている環境であれば、以下のコマンドでRedisのDockerイメージが取得され、コンテナが動作します。

docker run -d --rm -p 6379:6379 redis

Dockerを使わないのであれば、Homebrewでredisをインストールし、redis-serverを立ち上げるでもOKです。

brew install redis
redis-server

Pythonモジュール

必要なPythonモジュールは、celeryとRedisです。私はpipでインストールしました。

pip3 install celery Redis [--user]

celeryモジュールをインストールすると、celeryコマンドもインストールされます。インストール時に"--user""オプションをつけた場合には、$HOME/Library/Python/3.8/bin/celery にありますので、パスを通しておくなどした方が良いでしょう。

まずはローカルで動かしてみる

動かしたいタスクの準備

非同期で動かしたいタスクを、tasks.pyというファイルで定義します。

[tasks.py]
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.result_backend = 'redis://localhost:6379/0'

@app.task
def add(x, y):
    from time import sleep
    sleep(10)
    return x + y

2行目ではCeleryクラスのインスタンスを作成していますが、第一引数はCeleryオブジェクト(アプリ)の名前、2番目のbroker引数はブローカーのURLです。
ここで、ブローカーとは発行されたタスクをキューに貯め込み、タスクを実行するワーカーに割り振るための機構で、先にDocker上で動作させたRedisを指定しています。6379番ポートを公開しているので、localhost:6379でDocker上のRedisにアクセスできます。

3行目の記述は、実行したタスクの結果もRedis上でストアするためのものです。この記述がないと、タスクの実行結果はワーカーのログに出力されるだけになります。
(djangoで使用する場合は、結果をDB上のテーブルに格納するようにresult_backendを設定します。)

RedisのURLフィールドのポート番号(6379)とDB番号(末尾の0)は、いずれもデフォルト値なので、省略可能です。

app.conf.result_backend = 'redis://localhost'

実際に実行されるタスクは、add関数です。
add関数には@app.taskという修飾子が付けられていて、これがCeleryで実行されるタスクである指定になります。

タスクの実行

tasks.pyができたら、ワーカーを起動します。
ワーカーは、タスクを実行させたいホストで、ずっと実行させておきます。今回はローカルで実行なので、tasks.pyがあるディレクトリで以下のコマンドを実行します。

celery -A tasks worker --loglevel=info --concurrency=5
...
[config]
.> app:         tasks:0x109a70730
.> transport:   redis://localhost:6379//
.> results:     redis://localhost:6379/
.> concurrency: 5 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery           exchange=celery(direct) key=celery
[tasks]
  . tasks.add
[2022-01-28 13:40:46,996: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-01-28 13:40:47,021: INFO/MainProcess] mingle: searching for neighbors
[2022-01-28 13:40:48,073: INFO/MainProcess] mingle: all alone
[2022-01-28 13:40:48,140: INFO/MainProcess] celery@myhost.local ready.

-A tasksはtasks.pyからCeleryアプリの定義を参照することを指定しています。
--concurrency=5は、このワーカーでは最大5個のタスクを同時に実行する指定です。

では、タスクの発行を行います。
tasks.pyのあるディレクトリでPythonを起動し、以下のように入力します。

python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)

importしたaddではなく、add.delay()を呼び出すことで、addがタスクとして発行されます。

ワーカー(celely)のログには、以下の1行目が出力され、10秒ほど経つと2行目が表示されます。これは、発行したタスクが「10秒スリープしてから足し算の答えを返す」ものだからで、2行目のログにも1234+5678の答えの6912が表示されています。

[2022-01-28 13:41:10,048: INFO/MainProcess] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] received
[2022-01-28 13:41:20,074: INFO/ForkPoolWorker-4] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] succeeded in 10.023096726000002s: 6912

ここで、タスクを発行したPythonのターミナルで、タスクの結果を確認してみましょう。

>>> res.ready()
True
>>> res.get()
6912

(余談)タスク定義のもう1つの形式

先の例では、tasks.pyにタスクの定義を記述し、ワーカーにアプリとしてtasksを指定していました。
これを、例えば以下のように構成を変更することができます。

mkdir adder
mv tasks.py adder/celery.py

結果として、tasks.pyはディレクトリadder/配下のcelery.pyに移動・リネームされました。
この状態でワーカーを起動します。起動方法は以下のように変わります。

celery -A adder worker --loglevel=info --concurrency=5

-A tasks-A adderに変わっています。celeryは-Aでディレクトリが指定された場合、その配下のcelery.pyにアプリ定義がされているものとして参照するようです。

この場合、タスクの発行は、以下のようになります。

>>> from adder.celery import add
>>> res = add.delay(1234, 5678)

リモートでのタスク実行

ここからが本当にやりたかったことです。
先の例では、発行されたタスクは、同じホスト上で実行されていました。これを、別のホスト上で実行されるようにしたいわけです。

hostBでの準備

今までタスクの発行をしていたlocalhostをhostA、実際にタスクを実行させたいホストをhostBとしましょう。hostBにもceleryとRedisのPythonモジュールをインストールしておく必要があります。

pip3 install celery Redis [--user]

hostA上で作成したtasks.py(または、adder/celery.py)をhostBにコピーし、以下のように変更します。

[tasks.py]
from celery import Celery
app = Celery('tasks', broker='redis://hostA:6379/0') # hostAに変更
app.conf.result_backend = 'redis://hostA:6379/0' # hostAに変更

@app.task
def add(x, y):
    from time import sleep
    sleep(10)
    return x + y

変更したのは、RedisサーバーのURLです。RedisはhostA上で(正確には、hostA上のDockerコンテナ上で)動いているので、URLのホスト名をhostAに変更します。

変更できたら、hostB上でワーカーを動かします。hostA(localhost)での動かし方と全く同じです。

celery -A tasks worker --loglevel=info --concurrency=5

hostAでタスクを発行し、hostBでタスクを実行

hostA上で、先の例と全く同じくタスクを発行します。

python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)

今度は、hostB上でタスクが実行されます。
(hostA上のワーカーを止めていない場合は、hostA上で実行されるかもしれません :-)

試しに、hostAとhostBの両方でワーカーを実行しておき、hostAのPythonから以下のようにタスクを発行してみます。

from tasks import add
results = []
for i in range(10):
    res = add(i, i)
    results.append(res)

タスクを10個発行しています。
hostAとhostBのタスクはそれぞれ--concurrency=5の指定をつけてあるので、それぞれ5個ずつのタスクが実行されます(たぶん)。

最後に

Celery+Redisで、簡単にPythonの分散処理が書けます。
そもそもの目的はDjangoでの非同期処理でしたが、他にも色々と使い道がありそうな気がします。

6
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?