97
100

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【Python】非同期タスクの実行環境+モニタリング環境を作る

Last updated at Posted at 2016-06-17

なにこれ

キューにタスクをガンガン貯めていって、ワーカーでサバサバ捌いていきたい時ってありますよね。
計算タスクとか、何かの大量デプロイ作業とかとか。
そういう仕組を作るとき、Pythonであればceleryが便利です。

ゴール

これを読んだ人がPythonで簡単に非同期処理の仕組みを実現し、処理状況のモニタリングまで
できること。

準備物

  • redisサーバー (メモリ上で動作する揮発性KVS)
  • celery (非同期処理の仕組みを提供するもの)
  • flower (celeryのextentionで監視するやつ)
apt-get install redis-server
pip install celery
pip install flower

最低限のコンポーネント

main.py

import tasks

print('<first task>')
# ここでタスク起動 (runタスク)
worker = tasks.run.delay()
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
    pass
# 返り値をだす
print worker.result

print('<second task>')
# ここでタスク起動 (calcタスク)
worker = tasks.calc.delay(100, 200)
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
    pass
# 返り値をだす
print worker.result

tasks.py

非同期処理させたいタスクを関数にまとめて、@taskデコレータをつけると
celeryから叩ける準備が出来ます。
引数・返り値の受け渡しは、 celeryのシリアライザーがうまくやってくれます。
自作クラスのインスタンス等はシリアライズできないので注意。

import time
from celery.decorators import task

@task
def run():
    time.sleep(10)
    print('処理 おわた')
    return 'おわったよ'


@task
def calc(a, b):
    return a+b

celeryconfig.py

celeryを動かすための設定ファイル。基本的にワーカーまわりのデータ受け渡しは
jsonでやりたいので、タスク・結果受け渡しのシリアライザーに「json」を指定している。
バックエンド(BROKER)はredisで動くようにしているが、RabbitMQを使うことも可能。
(そこらへんはお任せします)
下記の例では、ワーカーはtasks.pyを読み込みます。非同期処理させる関数を
含むスクリプト全てを指定しておきましょう。
CELERYD_LOG_LEVELをINFOにしておくと、タスクの標準出力もログ(celeryd.log)
に書かれます。プロダクションではERRORとかに設定しておくとよいかもです。
※ちなみに、CELERYD_LOG_LEVELはduplicateになったという噂。

CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていきます。 
ここはCPU数に合わせていくのがよいでしょう。

BROKER_URL = 'redis://localhost/0'
CELERYD_CONCURRENCY = 1
CELERY_RESULT_BACKEND = 'redis'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = "redis"
CELERYD_LOG_FILE = "./celeryd.log"
CELERYD_LOG_LEVEL = "INFO"
CELERY_IMPORTS = ("tasks", )

動かし方

redis-serverの起動

まずはredis-serverを起動しましょう。(必須)
サービスで既に起動している人はスキップ。

$ redis-server

celery worker起動

これでワーカーがキューをさばく準備ができる

(env) docker@1824542bb286:~/workspace$ celery worker
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
    The 'CELERYD_LOG_LEVEL' setting is scheduled for deprecation in     version 2.4 and removal in version v4.0.     Use the --loglevel argument instead

  alternative='Use the {0.alt} instead'.format(opt))
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
    The 'CELERYD_LOG_FILE' setting is scheduled for deprecation in     version 2.4 and removal in version v4.0.     Use the --logfile argument instead

  alternative='Use the {0.alt} instead'.format(opt))

 -------------- celery@1824542bb286 v3.1.23 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         default:0x7f068383f610 (.default.Loader)
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.run

実際にキューを入れてみる

非同期処理でけた

docker@1824542bb286:~/workspace$ python main.py
<first task>
おわったよ
<second task>
300

flowerによるタスク監視

flower起動

(env2) docker@1824542bb286:~/workspace$ celery flower
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
    The 'CELERYD_LOG_LEVEL' setting is scheduled for deprecation in     version 2.4 and removal in version v4.0.     Use the --loglevel argument instead

  alternative='Use the {0.alt} instead'.format(opt))
/home/docker/.virtualenvs/env2/local/lib/python2.7/site-packages/celery/app/defaults.py:251: CPendingDeprecationWarning:
    The 'CELERYD_LOG_FILE' setting is scheduled for deprecation in     version 2.4 and removal in version v4.0.     Use the --logfile argument instead

  alternative='Use the {0.alt} instead'.format(opt))
[I 160617 13:02:20 command:136] Visit me at http://localhost:5555
[I 160617 13:02:20 command:141] Broker: redis://localhost:6379/0
[I 160617 13:02:20 command:144] Registered tasks:
    ['celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'tasks.run']
[I 160617 13:02:20 mixins:231] Connected to redis://localhost:6379/0

インターフェースへアクセス

デフォルトだと、localhost:5555がflower(監視インターフェース)のURLです。
監視だけじゃなくて、ワーカーの数も調節できるので便利。

スクリーンショット 2016-06-17 17.00.55.png
97
100
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
97
100

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?