今のお仕事で django-celery + RabbitMQ を使うのですが、インフラには AWS を使っているので Amazon SQS との連携はできないのか気になったので試してみました。
ソースコードは GitHub に置いてあります。
準備
必要なライブラリを pip でインストールします。
$ pip install django
$ pip install django-celery
$ pip install boto
$ pip install django-dotenv
$ pip freeze > requirements.txt
サンプル用のプロジェクトを作成
$ django-admin.py startproject demo .
$ python manage.py startapp items
settings.py を編集
settings.py の編集をする前に Amazon SQS を使うのでアクセスキーとシークレットキーを用意しておきます。
アクセスキー等は settings.py に直接書いても良いのですが、git での管理対象にしたくなかったので django-dotenv を使ってみました。
.env ファイルは以下のようにしています。
AWS_SQS_ACCESS_KEY=XXXXX
AWS_SQS_SECRET_ACCESS_KEY=YYYYY
AWS_SQS_REGION=ap-northeast-1
次に settings.py を編集します。
# import 追加
import os
import urllib
〜
# DB は sqlite3
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'demo.db',
}
}
〜
# timezone は一応変えておく
TIME_ZONE = 'Asia/Tokyo'
〜
INSTALLED_APPS = (
〜
# 以下を追加
'djcelery',
'items',
)
〜
# 最後に以下を追加
# urllib.quote を使っているのはアクセスキーに URL エンコードが必要な文字列が入っている場合があるため
import djcelery
djcelery.setup_loader()
BROKER_URL = 'sqs://%s:%s@' % (urllib.quote(os.environ['AWS_SQS_ACCESS_KEY'], ''),
urllib.quote(os.environ['AWS_SQS_SECRET_ACCESS_KEY'], ''))
BROKER_TRANSPORT_OPTIONS = {'region': os.environ['AWS_SQS_REGION'],
'queue_name_prefix': 'celery_sqs_demo-'}
次に manage.py に以下を追加します。
import dotenv
dotenv.read_dotenv()
これで .env ファイルが読み込まれるようになりました。
アプリケーションを作成
モデル
適当です。
from django.db import models
class Item(models.Model):
uuid = models.CharField(max_length=200)
created_at = models.DateTimeField('created at')
タスク
非同期処理するタスクを作成します。
このファイルは tasks.py というファイル名である必要があるそうです。
import time
from celery import task
from items.models import Item
@task
def add_item(uuid, created_at):
time.sleep(3)
item = Item(uuid=uuid, created_at=created_at)
item.save()
ビュー
これも適当です。
import uuid
from django.http import HttpResponse
from django.utils import timezone
from items.tasks import add_item
def index(request):
add_item.delay(uuid.uuid4(), timezone.now())
return HttpResponse('success')
urls.py も編集しておきます。
from django.conf.urls import patterns, include, url
# Uncomment the next two lines to enable the admin:
# from django.contrib import admin
# admin.autodiscover()
urlpatterns = patterns('',
url(r'^items/$', 'items.views.index'),
)
実行
その前に DB を用意しておきます。
celery 関係のテーブルも作成されます。
$ python manage syncdb
celeryd を起動します。
ログを確認するときは -l info
を付けます。
$ python manage.py celeryd
最初 celeryd が起動せず以下のようなエラーが発生しました。
ValueError: invalid literal for int() with base 10: ‘XXXXXXXXXX’
> 原因は settings.py の編集のところでも書いていますが、AWS のアクセスキーやシークレットキーに URL エンコードが必要な文字列が混入することがあったためです。
無事 celeryd が起動したら Django アプリケーションを起動します。
$ python manage.py runserver
起動したら http://localhost:4567/items/ にアクセスしてみます。
celeryd のログを確認すると以下のようなログが出ていて、メッセージを取る事ができていることが確認できます。
[2014-01-25 15:38:27,668: INFO/MainProcess] Received task: items.tasks.add_item[XXXXXXXXXX]
[2014-01-25 15:38:30,702: INFO/MainProcess] Task items.tasks.add_item[XXXXXXXXXX] succeeded in 3.031301742s: None
DB も確認してみるとちゃんとレコードが作成されています。
$ sqlite3 demo.db
sqlite> select * from items_item;
1|c23bd4f4-720f-4488-a6b9-dc26ed495c71|2014-01-25 06:38:26.908489
SQS を確認してみると以下の二つのキューが作成されていました。
* celery_sqs_demo-celery
* celery_sqs_demo-celery_{ホスト名}-celery-pidbox
```celery_sqs_demo``` の部分は settings.py で ```BROKER_TRANSPORT_OPTIONS``` に設定した prefix になります。
celery_{ホスト名}-celery-pidbox というキューが何のためのものなのかはよくわかってないので調べてみます。
# まとめ
もっと手間がかかるものかと思っていたのですが、大きなハマりポイントもなくあっさり動作を確認することができました。
ただ SQS との連携はまだ Experimental なので Stable になるのが待ち遠しいですね。
今度は複数のホストからメッセージを送って、タスクの重複が起きないか等も試してみたいと思います。
# 参考
* [First steps with Django - Celery 3.1.8 documentation](http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django)
* [Using Amazon SQS - Celery 3.1.8 documentation](http://docs.celeryproject.org/en/latest/getting-started/brokers/sqs.html)
* [django-celeryで非同期処理クイックスタートガイド - hirokiky's blog](http://blog.hirokiky.org/2013/03/23/quick_start_guid_about_django_celery.html)