動機
djangoでWebアプリを作っているのですが、重たい処理をサーバ上で行う場合、処理が終わるまでWebページを開きっぱなしにしておく必要があって困るのです。ページを閉じると処理も中止されてしまいます。
そんな時には、非同期処理。ページを閉じても処理は継続されるし、後でページを開いて結果を確認することもできます。
djangoで非同期処理を行う場合、ネットで検索して一番情報が多いのがCeleryとRedisを使う方法でした。しかし、色々な記事を読んでも、djangoへの組込み方は説明されているのですが、動作の仕組みや、タスクを別のホストで動かす方法(実際にはこれがしたかった)に言及されていなかったり、なんとなくしっくり来ませんでした。
そこで、とりあえずdjangoの話は置いておいて、Celery+RedisでPythonタスクをリモートホスト上で分散処理させる方法をまとめてみました。
環境
- OS : macOS 12.1 Monterey
- Python : 3.8.9 (macOS付属)
- Docker 20.10.12 (Redisサーバを動かす)
インストール
Redis
まずは、Redisをインストールして動かします。
今回、RedisサーバはDocker上で動かして済ませてしまいました。Docker-Desktopでも、Docker-CLI + Docker-Engineでも構わないのですが、Dockerが既にインストールされている環境であれば、以下のコマンドでRedisのDockerイメージが取得され、コンテナが動作します。
docker run -d --rm -p 6379:6379 redis
Dockerを使わないのであれば、Homebrewでredisをインストールし、redis-serverを立ち上げるでもOKです。
brew install redis
redis-server
Pythonモジュール
必要なPythonモジュールは、celeryとRedisです。私はpipでインストールしました。
pip3 install celery Redis [--user]
celeryモジュールをインストールすると、celeryコマンドもインストールされます。インストール時に"--user""オプションをつけた場合には、$HOME/Library/Python/3.8/bin/celery にありますので、パスを通しておくなどした方が良いでしょう。
まずはローカルで動かしてみる
動かしたいタスクの準備
非同期で動かしたいタスクを、tasks.pyというファイルで定義します。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.result_backend = 'redis://localhost:6379/0'
@app.task
def add(x, y):
from time import sleep
sleep(10)
return x + y
2行目ではCeleryクラスのインスタンスを作成していますが、第一引数はCeleryオブジェクト(アプリ)の名前、2番目のbroker引数はブローカーのURLです。
ここで、ブローカーとは発行されたタスクをキューに貯め込み、タスクを実行するワーカーに割り振るための機構で、先にDocker上で動作させたRedisを指定しています。6379番ポートを公開しているので、localhost:6379
でDocker上のRedisにアクセスできます。
3行目の記述は、実行したタスクの結果もRedis上でストアするためのものです。この記述がないと、タスクの実行結果はワーカーのログに出力されるだけになります。
(djangoで使用する場合は、結果をDB上のテーブルに格納するようにresult_backendを設定します。)
RedisのURLフィールドのポート番号(6379)とDB番号(末尾の0)は、いずれもデフォルト値なので、省略可能です。
app.conf.result_backend = 'redis://localhost'
実際に実行されるタスクは、add関数です。
add関数には@app.task
という修飾子が付けられていて、これがCeleryで実行されるタスクである指定になります。
タスクの実行
tasks.py
ができたら、ワーカーを起動します。
ワーカーは、タスクを実行させたいホストで、ずっと実行させておきます。今回はローカルで実行なので、tasks.py
があるディレクトリで以下のコマンドを実行します。
celery -A tasks worker --loglevel=info --concurrency=5
...
[config]
.> app: tasks:0x109a70730
.> transport: redis://localhost:6379//
.> results: redis://localhost:6379/
.> concurrency: 5 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2022-01-28 13:40:46,996: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-01-28 13:40:47,021: INFO/MainProcess] mingle: searching for neighbors
[2022-01-28 13:40:48,073: INFO/MainProcess] mingle: all alone
[2022-01-28 13:40:48,140: INFO/MainProcess] celery@myhost.local ready.
-A tasks
はtasks.pyからCeleryアプリの定義を参照することを指定しています。
--concurrency=5
は、このワーカーでは最大5個のタスクを同時に実行する指定です。
では、タスクの発行を行います。
tasks.pyのあるディレクトリでPythonを起動し、以下のように入力します。
python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)
importしたaddではなく、add.delay()を呼び出すことで、addがタスクとして発行されます。
ワーカー(celely)のログには、以下の1行目が出力され、10秒ほど経つと2行目が表示されます。これは、発行したタスクが「10秒スリープしてから足し算の答えを返す」ものだからで、2行目のログにも1234+5678の答えの6912が表示されています。
[2022-01-28 13:41:10,048: INFO/MainProcess] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] received
[2022-01-28 13:41:20,074: INFO/ForkPoolWorker-4] Task tasks.add[e1165fa2-779e-49e5-9aad-1fdb472315a6] succeeded in 10.023096726000002s: 6912
ここで、タスクを発行したPythonのターミナルで、タスクの結果を確認してみましょう。
>>> res.ready()
True
>>> res.get()
6912
(余談)タスク定義のもう1つの形式
先の例では、tasks.pyにタスクの定義を記述し、ワーカーにアプリとしてtasksを指定していました。
これを、例えば以下のように構成を変更することができます。
mkdir adder
mv tasks.py adder/celery.py
結果として、tasks.pyはディレクトリadder/配下のcelery.pyに移動・リネームされました。
この状態でワーカーを起動します。起動方法は以下のように変わります。
celery -A adder worker --loglevel=info --concurrency=5
-A tasks
が-A adder
に変わっています。celeryは-A
でディレクトリが指定された場合、その配下のcelery.pyにアプリ定義がされているものとして参照するようです。
この場合、タスクの発行は、以下のようになります。
>>> from adder.celery import add
>>> res = add.delay(1234, 5678)
リモートでのタスク実行
ここからが本当にやりたかったことです。
先の例では、発行されたタスクは、同じホスト上で実行されていました。これを、別のホスト上で実行されるようにしたいわけです。
hostBでの準備
今までタスクの発行をしていたlocalhostをhostA、実際にタスクを実行させたいホストをhostBとしましょう。hostBにもceleryとRedisのPythonモジュールをインストールしておく必要があります。
pip3 install celery Redis [--user]
hostA上で作成したtasks.py(または、adder/celery.py)をhostBにコピーし、以下のように変更します。
from celery import Celery
app = Celery('tasks', broker='redis://hostA:6379/0') # hostAに変更
app.conf.result_backend = 'redis://hostA:6379/0' # hostAに変更
@app.task
def add(x, y):
from time import sleep
sleep(10)
return x + y
変更したのは、RedisサーバーのURLです。RedisはhostA上で(正確には、hostA上のDockerコンテナ上で)動いているので、URLのホスト名をhostAに変更します。
変更できたら、hostB上でワーカーを動かします。hostA(localhost)での動かし方と全く同じです。
celery -A tasks worker --loglevel=info --concurrency=5
hostAでタスクを発行し、hostBでタスクを実行
hostA上で、先の例と全く同じくタスクを発行します。
python3
>>> from tasks import add
>>> res = add.delay(1234, 5678)
今度は、hostB上でタスクが実行されます。
(hostA上のワーカーを止めていない場合は、hostA上で実行されるかもしれません :-)
試しに、hostAとhostBの両方でワーカーを実行しておき、hostAのPythonから以下のようにタスクを発行してみます。
from tasks import add
results = []
for i in range(10):
res = add(i, i)
results.append(res)
タスクを10個発行しています。
hostAとhostBのタスクはそれぞれ--concurrency=5
の指定をつけてあるので、それぞれ5個ずつのタスクが実行されます(たぶん)。
最後に
Celery+Redisで、簡単にPythonの分散処理が書けます。
そもそもの目的はDjangoでの非同期処理でしたが、他にも色々と使い道がありそうな気がします。