Azure
AzureEventHubs
EventHubs
Functions
AzureFunctions

AzureFunctionsとAzureEventhubでPartitionKey指定のイベント発行

More than 1 year has passed since last update.

GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。

経緯

前回の投稿ではServicebusを使用してみたが、
オフィシャルにある通り、順序性の担保のためにはパーティション指定をしないといけない。(queueなら問題ないのだが)
Eventhubsでもそうなのだが、今回作りたいものが、add,deleteの機能で、順序が担保されないと困る。
いろいろ考えた結果Eventhubsで実装してみることにした。
Event Hubs における可用性と一貫性
上記のドキュメントにある通り、Eventhubsでも同様にデフォルトで順序性の担保はなされない。この例のようにシーケンスナンバーを発行する方法もあるが、その場合はconsumerでの対応が強制される。
Partitionをまたがなければ順序性は保証されるので、
今回はPartitionKeyを指定して特定のPartitionにFunctionsでEventを発行してみる。可用性は犠牲になるが順序性を担保するためにはしょうがない。

やりたいこと

  • FunctionsでPartitionKeyを指定したEvent発行。

テスト用コンシューマ

#r "Microsoft.ServiceBus"

using System;
using Microsoft.ServiceBus.Messaging;

public static void Run(EventData myEventHubMessage, TraceWriter log)
{
    var key = myEventHubMessage.PartitionKey;
    log.Info($"C# Event Hub trigger function processed event: {key}");
}

EventDataオブジェクトをバインド先に指定できる。
そこからパーティションキーを取得して表示。
これをテストアプリとして、PartitionKeyを確認してみる。

問題

ところが。。。出力の場合はEventData型がバインド先に指定できない。
許可されているのはstring型のみ。なんで?!

下記のような形が推奨らしい。string・・・これだとPartitionKeyが指定できない・・・

using System;

public static void Run(TimerInfo myTimer, out string outputEventHubMessage, TraceWriter log)
{
    String msg = $"TimerTriggerCSharp1 executed at: {DateTime.Now}";

    log.Verbose(msg);   

    outputEventHubMessage = msg;
}

こちらのオフィシャルドキュメントでも出力は下記の3パターン

  • out string
  • ICollector (複数のメッセージを出力する場合)
  • IAsyncCollector (ICollector の非同期バージョン)

悩んだ挙句

悩んだ・・・。PartitionKeyの指定方法がどこにも載っていない。。。
悩んだ挙句、outを使用しないで、内部的にライブラリを使用してEventを発行しようと思い、やってみた。
Eventの発行は下記の通り。

  • まずクライアントとEventDataを生成するメソッド。
#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"

using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;


public static EventHubClient createEventHubClient() 
{
    string connectionString = 
        "Endpoint=sb://[your namespace].servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[your secret]";
    //Create the connection string
    ServiceBusConnectionStringBuilder builder = 
        new ServiceBusConnectionStringBuilder(connectionString)
    {
        TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
    };
    //Create the Eventhubs sender
    string eventHubName = "[name]";
    return EventHubClient.CreateFromConnectionString(builder.ToString(), eventHubName);
}

public static EventData createEventData(string json) 
{
    EventData eventData = new EventData(Encoding.UTF8.GetBytes(json));
    //PertitionKey
    eventData.PartitionKey = "0";
    return eventData;
}

  • 送るときはこんな感じ。
createEventHubClient().Send(createEventData(json));

createEventHubClient().SendAsync(createEventData(json));
  • 結果
2017-07-20T07:27:29.774 C# Event Hub trigger function processed event: 0
2017-07-20T07:27:29.774 Function completed (Success, Id=f622872a-・・・・・

指定できている(当然だが)。

所感

せめてoutputの場合もEventData型にバインドできるようにしてほしい。何故inputだけなのか・・・
MSさん。お願いします。。。