0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonによる分散システム構築ガイド | 第6回:分散システムのエラー処理と耐障害性

Posted at

はじめに

分散システムでは、ネットワーク障害ノードの失敗データの一貫性の問題など、さまざまなエラーが発生します。これらの課題に対処し、耐障害性信頼性を確保することは、システムの成功に不可欠です。この記事では、エラー処理のためのパターン(リトライサーキットブレーカー)、ロギングモニタリングのベストプラクティス、そしてPythonを使った実装を解説します。実際のコード例として、CelerygRPCを使用した耐障害性の高いシステムを構築します。

分散システムのエラーの種類

分散システムでよく見られるエラーには以下が含まれます:

  • ネットワーク障害:通信の遅延や切断。
  • ノードの失敗:サーバーのクラッシュやリソース不足。
  • データの一貫性:異なるノード間でデータが同期しない問題。
  • タイムアウト:リクエストが規定時間内に完了しない。

これらに対処するため、リトライサーキットブレーカーフォールバックなどのパターンを活用します。

リトライパターンの実装

tenacityを使用したリトライ

tenacityライブラリは、リトライロジックを簡潔に実装するためのツールです。インストール:

pip install tenacity

以下の例では、gRPCリクエストにリトライを実装します:

from tenacity import retry, stop_after_attempt, wait_exponential
import grpc
import compute_pb2
import compute_pb2_grpc

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def make_grpc_request(number):
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = compute_pb2_grpc.ComputeServiceStub(channel)
        response = stub.ComputeSquare(compute_pb2.ComputeRequest(number=number))
        return response.result

try:
    result = make_grpc_request(5)
    print(f"結果: {result}")
except grpc.RpcError as e:
    print(f"エラー: {e}")

出力例(成功時):

結果: 25

tenacityは、最大3回のリトライを試み、指数バックオフ(2秒から10秒の待機時間)で再試行します。

サーキットブレーカーの実装

pybreakerを使用したサーキットブレーカー

サーキットブレーカーは、連続するエラーを検知し、一時的にリクエストを停止することでシステムを保護します。インストール:

pip install pybreaker

以下の例では、Celeryタスクにサーキットブレーカーを適用します:

from pybreaker import CircuitBreaker
from celery_app import app as celery_app

# サーキットブレーカーの設定
breaker = CircuitBreaker(fail_max=5, reset_timeout=60)

@breaker
@celery_app.task
def send_email(recipient, subject, body):
    # メール送信をシミュレート(失敗する場合を想定)
    import random
    if random.random() < 0.7:  # 70%の確率で失敗
        raise ValueError("メール送信に失敗")
    return f"メールを {recipient} に送信しました: {subject}"

try:
    task = send_email.delay('user@example.com', 'テスト', 'こんにちは')
    result = task.get(timeout=10)
    print(result)
except CircuitBreakerError:
    print("サーキットブレーカーがオープン状態です")
except Exception as e:
    print(f"エラー: {e}")

出力例(失敗時):

サーキットブレーカーがオープン状態です

サーキットブレーカーは、5回連続の失敗で回路を「オープン」にし、60秒間リクエストをブロックします。

ロギングとモニタリング

構造化ロギング

structlogを使用した構造化ロギングで、エラー情報を整理します。インストール:

pip install structlog

例:

import structlog
import logging

# ロギング設定
structlog.configure(
    processors=[structlog.processors.JSONRenderer()],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger()

def process_task(data):
    logger.info("タスク開始", data_id=data['id'])
    try:
        result = data['value'] ** 2
        logger.info("タスク成功", result=result)
        return result
    except Exception as e:
        logger.error("タスク失敗", error=str(e))
        raise

data = {'id': 1, 'value': 5}
process_task(data)

出力例(JSON形式):

{"event": "タスク開始", "data_id": 1}
{"event": "タスク成功", "result": 25}

structlogは、モニタリングツール(例:Elasticsearch)との統合に適しています。

PrometheusとGrafanaによるモニタリング

PrometheusGrafanaでタスクのメトリクスを監視します。インストール:

pip install prometheus_client

例:Celeryタスクのメトリクスを収集:

from prometheus_client import Counter, start_http_server
from celery_app import app as celery_app

# メトリクス定義
task_success = Counter('task_success_total', 'Successful tasks')
task_failure = Counter('task_failure_total', 'Failed tasks')

@celery_app.task
def process_data(data):
    try:
        result = data ** 2
        task_success.inc()
        return result
    except Exception as e:
        task_failure.inc()
        raise

# Prometheusサーバー起動
start_http_server(8000)

# タスク実行
process_data.delay(5)

Prometheushttp://localhost:8000で確認し、Grafanaで可視化します。

実際の応用例

以下の例では、CelerygRPCを組み合わせ、リトライサーキットブレーカーを適用したシステムを構築します:

# app.py
from flask import Flask, jsonify
from celery_app import app as celery_app
from pybreaker import CircuitBreaker
from tenacity import retry, stop_after_attempt, wait_exponential
import grpc
import compute_pb2
import compute_pb2_grpc
import structlog

app = Flask(__name__)
breaker = CircuitBreaker(fail_max=5, reset_timeout=60)
logger = structlog.get_logger()

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def call_grpc_service(number):
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = compute_pb2_grpc.ComputeServiceStub(channel)
        response = stub.ComputeSquare(compute_pb2.ComputeRequest(number=number))
        return response.result

@celery_app.task
@breaker
def process_task(number):
    logger.info("タスク開始", number=number)
    try:
        result = call_grpc_service(number)
        logger.info("タスク成功", result=result)
        return result
    except Exception as e:
        logger.error("タスク失敗", error=str(e))
        raise

@app.route('/compute/<int:number>')
def trigger_task(number):
    task = process_task.delay(number)
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(debug=True)

このシステムは、gRPCで計算を処理し、Celeryで非同期実行、リトライサーキットブレーカー耐障害性を確保します。

注意点とベストプラクティス

  • リトライ設定:過剰なリトライはリソースを浪費するため、適切な上限を設定。
  • サーキットブレーカー:オープン状態の監視とリカバリ戦略を明確化。
  • ロギング:構造化ログを使用して、モニタリングツールとの統合を簡素化。
  • メトリクスPrometheusでレイテンシやエラーレートを追跡。

まとめ

この記事では、分散システムエラー処理耐障害性を高める方法を解説しました。tenacitypybreakerstructlogPrometheusを活用し、信頼性の高いシステムを構築しました。次回は、クラウドKubernetesAWS ECS)での分散システムのデプロイを紹介します。


この記事が役に立ったら、いいねストックをお願いします!コメントで質問やフィードバックもお待ちしています!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?