はじめに
分散システムにおいて、タスクキューは非同期処理や負荷分散を実現する重要なコンポーネントです。Celeryは、Pythonでタスクキューを構築するための強力なライブラリであり、スケーラビリティと耐障害性を提供します。この記事では、Celeryの基本的なセットアップ、Flaskとの統合、タスクのスケジューリング、およびモニタリングについて解説します。実際のコード例として、大量のメール送信を処理するシステムを構築します。
Celeryの概要
Celeryは、非同期タスクの実行とスケジューリングを管理するための分散タスクキューシステムです。主なコンポーネントは以下の通りです:
- Worker:タスクを実行するプロセス。
- Broker:タスクをキューイングするメッセージブローカー(例:RabbitMQ、Redis)。
- Backend:タスクの結果を保存するデータストア(例:Redis、PostgreSQL)。
Celeryは、分散処理に適しており、Webアプリケーションやデータ処理パイプラインで広く使用されています。
Celeryのセットアップ
環境の準備
CeleryとRabbitMQ(ブローカー)をインストールします:
pip install celery[rabbitmq]
RabbitMQをローカルで実行(Dockerを使用):
docker run -d -p 5672:5672 rabbitmq:3
Celeryの基本構成
以下のコードで、Celeryインスタンスを設定します:
# celery_app.py
from celery import Celery
app = Celery(
'tasks',
broker='amqp://guest:guest@localhost:5672//',
backend='rpc://'
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Tokyo',
enable_utc=True,
)
この設定では、RabbitMQをブローカーとして使用し、JSON形式でタスクをシリアライズします。
Flaskとの統合
FlaskとCeleryを統合して、Webリクエストからタスクを非同期に実行します。以下の例は、メール送信タスクを処理するAPIです:
# app.py
from flask import Flask, jsonify
from celery_app import app as celery_app
app = Flask(__name__)
@celery_app.task
def send_email(recipient, subject, body):
# 実際のメール送信ロジック(例:smtplibを使用)
import time
time.sleep(2) # メール送信をシミュレート
return f"メールを {recipient} に送信しました: {subject}"
@app.route('/send-email', methods=['POST'])
def trigger_email():
recipient = 'user@example.com'
subject = 'テストメール'
body = 'これはCeleryを使ったテストメールです'
task = send_email.delay(recipient, subject, body)
return jsonify({'task_id': task.id})
if __name__ == '__main__':
app.run(debug=True)
Workerを起動:
celery -A celery_app worker --loglevel=info
このコードでは、FlaskエンドポイントからCeleryタスクを非同期に実行し、タスクIDを返します。
タスクのスケジューリング
Celery Beatの使用
Celery Beatを使用して、定期的タスクをスケジュールします。インストール:
pip install celery[redis]
スケジュール設定:
# celery_app.py(続き)
from celery.schedules import crontab
app.conf.beat_schedule = {
'send-daily-report': {
'task': 'celery_app.send_email',
'schedule': crontab(hour=9, minute=0), # 毎日9:00に実行
'args': ('admin@example.com', '日次レポート', '今日のレポートです'),
}
}
Celery Beatを起動:
celery -A celery_app beat --loglevel=info
これにより、毎日9時にメール送信タスクが自動実行されます。
モニタリングとスケーリング
Flowerを使用したモニタリング
Flowerは、Celeryのタスクをリアルタイムで監視するツールです。インストール:
pip install flower
起動:
celery -A celery_app flower --port=5555
ブラウザでhttp://localhost:5555
にアクセスすると、タスクの状態やWorkerの稼働状況を確認できます。
Autoscaling
CeleryのWorkerは、負荷に応じてスケールアウト可能です。例:DockerコンテナでWorkerを追加:
# Dockerfile
FROM python:3.11
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info"]
Docker Composeで複数Workerを起動:
version: '3'
services:
worker:
build: .
deploy:
replicas: 4
rabbitmq:
image: rabbitmq:3
ports:
- "5672:5672"
これにより、4つのWorkerが並行してタスクを処理します。
実際の応用例
以下の例は、大量のメール送信を処理するCeleryタスクです:
# app.py(拡張版)
from flask import Flask, jsonify, request
from celery_app import app as celery_app
import time
app = Flask(__name__)
@celery_app.task
def send_bulk_email(recipients, subject, body):
results = []
for recipient in recipients:
time.sleep(1) # メール送信をシミュレート
results.append(f"メールを {recipient} に送信しました: {subject}")
return results
@app.route('/send-bulk-email', methods=['POST'])
def trigger_bulk_email():
recipients = [f"user{i}@example.com" for i in range(100)]
subject = 'キャンペーンメール'
body = '特別オファーです!'
task = send_bulk_email.delay(recipients, subject, body)
return jsonify({'task_id': task.id})
@app.route('/task-status/<task_id>')
def check_task_status(task_id):
task = send_bulk_email.AsyncResult(task_id)
if task.state == 'PENDING':
response = {'state': task.state, 'status': '処理中...'}
elif task.state == 'SUCCESS':
response = {'state': task.state, 'result': task.result}
else:
response = {'state': task.state, 'status': str(task.info)}
return jsonify(response)
if __name__ == '__main__':
app.run(debug=True)
このAPIは、100件のメール送信タスクを非同期に実行し、タスクの状態を監視できます。Flowerでタスクの進行状況を確認可能です。
注意点とベストプラクティス
- ブローカーの選択:RabbitMQは信頼性が高いが、Redisは軽量でセットアップが簡単。
- タスク設計:タスクは冪等性(同じタスクを複数回実行しても結果が変わらない)を保つ。
- エラー処理:タスク内で例外を捕捉し、Backendに結果を保存。
- スケーリング:Workerの数を負荷に応じて動的に調整。
まとめ
この記事では、Celeryを使ったタスクキューの構築方法を解説しました。Flaskとの統合、Celery Beatによるスケジューリング、Flowerでのモニタリングを通じて、分散システムの基本を実装しました。次回は、Daskを使った大規模データ処理の方法を紹介します。
この記事が役に立ったら、いいねやストックをお願いします!コメントで質問やフィードバックもお待ちしています!