Edited at

【Python】非同期タスクの実行環境+モニタリング環境を作る

More than 3 years have passed since last update.


なにこれ

キューにタスクをガンガン貯めていって、ワーカーでサバサバ捌いていきたい時ってありますよね。

計算タスクとか、何かの大量デプロイ作業とかとか。

そういう仕組を作るとき、Pythonであればceleryが便利です。


ゴール

これを読んだ人がPythonで簡単に非同期処理の仕組みを実現し、処理状況のモニタリングまで

できること。


準備物


  • redisサーバー (メモリ上で動作する揮発性KVS)

  • celery (非同期処理の仕組みを提供するもの)

  • flower (celeryのextentionで監視するやつ)

apt-get install redis-server

pip install celery
pip install flower


最低限のコンポーネント


main.py

import tasks

print('<first task>')
# ここでタスク起動 (runタスク)
worker = tasks.run.delay()
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
pass
# 返り値をだす
print worker.result

print('<second task>')
# ここでタスク起動 (calcタスク)
worker = tasks.calc.delay(100, 200)
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
pass
# 返り値をだす
print worker.result


tasks.py

非同期処理させたいタスクを関数にまとめて、@taskデコレータをつけると

celeryから叩ける準備が出来ます。

引数・返り値の受け渡しは、 celeryのシリアライザーがうまくやってくれます。

自作クラスのインスタンス等はシリアライズできないので注意。

import time

from celery.decorators import task

@task
def run():
time.sleep(10)
print('処理 おわた')
return 'おわったよ'

@task
def calc(a, b):
return a+b


celeryconfig.py

celeryを動かすための設定ファイル。基本的にワーカーまわりのデータ受け渡しは

jsonでやりたいので、タスク・結果受け渡しのシリアライザーに「json」を指定している。

バックエンド(BROKER)はredisで動くようにしているが、RabbitMQを使うことも可能。

(そこらへんはお任せします)

下記の例では、ワーカーはtasks.pyを読み込みます。非同期処理させる関数を

含むスクリプト全てを指定しておきましょう。

CELERYD_LOG_LEVELをINFOにしておくと、タスクの標準出力もログ(celeryd.log)

に書かれます。プロダクションではERRORとかに設定しておくとよいかもです。

※ちなみに、CELERYD_LOG_LEVELはduplicateになったという噂。

CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていきます。 

ここはCPU数に合わせていくのがよいでしょう。

BROKER_URL = 'redis://localhost/0'

CELERYD_CONCURRENCY = 1
CELERY_RESULT_BACKEND = 'redis'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = "redis"
CELERYD_LOG_FILE = "./celeryd.log"
CELERYD_LOG_LEVEL = "INFO"
CELERY_IMPORTS = ("tasks", )


動かし方


redis-serverの起動

まずはredis-serverを起動しましょう。(必須)

サービスで既に起動している人はスキップ。

$ redis-server


celery worker起動

これでワーカーがキューをさばく準備ができる

(env) docker@1824542bb286:~/workspace$ celery worker

/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_LEVEL' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --loglevel argument instead

alternative='Use the {0.alt} instead'.format(opt))
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_FILE' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --logfile argument instead

alternative='Use the {0.alt} instead'.format(opt))

-------------- celery@1824542bb286 v3.1.23 (Cipater)
---- **** -----
--- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: default:0x7f068383f610 (.default.Loader)
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results:
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery

[tasks]
. tasks.run


実際にキューを入れてみる

非同期処理でけた

docker@1824542bb286:~/workspace$ python main.py

<first task>
おわったよ
<second task>
300


flowerによるタスク監視


flower起動

(env2) docker@1824542bb286:~/workspace$ celery flower

/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_LEVEL' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --loglevel argument instead

alternative='Use the {0.alt} instead'.format(opt))
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
The 'CELERYD_LOG_FILE' setting is scheduled for deprecation in version 2.4 and removal in version v4.0. Use the --logfile argument instead

alternative='Use the {0.alt} instead'.format(opt))
[I 160617 13:02:20 command:136] Visit me at http://localhost:5555
[I 160617 13:02:20 command:141] Broker: redis://localhost:6379/0
[I 160617 13:02:20 command:144] Registered tasks:
['celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap',
'tasks.run']
[I 160617 13:02:20 mixins:231] Connected to redis://localhost:6379/0


インターフェースへアクセス

デフォルトだと、localhost:5555がflower(監視インターフェース)のURLです。

監視だけじゃなくて、ワーカーの数も調節できるので便利。

スクリーンショット 2016-06-17 17.00.55.png