0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonによる分散システム構築ガイド | 第2回:Celeryによるタスクキューの構築

Posted at

はじめに

分散システムにおいて、タスクキューは非同期処理や負荷分散を実現する重要なコンポーネントです。Celeryは、Pythonでタスクキューを構築するための強力なライブラリであり、スケーラビリティ耐障害性を提供します。この記事では、Celeryの基本的なセットアップ、Flaskとの統合、タスクのスケジューリング、およびモニタリングについて解説します。実際のコード例として、大量のメール送信を処理するシステムを構築します。

Celeryの概要

Celeryは、非同期タスクの実行とスケジューリングを管理するための分散タスクキューシステムです。主なコンポーネントは以下の通りです:

  • Worker:タスクを実行するプロセス。
  • Broker:タスクをキューイングするメッセージブローカー(例:RabbitMQRedis)。
  • Backend:タスクの結果を保存するデータストア(例:RedisPostgreSQL)。

Celeryは、分散処理に適しており、Webアプリケーションやデータ処理パイプラインで広く使用されています。

Celeryのセットアップ

環境の準備

CeleryRabbitMQ(ブローカー)をインストールします:

pip install celery[rabbitmq]

RabbitMQをローカルで実行(Dockerを使用):

docker run -d -p 5672:5672 rabbitmq:3

Celeryの基本構成

以下のコードで、Celeryインスタンスを設定します:

# celery_app.py
from celery import Celery

app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',
    backend='rpc://'
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Tokyo',
    enable_utc=True,
)

この設定では、RabbitMQをブローカーとして使用し、JSON形式でタスクをシリアライズします。

Flaskとの統合

FlaskCeleryを統合して、Webリクエストからタスクを非同期に実行します。以下の例は、メール送信タスクを処理するAPIです:

# app.py
from flask import Flask, jsonify
from celery_app import app as celery_app

app = Flask(__name__)

@celery_app.task
def send_email(recipient, subject, body):
    # 実際のメール送信ロジック(例:smtplibを使用)
    import time
    time.sleep(2)  # メール送信をシミュレート
    return f"メールを {recipient} に送信しました: {subject}"

@app.route('/send-email', methods=['POST'])
def trigger_email():
    recipient = 'user@example.com'
    subject = 'テストメール'
    body = 'これはCeleryを使ったテストメールです'
    task = send_email.delay(recipient, subject, body)
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(debug=True)

Workerを起動:

celery -A celery_app worker --loglevel=info

このコードでは、FlaskエンドポイントからCeleryタスクを非同期に実行し、タスクIDを返します。

タスクのスケジューリング

Celery Beatの使用

Celery Beatを使用して、定期的タスクをスケジュールします。インストール:

pip install celery[redis]

スケジュール設定:

# celery_app.py(続き)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-daily-report': {
        'task': 'celery_app.send_email',
        'schedule': crontab(hour=9, minute=0),  # 毎日9:00に実行
        'args': ('admin@example.com', '日次レポート', '今日のレポートです'),
    }
}

Celery Beatを起動:

celery -A celery_app beat --loglevel=info

これにより、毎日9時にメール送信タスクが自動実行されます。

モニタリングとスケーリング

Flowerを使用したモニタリング

Flowerは、Celeryのタスクをリアルタイムで監視するツールです。インストール:

pip install flower

起動:

celery -A celery_app flower --port=5555

ブラウザでhttp://localhost:5555にアクセスすると、タスクの状態やWorkerの稼働状況を確認できます。

Autoscaling

CeleryWorkerは、負荷に応じてスケールアウト可能です。例:DockerコンテナでWorkerを追加:

# Dockerfile
FROM python:3.11
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info"]

Docker Composeで複数Workerを起動:

version: '3'
services:
  worker:
    build: .
    deploy:
      replicas: 4
  rabbitmq:
    image: rabbitmq:3
    ports:
      - "5672:5672"

これにより、4つのWorkerが並行してタスクを処理します。

実際の応用例

以下の例は、大量のメール送信を処理するCeleryタスクです:

# app.py(拡張版)
from flask import Flask, jsonify, request
from celery_app import app as celery_app
import time

app = Flask(__name__)

@celery_app.task
def send_bulk_email(recipients, subject, body):
    results = []
    for recipient in recipients:
        time.sleep(1)  # メール送信をシミュレート
        results.append(f"メールを {recipient} に送信しました: {subject}")
    return results

@app.route('/send-bulk-email', methods=['POST'])
def trigger_bulk_email():
    recipients = [f"user{i}@example.com" for i in range(100)]
    subject = 'キャンペーンメール'
    body = '特別オファーです!'
    task = send_bulk_email.delay(recipients, subject, body)
    return jsonify({'task_id': task.id})

@app.route('/task-status/<task_id>')
def check_task_status(task_id):
    task = send_bulk_email.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {'state': task.state, 'status': '処理中...'}
    elif task.state == 'SUCCESS':
        response = {'state': task.state, 'result': task.result}
    else:
        response = {'state': task.state, 'status': str(task.info)}
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

このAPIは、100件のメール送信タスクを非同期に実行し、タスクの状態を監視できます。Flowerでタスクの進行状況を確認可能です。

注意点とベストプラクティス

  • ブローカーの選択RabbitMQは信頼性が高いが、Redisは軽量でセットアップが簡単。
  • タスク設計:タスクは冪等性(同じタスクを複数回実行しても結果が変わらない)を保つ。
  • エラー処理:タスク内で例外を捕捉し、Backendに結果を保存。
  • スケーリングWorkerの数を負荷に応じて動的に調整。

まとめ

この記事では、Celeryを使ったタスクキューの構築方法を解説しました。Flaskとの統合、Celery Beatによるスケジューリング、Flowerでのモニタリングを通じて、分散システムの基本を実装しました。次回は、Daskを使った大規模データ処理の方法を紹介します。


この記事が役に立ったら、いいねストックをお願いします!コメントで質問やフィードバックもお待ちしています!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?