2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Celery: @shared_task

Last updated at Posted at 2015-05-21
  • キュー、エクスジェンジ、キーなどを変えたり複数定義したりするとき

RabbitMQにユーザーと仮想ホスト作成

$ sudo rabbitmqctl add_vhost lafoglia
$ sudo rabbitmqctl add_user lafoglia lafoglia
$ sudo rabbitmqctl set_permissions -p lafoglia lafoglia ".*" ".*" ".*"

app/settings.py

  • kombuでCELERY_QUEUESを定義
  • めんどくさいのでJOBQでuser, vhost, passwordなどを同じにしてみた
JOBQ = 'lafoglia'

CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
BROKER_URL = 'amqp://{0}:{1}@localhost:5672/{2}'.format(JOBQ, JOBQ, JOBQ)
from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = JOBQ
CELERY_QUEUES = (Queue(JOBQ, Exchange(JOBQ), routing_key=JOBQ),)

app/celery.py

  • サンプル通り
from __future__ import absolute_import
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

from django.conf import settings

app = Celery('app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

acounts/tasks.py

  • サンプル通り
from celery import shared_task

@shared_task
def add(x, y):
    ret = x + y
    return ret

ワーカー

  • settings.py のキューでCeleryのワーカーが動いている
  • ので、このCeleryにたいしてジョブを投げる必要があります
$ celery -A app worker -l info -E -B
 -------------- celery@jessie.local v3.1.18 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.16.0-4-amd64-x86_64-with-debian-8.0
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x7f027c104050
- ** ---------- .> transport:   amqp://lafoglia:**@localhost:5672/lafoglia
- ** ---------- .> results:     amqp
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> lafoglia         exchange=lafoglia(direct) key=lafoglia
                

[tasks]
  . accounts.tasks.add

タスク実行 : app をロードすること

  • app(Celeryインスタンス)にキューのパラメータが保持されているので、appが違うと別のキューに対してタスクを投げようとしてしまう。
>>> from accounts.tasks import *
>>> add.delay(1, 3).get()
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/vagrant/.anyenv/envs/pyenv/versions/wordpress/lib/python2.7/site-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/home/vagrant/.anyenv/envs/pyenv/versions/wordpress/lib/python2.7/site-packages/celery/backends/base.py", line 601, in _is_disabled
    'No result backend configured.  '
NotImplementedError: No result backend configured.  Please see the documentation for more information.
>>> from app.celery import app
>>> add.delay(1, 3).get()
4

current_app とは?

>>> from celery import current_app
>>> current_app.main
'default'
>>> from app.celery import app
>>> current_app.main
'app'
>>> from celery import Celery
>>> app = Celery('hoge')
>>> current_app.main
'hoge'

celery.local.Proxy

>>> type(current_app)
<class 'celery.local.Proxy'>
  • つまり、current_appは参照されるたびに現在のCeleryインスタンスを返す代理オブジェクト
class Proxy(object):                                                             

    def __init__(self, local,                                                    
                 args=None, kwargs=None, 
                 name=None, __doc__=None):     
        
        # 元のオブジェクト保存 
        object.__setattr__(self, '_Proxy__local', local)  
        ...
        
    def _get_current_object(self):                                                
        
        # 代理元の実際のオブジェクト                                                                   
        loc = object.__getattribute__(self, '_Proxy__local')   
        
        # celery.utils.threads.Localのサブクラス                  
        if not hasattr(loc, '__release_local__'):                                
            return loc(*self.__args, **self.__kwargs)   
                                     
        try:                                                                     
            return getattr(loc, self.__name__)                                   
        except AttributeError:                                                   
            raise RuntimeError(
            	'no object bound to {0.__name__}'.format(self))   


celery.app.base.Celery

  • コンストラクタで自分を現在のアプリとして設定します
class Celery(object): 
	...
	def __init__(self, ....):
		...	               
        if self.set_as_current:                                                  
            self.set_current()                                                   
		...

    def set_current(self):                                                       
        _set_current_app(self)    	

celery._state._set_current_app

  • TLS(Thread Loacal Storage) にシングルトンで現在のアプリを保持
def _set_current_app(app):                                                          
    _tls.current_app = app                                                          
class _TLS(threading.local):                                                                   
    current_app = None  
                                                             
_tls = _TLS() 

celery._state._get_current_app

  • TLSから current_appを返す
  • NoneだったらデフォルトCeleryを作ってカレントに設定してから返す
def _get_current_app():                                                          
    if default_app is None:                                                      
        from celery.app import Celery  
        # デフォルト作成                                          
        set_default_app(Celery(                                                  
            'default',                                                           
            loader=os.environ.get('CELERY_LOADER') or 'default',                 
            fixups=[],                                                           
            set_as_current=False, accept_magic_kwargs=True,                      
        ))     
                                                                          
    return _tls.current_app or default_app                                       
  • これは基本的には get_current_app() です

celery._state.current_app

  • get_current_app() のプロキシーです
current_app = Proxy(get_current_app) 
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?