1. Celeryとは
Celeryは、Pythonで書かれた非同期タスクキューライブラリです。大規模なアプリケーションで、時間のかかる処理や定期的なタスクを効率的に実行するために使用されます。
主な特徴:
- 分散システムでの非同期タスク実行
- スケジューリング機能
- 様々なメッセージブローカーとの互換性(Redis、RabbitMQなど)
- 高い拡張性と柔軟性
2. Celeryのインストールと基本設定
まず、Celeryをインストールしましょう。
pip install celery
次に、プロジェクトにCeleryを設定します。celery.py
ファイルを作成し、以下のコードを追加します:
from celery import Celery
app = Celery('myproject',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
include=['myproject.tasks'])
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
このコードでは、Redisをブローカーとバックエンドとして使用しています。
3. タスクの定義
タスクは、Celeryで非同期に実行される関数です。tasks.py
ファイルを作成し、以下のようにタスクを定義します:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
デコレータを使用することで、この関数がCeleryタスクとして認識されます。
4. タスクの実行
タスクを実行するには、以下のようにします:
from myproject.tasks import add
result = add.delay(4, 4)
delay()
メソッドを使用すると、タスクが非同期で実行されます。
5. 結果の取得
タスクの結果を取得するには、以下のようにします:
result = add.delay(4, 4)
print(result.get()) # 8
get()
メソッドは、タスクが完了するまでブロックします。
6. 定期的なタスクのスケジューリング
Celeryを使用して、定期的にタスクを実行することができます。celery.py
に以下を追加します:
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'myproject.tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
これにより、add
タスクが30秒ごとに実行されます。
7. ワーカーの起動
Celeryワーカーを起動するには、以下のコマンドを使用します:
celery -A myproject worker --loglevel=info
8. Celery Beatの使用
定期的なタスクを実行するには、Celery Beatを使用します:
celery -A myproject beat --loglevel=info
9. タスクの優先順位
タスクに優先順位を設定することができます:
@shared_task(priority=10)
def high_priority_task():
pass
数値が大きいほど、優先順位が高くなります。
10. エラーハンドリング
タスクでエラーが発生した場合、自動的に再試行するように設定できます:
@shared_task(bind=True, max_retries=3)
def my_task_with_retry(self):
try:
# タスクの処理
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
11. タスクのチェーン
複数のタスクを連鎖的に実行することができます:
from celery import chain
result = chain(add.s(4, 4), add.s(8))()
print(result.get()) # 16
12. モニタリング
Celeryの動作状況をモニタリングするには、Flowerというツールが便利です:
pip install flower
celery -A myproject flower
これにより、Webインターフェースでタスクの状況を確認できます。
13. 並列処理
Celeryは複数のワーカーを使用して並列処理を行うことができます:
celery -A myproject worker --concurrency=4 --loglevel=info
この例では、4つの並列ワーカーを起動しています。
14. タスクのグループ化
複数のタスクをグループ化して実行することができます:
from celery import group
result = group(add.s(i, i) for i in range(10))()
print(result.get()) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
15. セキュリティ対策
Celeryを本番環境で使用する際は、以下のセキュリティ対策を考慮してください:
- SSL/TLSを使用した通信の暗号化
- 認証機能の有効化
- タスクシリアライゼーションの制限
app.conf.task_serializer = 'json'
app.conf.accept_content = ['json']
以上が、Celeryの基本的な使い方と主要な機能の解説です。Celeryを使いこなすことで、Pythonアプリケーションのパフォーマンスと拡張性を大幅に向上させることができます。非同期処理やバックグラウンドタスクの実装に悩んでいる方は、ぜひCeleryの導入を検討してみてください。