インラインコード
前置き
イベント駆動アーキテクチャ(EDA)に特化したオブザーバビリティの仕組みは、システムを「コンポーネント」として捉え、その「責務」と「契約」を監視するという点で、SOLID、GRASP、コンポーネント設計原則と非常に深く関連しています。
EDAのオブザーバビリティの仕組みを3つの主要なカテゴリ(トレーシング、メトリクス、DLQ)に分け、それぞれを要求された形式で詳細に解説します。
1. 分散トレーシング (Correlation ID / Trace Context)
メカニズム
EDAにおける「リクエストの流れ」を可視化する技術です。
ビジネスプロセス(例:「注文」)の開始時に一意のTrace IDを生成し、このIDをイベントメッセージのヘッダーに含めて伝搬させます。
各サービス(パブリッシャー、コンシューマー)は、このIDを読み書きし、自身の処理ログやスパン情報(開始/終了時間など)を、このTrace IDに関連付けてオブザーバビリティ基盤(Jaeger, OpenTelemetryなど)に送信します。
これにより、非同期の複数のサービスをまたがる処理を、単一のトレースとして時系列に表示できます。
それをイベントストーミングの成果物と対応付けるところまでやってください。
適用事例 (Java)
-
OrderServiceが注文を受け付けた際、Trace-ID: xyz-123を生成します。 -
Spring Cloud SleuthやMicrometer Tracingなどのライブラリを使い、このIDをKafkaメッセージのヘッダーに自動的に付与します。
-
PaymentService(コンシューマー)がメッセージを受け取ると、
ライブラリがヘッダーからTrace-ID: xyz-123を自動で抽出し、MDC (Mapped Diagnostic Context) に設定します。
これにより、PaymentServiceのlog.info()も同じTrace IDで記録され、一連の流れが可視化されます。
他に一緒に適用されることが多い原則集
GRASPのコントローラ
オブザーバビリティ基盤(Jaegerなど)が、システム全体のトレース情報を集約・制御するコントローラーとしての役割を果たします。
GRASPの間接化
メッセージブローカーという間接化されたコンポーネントを、トレーシングによって「透明化」し、その存在を前提とした観測が可能になります。
ADP (非循環依存関係の原則)
トレースの可視化は、イベントの依存関係グラフを描画することに他なりません。
これにより、A → B → C → A のような 循環依存(ADP違反) を視覚的に検知できます。
注意点
・トレースコンテキストの伝搬は、エコシステム全体で標準化(例:W3C Trace Context)する必要があります。
・メッセージブローカー自体がトレーシングに対応しているか(Kafka, RabbitMQのヘッダーサポートなど)の確認が不可欠です。
・サンプリングレートの調整が必要です。
全てのトレースを収集するとコストが膨大になるため、本番環境では一定の割合(例:10%)でサンプリングするのが一般的です。
アンチパターン
Trace IDの再生成
コンシューマーがメッセージヘッダーのTrace IDを無視し、新しいTrace IDを生成してしまうと、そこでトレースが分断され、因果関係が追えなくなります。
ペイロードへのTrace ID混入
Trace IDは処理のメタデータであり、メッセージのヘッダーで扱うべきです。
ビジネスデータであるペイロードに含めると、ドメインロジックがオブザーバビリティという技術的関心事に汚染されます(SRP/CCP違反)。
2. キュー中心のメトリクス監視 (Queue-centric Metrics)
📈 メカニズム
EDAの「血液」であるメッセージブローカー(キュー)の状態を直接監視します。
コンシューマーやパブリッシャーのCPU/メモリ(個々の臓器)を見るだけでなく、その間を流れる「血液」の状態を監視することが重要です。
その際の、主要なメトリクスは以下の通りです。
Queue Depth / Lag (キューの深さ)
キューに溜まっている未処理メッセージの数。
Throughput (流入/流出率)
単位時間あたりに発行されるメッセージ数と、処理されるメッセージ数。
Message Age (メッセージ滞留時間)
キュー内の最も古いメッセージが、どれだけの時間処理されていないか。
適用事例 (Java)
Micrometerなどのライブラリを使い、
KafkaConsumerのrecords-lag-max(最大ラグ)や、RabbitMQのmessages_ready(処理可能なメッセージ数)をPrometheusにエクスポートします。
Grafanaダッシュボードでこれを可視化し、
・「キューの深さが5分以上、1000件を超え続けている」
・「流入率(Inflow)に対して流出率(Outflow)が著しく低い」
といった場合にアラートを発報します。
他に一緒に適用されることが多い原則集
SOLIDのS (単一責任の原則)
キューの責任は「メッセージを安定して転送すること」です。
このメトリクス監視は、その責任が果たされているか(詰まっていないか)を検証する最も直接的な手段です。
SDP (安定依存の原則)
メッセージブローカーは、システム全体で最も安定した(変更頻度が低い)コンポーネントであるべきです。
つまり、不安定なコンシューマー(B)が安定したブローカー(Broker)に依存します。
キュー監視は、不安定なコンポーネントが安定したコンポーネントに与える影響(=キューの詰まり)を可視化します。
注意点
・キューの深さだけでは、障害の原因(コンシューマーのクラッシュか、ポイズンピルか、単なる高負荷か)は特定できません。
なので、他のメトリクスやログと組み合わせる必要があります。
・平均値の罠に注意が必要です。
平均ラグは低くても、特定のパーティションだけが滞留している場合があります。
最大ラグ (Max Lag) を監視することが重要です。
アンチパターン
コンシューマー側のみの監視
コンシューマーのCPUやメモリが正常であることだけを確認し、キュー自体を監視しないこと。
ポイズンピルによる無限リトライループが発生している場合、コンシューマーは見た目上「正常に」動作しているように見えてしまいます。
ラグの無視
スループット(流入/流出率)だけを見て安心すること。
スループットが一致していても、処理に時間がかかり、ラグ(遅延)が徐々に蓄積している可能性があります。
3. デッドレターキュー (DLQ) とセマンティック監視
☣️ メカニズム
処理に失敗したメッセージ、特に「ポイズンピル」(処理不能なメッセージ)を隔離し、検知する仕組みです。
コンシューマーは、処理に数回(例: 3回)失敗したメッセージを、Dead Letter Queue (DLQ) という専用の隔離キューに自動的に転送します。
さらに、ACL(腐敗防止層)を設け、スキーマ上は正しくてもビジネスルール上では不正な(例:price: -100)イベントを「セマンティック(意味的)なエラー」として検知し、能動的にDLQへ送ります。
適用事例 (Java)
Spring Cloud StreamやAkka Alpakka Kafkaは、リトライ回数を使い切ったメッセージを自動的にDLQトピックに転送する機能を持っています。
また、コンシューマーのロジック内で、try-catchブロックを使い、特定のビジネス例外(例: IllegalArgumentExceptionでpriceが負である場合)を捕捉し、手動でDLQに送信する処理を実装します。
他に一緒に適用されることが多い原則集
D (依存性逆転の原則) / GRASPの変動からの保護
DLQとACL(腐敗防止層)は、コンシューマーがパブリッシャーの具象(バグを含む実装)に依存せず、抽象(期待される契約)に依存するための防衛策です。
パブリッシャー側の変更やバグ(不安定な変化)から、コンシューマー(安定)を保護します。
CCP (閉鎖性共通の原則)
・あるイベントを処理するロジック
・そのイベントのバリデーションロジック
・DLQへの転送ロジック
これらは、同じ理由(イベント仕様の変更)で共に変更されるため、同じコンシューマーコンポーネント内に高凝集にまとめるべきです。
注意点
・DLQは「ゴミ箱」ではありません。
DLQにメッセージが1件でも入ったら、即座にP1(最優先)アラートを発報し、人間が原因を調査する仕組みが必須です。
・一時的な障害(DB接続断など)でリトライすべきエラーと、永久に処理不能なエラー(ポイズンピル)を区別するエラーハンドリングが重要です。
アンチパターン
無限リトライ (DLQなし)
DLQを設定せず、コンシューマーが失敗したメッセージを無限にリトライし続けること。
これがEDAにおける最悪の障害パターンであり、ポイズンピルがキューの先頭をブロックし、システム全体が停止します。
例外の飲み込み (Exception Swallowing)
try { ... } catch (Exception e) { /* ログだけ吐いて握りつぶす */ } のように、処理できないイベントを検知してもエラーにせず、単に握りつぶしてACK(処理済み)にしてしまうこと。
これにより、イベントが静かに失われ、深刻なデータ不整合を引き起こします。
