GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。
経緯
前回の投稿ではServicebusを使用してみたが、
オフィシャルにある通り、順序性の担保のためにはパーティション指定をしないといけない。(queueなら問題ないのだが)
Eventhubsでもそうなのだが、今回作りたいものが、add,deleteの機能で、順序が担保されないと困る。
いろいろ考えた結果Eventhubsで実装してみることにした。
Event Hubs における可用性と一貫性
上記のドキュメントにある通り、Eventhubsでも同様にデフォルトで順序性の担保はなされない。この例のようにシーケンスナンバーを発行する方法もあるが、その場合はconsumerでの対応が強制される。
Partitionをまたがなければ順序性は保証されるので、
今回はPartitionKeyを指定して特定のPartitionにFunctionsでEventを発行してみる。可用性は犠牲になるが順序性を担保するためにはしょうがない。
やりたいこと
- FunctionsでPartitionKeyを指定したEvent発行。
テスト用コンシューマ
#r "Microsoft.ServiceBus"
using System;
using Microsoft.ServiceBus.Messaging;
public static void Run(EventData myEventHubMessage, TraceWriter log)
{
var key = myEventHubMessage.PartitionKey;
log.Info($"C# Event Hub trigger function processed event: {key}");
}
EventDataオブジェクトをバインド先に指定できる。
そこからパーティションキーを取得して表示。
これをテストアプリとして、PartitionKeyを確認してみる。
問題
ところが。。。出力の場合はEventData型がバインド先に指定できない。
許可されているのはstring型のみ。なんで?!
下記のような形が推奨らしい。string・・・これだとPartitionKeyが指定できない・・・
using System;
public static void Run(TimerInfo myTimer, out string outputEventHubMessage, TraceWriter log)
{
String msg = $"TimerTriggerCSharp1 executed at: {DateTime.Now}";
log.Verbose(msg);
outputEventHubMessage = msg;
}
こちらのオフィシャルドキュメントでも出力は下記の3パターン
- out string
- ICollector (複数のメッセージを出力する場合)
- IAsyncCollector (ICollector の非同期バージョン)
悩んだ挙句
悩んだ・・・。PartitionKeyの指定方法がどこにも載っていない。。。
悩んだ挙句、outを使用しないで、内部的にライブラリを使用してEventを発行しようと思い、やってみた。
Eventの発行は下記の通り。
- まずクライアントとEventDataを生成するメソッド。
#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
public static EventHubClient createEventHubClient()
{
string connectionString =
"Endpoint=sb://[your namespace].servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[your secret]";
//Create the connection string
ServiceBusConnectionStringBuilder builder =
new ServiceBusConnectionStringBuilder(connectionString)
{
TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
};
//Create the Eventhubs sender
string eventHubName = "[name]";
return EventHubClient.CreateFromConnectionString(builder.ToString(), eventHubName);
}
public static EventData createEventData(string json)
{
EventData eventData = new EventData(Encoding.UTF8.GetBytes(json));
//PertitionKey
eventData.PartitionKey = "0";
return eventData;
}
- 送るときはこんな感じ。
createEventHubClient().Send(createEventData(json));
createEventHubClient().SendAsync(createEventData(json));
- 結果
2017-07-20T07:27:29.774 C# Event Hub trigger function processed event: 0
2017-07-20T07:27:29.774 Function completed (Success, Id=f622872a-・・・・・
指定できている(当然だが)。
所感
せめてoutputの場合もEventData型にバインドできるようにしてほしい。何故inputだけなのか・・・
MSさん。お願いします。。。