Azure
AzureFunctions
EventHub
AzureSearch
ServiceBus

Azure Service Bus から Azure Search へのデータ投入の間に Event Hub を挟んでうまくいった話

More than 1 year has passed since last update.


TL;DR

今の仕事ではサービス間の連携に Service Bus を使用し Azure Functinos でデータ加工を行って各種ストレージに投入を行っています。その中には Azure Search のインデックスもありました。

Service Bus に投入したメッセージは1件単位で並列で Azure Functions に連携されるので、 Azure Search には1件ずつ並列でインデックスの更新が行われます。

Azure Search は更新処理が遅く、また並列で更新を行うと結構な頻度でエラーになることがあったため、間に Event Hub を挟んでメッセージを一旦蓄積し、並列度を抑えつつバッチ更新で更新処理全体の速度を改善できました。

おおむね2時間ほどかかる処理が1時間未満で終わり、Azure Search のデータ投入時の絵エラーが一度も発生しなくなりました。

こういう感じです。


before

Service Bus から並列で Functionが1件ずつメッセージを取得し Azure Search へ登録する

                          +-----------+      +---------+

+-----> | Function 1+----> | |
| +-----------+ | |
| | Azure |
+-----------+ | +-----------+ | Search |
| ServiceBus| +---------> | Function 2+----> | |
+-----------+ | +-----------+ | |
| | |
| +-----------+ | |
+-----> | Function n+----> | |
+-----------+ +---------+


after

Service Bus のメッセージを Event Hub に一度バッファし、 Import Function がバッチ更新する

                          +-----------+                                           +---------+

+-----> | Function 1+--+ | |
| +-----------+ | | |
| | +----------+ +-----------+ | Azure |
+-----------+ | +-----------+ | | | | Import | | Search |
| ServiceBus| +---------> | Function 2+------> | EventHub +---> | Function +---> | |
+-----------+ | +-----------+ | | | | | | |
| | +----------+ +-----------+ | |
| +-----------+ | | |
+-----> | Function n+--+ | |
+-----------+ +---------+


なぜ Event Hub か

ここ にある通り、 Event Hub トリガーではメッセージは受信側の Function (上図の Import Function に該当) で Event Bus に蓄積されたメッセージを配列で Functions の引数に渡すことができるので、1回の関数の実行で複数のメッセージを扱えます。

Service Bus トリガーでは同様の動作はできませんでした。

さらに host.json の設定によりメッセージを受け取る数を変更可能です。

次の設定を行うと、最大で1000件のメッセージを Event Hub から取得します。

    "eventHub": {

"maxBatchSize": 1000
}

Azure Search のインデックス更新は最大1000ドキュメントを一度に更新できるため、

バッチの取得サイズを1000件以上に設定できることがパフォーマンスの観点で重要でした。

またタイマートリガーと異なり、メッセージが投入されたらすぐに Function が実行が始まるので、そこそこの即時反映が可能です。一方でメッセージの投入が大量にあった場合でもバッチの取得サイズの上限を設定しているので、効率よくメッセージを順次バッチ更新していきます。

そのため、メッセージが少量でも大量でも準リアルタイムくらいの速度でインデックス更新ができます。


プログラムサンプル

歴史的な経緯から、Azure Functions は Javascript で実装しています。


Event Hub への出力

Service Bus から Event Hub への投入は、トリガーやバインディングを使うと簡単です。

まずは function.json

{

"disabled": false,
"bindings": [
{
"name": "mySbMsg",
"type": "serviceBusTrigger",
"direction": "in",
"topicName": "sample-topic",
"subscriptionName": "sample",
"connection": ""
},
{
"type": "eventHub",
"name": "hub",
"direction": "out",
"path": "buffer",
"connection": "EVENT_HUB"
}
]
}

これで、 Service Bus の sample-topic に入ったメッセージが、 function のmySbMsg 変数にバインドされて実行が始まり、 context.bindings.hub に値を設定するとその値が eventHub の buffer パスに出力されるようになります。

続いて関数本体です。

関数の実装は mySbMsg から Azure Search に登録可能な形式にデータ加工して、リトライ回数などと一緒に hub に設定するようにしておきます。

module.exports = function(context, mySbMsg) {

let data = /* mySbMsg を加工 */;
// リトライ回数、\登録対象データをHub に書き込む
context.bindings.hub = {retry:0, data:data}
context.done();
};


Azure Search への登録

Azure Search の登録はデータ不備や一時的なサービスダウンにより失敗することもあり得るため

最終的には手動による補償を行うとしても、何回かはリトライさせたいところです。

そのため今回は、登録に失敗したメッセージはリトライカウントをカウントアップし、もう一度同じメッセージを Event Hub に書き込むようにしてみました。

というわけで、 function.json には in, out 両方に同一の Event Hub の定義があります。

{

"disabled": false,
"bindings": [
{
"type": "eventHubTrigger",
"name": "messages",
"direction": "in",
"path": "buffer",
"connection": "EVENT_HUB",
"consumerGroup": "$Default",
"cardinality ": "many"
},
{
"type": "eventHub",
"name": "retryMessages",
"direction": "out",
"path": "buffer",
"connection": "EVENT_HUB"
}
]
}

messages には前段の function から登録されたメッセージが複数件配列として入ってきます。

Azure Search へのバインディングは残念ながらないため、 ライブラリ などを使って登録用のコードを記述します。

長くなるので要点だけ記述しておきます。

const _ = require("lodash");

const AzureSearch = require('azure-search');

module.exports = function(context, messages) {
let client = AzureSearch({
url: "xxx",
key: "xxx"});
client.uploadDocuments("hogeIndex", messages.map(doc => doc.data), (error, result) => {
if (error) retry(error, context, messages)
else context.done();
// TODO 実際には部分的に失敗した場合のメッセージのリトライも行う
});
}

// リトライ回数が5回未満のメッセージのみ、リトライ回数を加算して Event Hub に再出力
function retry(error, context, messages) {
context.log.error(error);
context.bindings.retryMessages = messages.filter(m => m.retry < 5).map(m => {m.retry += 1; return m;}
// TODO リトライ回数超過のメッセージは、個別にログ出力し、手動で補償する。
}

バインディングが無いと面倒ですね。。

TODO にも記載しましたが、 Azure Search のドキュメント更新は部分的に成功・失敗することもあります。

完全に登録を成功させるには部分的に失敗したドキュメントもリトライして更新する必要があります。

上記コードでは省略していますが、実際には同じ関数中で部分更新に失敗したドキュメントを再登録しています。

というわけで、作ってしまえばそんなに難しいことはなくバッチ更新が実現できました。

Event Hub の用途は大量のストリーミングデータが通常かもしれませんが、システム内部のバッファでも使えるよという例でした。


その他検討した構成

Service Bus + Event Hub という構成にたどり着くまでに色々試行錯誤したので、その顛末の参考までに書いておきます。


全部 Event Hub にする

前段の Service Bus いるの? 全部 Event Hub にしたらいいじゃんと思うかもしれません。

最初から Event Hub 前提で設計していたらそれもありだったかもしれませんが、

Service Bus の次の機能に依存しているため、今更全面的に Event Hub には移行できませんでした。


  • メッセージリトライ機能によるリトライ処理実装の単純化

  • Dead Letter Queure による処理不能メッセージの監視と検知

  • Topic-subscribe による一対多配信

実際のところ、 Event Hub でリトライを行うのは相当に力技になってしまいますので、サービス間連携の要として Service Bus を外すことはできませんでした。


Indexer を使う

Azure Search には Indexer という仕組みがあり、Blobストレージ、SQL Database、 Cosmos DB などのストレージから手動またはタイマーで取り込みを行えます。

準リアルタイムでの更新要件がなければこれが第一候補だったと思います。

ただし、 Azure Search は配列型として定義可能な要素が文字列のみという極めて強いスキーマ上の制約があります。

https://docs.microsoft.com/ja-jp/azure/search/search-howto-complex-data-types

実際、配列型のデータを複数扱う場合、 Indexer の単純なマッピングでは対処できず加工用のプログラムが必要になることがあります。

また、Indexer を手動でキックする場合はどのタイミングでキックするか、タイミングがつかめないということもあり今回は見送りました。


Cosmos DB change feed

Cosmos DB には登録・更新されたデータをストリーミング的に取得できる change feed という仕組みがあります。また change feed trigger として Azure Functions のトリガー・入力としても使えます。

https://docs.microsoft.com/ja-jp/azure/cosmos-db/change-feed

change feed トリガーの入力は、配列型になるため Event Hub トリガーと同じようにバッチ更新ができます。

Cosmos DB と Azure Search 両方にデータ連携を行うなら良い方法だと思いました。

ただし、今回のケースでは Azure Search のみにデータ連携を行う場合もありました。

その場合に、わざわざ Cosmos DB に Azure Search 連携用のためのコレクションや RU を別途確保するくらいなら Event Hub を用意したほうが多分安いです。

大量データの連携に備えるための措置となると、そこそこの RU を設定しておかないといけないためです。

また、change feed trigger から入力で入ってくる件数は最大 100 件で固定されていて現状これを変更する手段がないこともネックでした。 Azure Search へのバッチ更新を検討するなら 1000 件以上にできないと効率的になりません。


Durable Functions

Durable Functions を使うと、関数の状態を保存したり、複数の関数の実行を取りまとめることができます。

https://docs.microsoft.com/ja-jp/azure/azure-functions/durable-functions-overview

最初、これを使えば Service Bus のメッセージを蓄積しある程度の件数が貯まったら、Azure Search に書き込むことができるのではと想定していました。

実際に、メッセージの蓄積とバッチ出力を行うようなサンプルを作っては見たのですが、明らかに Durable Functions の使途と今回の要件が違っていることがわかりました。

Durable Functions は長期的なワークフロー・トランザクションを サーバレスで実現する仕組みです。

例えば、ユーザーを仮登録する・本登録確認用の認証コードを送る・認証コードを確認し本登録する、というような状態が絡んだ時間をまたぐ複数の処理を上手く扱いたい場合に真価を発揮します。

状態を永続化するために、Blob Storage や Queue を使うので遅延が発生します。そのためリアルタイム性が重視される処理には向いていません。

また、 Azure Functions ランタイムがバージョン 2 になってしまうのと、言語が現状 C# のみというのも採用しなかった理由になります。

というわけで、 Event Hub にたどり着くまでの間に色々と Azure Functions に詳しくなったので無駄な道のりではなかったと信じたい。

実際、 Azure Functions の入出力バインディングはとてもよくできていると思いました。


余談

Azure Search の S1インスタンスは 数十くらいの同時接続を行うと、結構な頻度でスロットル (503エラー) になってしまっていましたが、S2 インスタンスに上げるとそれも起きなくなりました。

ひょっとすると、 S2 インスタンスで最初からやっていれば、 Event Bus を入れなくてもどうにかなったのかもしれない、とふと思うことがあります。

ですが、 Azure Search はガンガン更新するようなものでもないので、今回の対策を入れてよかったとは思っています。

この対策を入れても Azure Search が不安定になるようなら、今度は Cosmos DB をお金で殴りに行くことになるでしょう。