なにこれ
キューにタスクをガンガン貯めていって、ワーカーでサバサバ捌いていきたい時ってありますよね。
計算タスクとか、何かの大量デプロイ作業とかとか。
そういう仕組を作るとき、Pythonであればceleryが便利です。
ゴール
これを読んだ人がPythonで簡単に非同期処理の仕組みを実現し、処理状況のモニタリングまで
できること。
準備物
- redisサーバー (メモリ上で動作する揮発性KVS)
- celery (非同期処理の仕組みを提供するもの)
- flower (celeryのextentionで監視するやつ)
apt-get install redis-server
pip install celery
pip install flower
最低限のコンポーネント
main.py
import tasks
print('<first task>')
# ここでタスク起動 (runタスク)
worker = tasks.run.delay()
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
pass
# 返り値をだす
print worker.result
print('<second task>')
# ここでタスク起動 (calcタスク)
worker = tasks.calc.delay(100, 200)
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
pass
# 返り値をだす
print worker.result
tasks.py
非同期処理させたいタスクを関数にまとめて、@taskデコレータをつけると
celeryから叩ける準備が出来ます。
引数・返り値の受け渡しは、 celeryのシリアライザーがうまくやってくれます。
自作クラスのインスタンス等はシリアライズできないので注意。
import time
from celery.decorators import task
@task
def run():
time.sleep(10)
print('処理 おわた')
return 'おわったよ'
@task
def calc(a, b):
return a+b
celeryconfig.py
celeryを動かすための設定ファイル。基本的にワーカーまわりのデータ受け渡しは
jsonでやりたいので、タスク・結果受け渡しのシリアライザーに「json」を指定している。
バックエンド(BROKER)はredisで動くようにしているが、RabbitMQを使うことも可能。
(そこらへんはお任せします)
下記の例では、ワーカーはtasks.pyを読み込みます。非同期処理させる関数を
含むスクリプト全てを指定しておきましょう。
CELERYD_LOG_LEVELをINFOにしておくと、タスクの標準出力もログ(celeryd.log)
に書かれます。プロダクションではERRORとかに設定しておくとよいかもです。
※ちなみに、CELERYD_LOG_LEVELはduplicateになったという噂。
CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていきます。
ここはCPU数に合わせていくのがよいでしょう。
BROKER_URL = 'redis://localhost/0'
CELERYD_CONCURRENCY = 1
CELERY_RESULT_BACKEND = 'redis'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = "redis"
CELERYD_LOG_FILE = "./celeryd.log"
CELERYD_LOG_LEVEL = "INFO"
CELERY_IMPORTS = ("tasks", )
動かし方
redis-serverの起動
まずはredis-serverを起動しましょう。(必須)
サービスで既に起動している人はスキップ。
$ redis-server
celery worker起動
これでワーカーがキューをさばく準備ができる
(env) docker@1824542bb286:~/workspace$ celery worker
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_LEVEL' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --loglevel argument instead
alternative='Use the {0.alt} instead'.format(opt))
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_FILE' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --logfile argument instead
alternative='Use the {0.alt} instead'.format(opt))
-------------- celery@1824542bb286 v3.1.23 (Cipater)
---- **** -----
--- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: default:0x7f068383f610 (.default.Loader)
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results:
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. tasks.run
実際にキューを入れてみる
非同期処理でけた
docker@1824542bb286:~/workspace$ python main.py
<first task>
おわったよ
<second task>
300
flowerによるタスク監視
flower起動
(env2) docker@1824542bb286:~/workspace$ celery flower
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_LEVEL' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --loglevel argument instead
alternative='Use the {0.alt} instead'.format(opt))
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_FILE' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --logfile argument instead
alternative='Use the {0.alt} instead'.format(opt))
[I 160617 13:02:20 command:136] Visit me at http://localhost:5555
[I 160617 13:02:20 command:141] Broker: redis://localhost:6379/0
[I 160617 13:02:20 command:144] Registered tasks:
['celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap',
'tasks.run']
[I 160617 13:02:20 mixins:231] Connected to redis://localhost:6379/0
インターフェースへアクセス
デフォルトだと、localhost:5555がflower(監視インターフェース)のURLです。
監視だけじゃなくて、ワーカーの数も調節できるので便利。