目的
分散TaskQueue処理を行うCeleryを利用して処理を行う
サンプルについて記載してみた。
前提
- Python 3.6.1
- celery
- RabbitMQをインストール、起動済みであること
インストール
pip install celery
windowsの場合はceleryの4以降がサポートされていないので、
windowsがサポートされている最後のバージョンを指定します。
pip install celery==3.1.25
ワーカーのコード
実際に処理を行うためのワーカーメソッドを利用します。
from celery import Celery
app = Celery('tasks', result='rpc://', broker='amqp://guest@192.168.0.3//')
@app.task
def add(x, y):
return x, y
これをワーカーとして起動します。
192.168.0.3で起動させたRabbitMQを利用します。
結果の保存先(結果バックエンドというらしいには)rpc://を指定します。
本番であればRedisなどを格納先とするらしいです。
$ celery -A tasks worker --loglevel=info
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@DESKTOP-GJOIME5 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.14393-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x22d0e56d080
- ** ---------- .> transport: amqp://guest:**@192.168.0.3:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-06-19 06:45:10,040: INFO/MainProcess] Connected to amqp://guest:**@192.168.0.3:5672//
[2017-06-19 06:45:10,118: INFO/MainProcess] mingle: searching for neighbors
[2017-06-19 06:45:11,262: INFO/MainProcess] mingle: all alone
[2017-06-19 06:45:11,332: WARNING/MainProcess] celery@DESKTOP-GJOIME5 ready.
無事に起動しました。
タスクの実行
呼び出し側のコードと実行結果
>>> from tasks import add
>>> async_result = add.delay(1,2)
>>> async_result
<AsyncResult: 69bf0ccf-6e74-46e0-ae5a-1fb566bb0657>
# AsyncResultのuuidを用いてRedisなどに格納した結果と紐づけている???
>>> async_result.ready()
True
>>> async_result.result
3
delayメソッドで呼び出すことでタスクをキューイングできる。
結果は、result.ready()の結果がTrueになった後であればresultから取得できる。
呼び出し時のワーカの挙動
すでに起動済みだったワーカにタスクを投げ込んだ際の挙動。
見る限りだと無事に指定されたタスクを実行できているように見える。
[2017-06-19 06:56:23,934: INFO/MainProcess] Received task: tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb]
[2017-06-19 06:56:23,934: INFO/MainProcess] Task tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb] succeeded in
0s: 3
結果の保存
タスク実行結果の保存にはCeleryのインスタンスを生成する際にバックエンドを指定する。
今回はrpc://を指定しているが、本番運用ではRedisなどに格納することが推奨されるらしい。
(https://blog.ozacc.com/docs/celery/getting-started/first-steps-with-celery.html#keeping-results)
まとめ
Celeryを用いたタスクキューイングはかなり容易であることが分かった。
簡単なタスク処理の仕組みならCelery + RabbitMQ + 結果の保存先を用いることで
手早く作成することができそう。