以前の記事でも紹介していますが、AzureのEventHubs、StreamAnalyticsを利用することで簡単に大量のリクエストデータを捌き、リアルタイムにデータを登録することができます。この2サービスの利用にあたり事前に知っておいたほうがよさそうなポイントをまとめました。
Event Hubs
受信したメッセージは最大7日間の保存が可能ですが、これは意図した再処理や定常的に過去の集計処理を行う場合に利用することを目的としています。
StreamAnalyticsなどを利用してリアルタイムにデータ登録を行う場合は不要で、受信したメッセージを単純に保存しておきたい場合はCapture機能を利用します。
CaptureはEventHubsのストリームデータを監視し、定めた時間間隔またはデータ量に達した場合にAvro形式でStorageに格納されます。
※ただしストリームデータが無い場合も時間毎にファイルが作成されます。
格納ファイルは以下のような形式指定することで、日付ごとのフォルダに時間・パーティション単位のファイルが作成できます。
{Namespace}-{EventHub}/{Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{PartitionId}
上記構成でスループットユニットを1として1ヶ月間稼働した場合のコストは約100ドル前後となります。
Stream Analytics
参照データの更新
マスタデータなどをStorageにjson形式で用意することで、参照データとしてストリームデータと紐付けて出力することができます。
SELECT
IEH.item_no,
SMJ.item_name,
dateadd(hour,(9),System.Timestamp) AS create_at
INTO
[outputsqldb] OSD
FROM
[inputeventhubs] IEH
JOIN
[storagemstjson] SMJ
ON
IEH.item_no = SMJ.item_no
ただし参照データを更新した際はStreamAnalyticsの再起動が必要となります。※REST APIを利用した起動/終了はこちら
エラー時のデータ特定と再処理
EventHubsと連携する想定で、受信メッセージが出力先の形式に変換できない場合、Data Conversion Errorsが発生します。この時、EventHubs内のどのメッセージがエラーとなったか特定することができません。
なので、データ内にはメッセージ毎に一意となる情報を持たせることで、Captureから特定して再送信することが実現できます。
エラーポリシーを再試行としている場合、エラー時は失敗したデータが蓄積され内部のメモリ使用量が増加していきます。この状態が続くと最終的にはStreamAnalyticsは再起動または停止となります。
EventHubs連携時のデータの流れについて、まず、StreamAnalyticsからEventHubsにデータ送信リクエストを送り、それに応える形で、 EventHubsがメッセージをStreamAnalyticsに送ります。これによりエラー発生時はEventhubsの「Incoming Requests(StreamAnalyticsからのデータ送信要求)」と「Outgoing Messages(要求に答えて送ったメッセージ数」の値も増加することになります。
SUの設定値
1SU(ストリーミング ユニット)はジョブのテストなどを行うための利用を目的としており、必要に応じて使用可能なメモリ量の約60%程度までシステム稼働の為に使用されます。
なので、データ量が一定でもメトリクス上で、メモリ量の変動が見られるケースがあります。公式のジョブに必要な数では6から始めて値を調整する方法を推奨しています。
降格済み
稼働中に「降格済み」といったアラートが表示される場合があります。StreamAnalyticsでは、いつのデータを読み取って出力したかの管理が非常に重要で、これを日時 (TIMESTAMPまたはTIMESTAMP BY) の情報を使用して実現しています。
例えばTIMESTAMP BYを指定しているにも関わらず、入力データに指定の列が含まれない場合、到着日時を代わりに本データのTIMESTAMPとします。これが降格するという状況です。
この影響としては出力先のデータに抜け落ちが発生する可能性があります。なのでTIMESTMP BYで指定する列は入力データに必ず含まれる列を指定する必要があります。