はじめに
リアルタイムに大量のデータを受け取って処理できる Azure Event Hubs を触る機会がありました。
Azure Event Hubs で受け取ったメッセージを取得してその情報を基に処理するアプリケーションを作りましたが、すべてのメッセージを取得して処理しているかチェックする術がないかなーと思いました。アプリケーションが出力するログをチェックするのはとてもシンドイので、もっとカンタンな方法はないものかと。
Azure Event Hubs とは
Azure Event Hubs を扱う際は「用語」と「機能」を理解しておく必要があります。理解できていないといろいろ悩んでしまうことがありますので、まずは以下のドキュメントをしっかり読み込みましょう。
んー...ムズカシイデス。こんな時は先人たちのチカラを借りましょう。
私は以下のブログと上のドキュメントを行ったり来たり繰り返し読んで、やっと理解しました。ありがとう!
カンタンにチェックするには
Azure Monitor でメトリックやログでチェックできると良いですね。
ということで、以下のドキュメントでどのような監視データがあるのか確認します。
(英語のほうも見てみる)
メッセージのメトリックの 「受信メッセージ (Incoming Messages)」 と 「送信メッセージ (Outgoing Messages)」 が良さげです。これらは、以下のような情報が収集されます。
メトリック名 | 説明 |
---|---|
受信メッセージ (Incoming Messages) |
指定された期間に Event Hubs に受信したイベントまたはメッセージの数。 |
送信メッセージ (Outgoing Messages) |
指定した期間に Event Hubs から送信されたイベントまたはメッセージの数。 |
Azure Event Hubs の診断ログを設定
それでは、Azure Monitor にログとメトリックをストリーミングするために、診断ログを設定します。
Azure ポータルで、Azure Event Hubs 名前空間リソースの [監視] カテゴリの [診断設定] を開き、「メトリック」の [AllMetrics] を選択し、「宛先の詳細」で [Log Analytics ワークスペースへの送信] を選択して [Log Analytics ワークスペース] を指定します。そして、[保存] します。
メトリックログを検索
先ほど指定した [AllMetrics] は「AzureMetrics」テーブルに収集されます。
以下のようなクエリで、Azure Event Hubs の「受信メッセージ」と「送信メッセージ」を確認します。
AzureMetrics
| where MetricName == "OutgoingMessages" or MetricName == "IncomingMessages"
| project TimeGenerated, Resource, ResourceProvider, MetricName, Total
| sort by TimeGenerated desc nulls last
結果は、以下のように出力されます。
受信したメッセージ分、送信されていることがわかります。
ただ、この結果でチェックするには、受信メッセージ、送信メッセージそれぞれの数を合計して比較しなければならないので、まだまだメンドクサイです。
以上のことを踏まえて、チェックするためのクエリをもっと工夫が必要そうです。
送受信メッセージの数を比較するためのクエリ
ここでは、指定した期間内の受信メッセージ数の合計と送信メッセージ数の合計を比較する、以下のクエリを用意しました。
AzureMetrics
| where MetricName == "OutgoingMessages" or MetricName == "IncomingMessages"
| extend Total_Incoming_Messages = iif(MetricName == "IncomingMessages", Total, 0.00)
| extend Total_Outgoing_Messages = iif(MetricName == "OutgoingMessages", Total, 0.00)
| summarize sum(Total_Incoming_Messages), sum(Total_Outgoing_Messages)
| extend delta_messages = sum_Total_Incoming_Messages - sum_Total_Outgoing_Messages
クエリの解説
以下より、各行を解説します。
※1 ~ 2 行目は省略
| extend Total_Incoming_Messages = iif(MetricName == "IncomingMessages", Total, 0.00)
| extend Total_Outgoing_Messages = iif(MetricName == "OutgoingMessages", Total, 0.00)
Total_Incoming_Messages
(受信メッセージの件数) に、MetricName
が "IncomingMessages"
の場合は Total
の値を、そうでない場合は 0
をセットします。
(上記と同様に) Total_Outgoing_Messages
(送信メッセージの件数) に、MetricName
が "OutgoingMessages"
の場合は Total
の値を、そうでない場合は 0
をセットします。
| summarize sum(Total_Incoming_Messages), sum(Total_Outgoing_Messages)
Total_Incoming_Messages
列、Total_Outgoing_Messages
列、それぞれの合計を算出します。
| extend delta_messages = sum_Total_Incoming_Messages - sum_Total_Outgoing_Messages
最後に、Total_Incoming_Messages
列の合計から Total_Outgoing_Messages
列の合計を引いた結果を delta_messages
列にセットします。
期間を指定してこのクエリを実行すると、以下のような結果となります。
これで、アプリケーションが Azure Event Hubs で受け取ったメッセージすべて取得していることがわかりますね。
ただし注意点がありました
前述のメトリックログを検索するクエリの結果を見てみると、下図のようにたまに送信メッセージの件数が大きいログが検出されることがあります。アプリケーションが出力するログをチェックしてもこのような件数を処理した形跡もありません。
さて、これはなんでしょう?
そもそも、Azure Event Hubs からメッセージを取得する際、コンシューマーグループを介して配信されます。コンシューマーグループは、Azure Event Hubs インスタンスのパーティション内の古いメッセージから順に読み取り、配信します。
Azure Event Hubs からメッセージを取得するアプリケーションを起動するたびにこのような挙動されると困るので、メッセージを取得する側はどこまで読み取ったか記録するためオフセット値 (パーティション内の位置を示す値) を保持します。これは、アプリケーションが利用する以下のライブラリ (クライアント SDK) が担ってくれています。
これにより、アプリケーションを再起動しても読み取り済み (処理済み) のメッセージを再び処理することなく、処理を再開することができます。
さて、話を戻そう。
上図の 「送信メッセージの件数が大きいログ」 は何かというと、「コンシューマーグループは、Azure Event Hubs インスタンスのパーティション内の古いメッセージから順に読み取り、配信」するので、アプリケーションが利用する ライブラリが保持しているオフセット値まで読み飛ばしている (加えて、オフセット値後に新たに取得したメッセージ) 分 が「送信メッセージ」としてカウントされたものと思われます。
些細なことの発端から、いろいろ考えたり、調べたりすることによって、より多くの気づきや発見がありました。面白い!