0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AzureFunctionsとAzureEventhubでPartitionKey指定のイベント発行

Last updated at Posted at 2017-07-20

GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。

経緯

前回の投稿ではServicebusを使用してみたが、
オフィシャルにある通り、順序性の担保のためにはパーティション指定をしないといけない。(queueなら問題ないのだが)
Eventhubsでもそうなのだが、今回作りたいものが、add,deleteの機能で、順序が担保されないと困る。
いろいろ考えた結果Eventhubsで実装してみることにした。
Event Hubs における可用性と一貫性
上記のドキュメントにある通り、Eventhubsでも同様にデフォルトで順序性の担保はなされない。この例のようにシーケンスナンバーを発行する方法もあるが、その場合はconsumerでの対応が強制される。
Partitionをまたがなければ順序性は保証されるので、
今回はPartitionKeyを指定して特定のPartitionにFunctionsでEventを発行してみる。可用性は犠牲になるが順序性を担保するためにはしょうがない。

やりたいこと

  • FunctionsでPartitionKeyを指定したEvent発行。

テスト用コンシューマ

#r "Microsoft.ServiceBus"

using System;
using Microsoft.ServiceBus.Messaging;

public static void Run(EventData myEventHubMessage, TraceWriter log)
{
    var key = myEventHubMessage.PartitionKey;
    log.Info($"C# Event Hub trigger function processed event: {key}");
}

EventDataオブジェクトをバインド先に指定できる。
そこからパーティションキーを取得して表示。
これをテストアプリとして、PartitionKeyを確認してみる。

問題

ところが。。。出力の場合はEventData型がバインド先に指定できない。
許可されているのはstring型のみ。なんで?!

下記のような形が推奨らしい。string・・・これだとPartitionKeyが指定できない・・・

using System;

public static void Run(TimerInfo myTimer, out string outputEventHubMessage, TraceWriter log)
{
    String msg = $"TimerTriggerCSharp1 executed at: {DateTime.Now}";

    log.Verbose(msg);   

    outputEventHubMessage = msg;
}

こちらのオフィシャルドキュメントでも出力は下記の3パターン

  • out string
  • ICollector (複数のメッセージを出力する場合)
  • IAsyncCollector (ICollector の非同期バージョン)

悩んだ挙句

悩んだ・・・。PartitionKeyの指定方法がどこにも載っていない。。。
悩んだ挙句、outを使用しないで、内部的にライブラリを使用してEventを発行しようと思い、やってみた。
Eventの発行は下記の通り。

  • まずクライアントとEventDataを生成するメソッド。
#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"

using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;


public static EventHubClient createEventHubClient() 
{
    string connectionString = 
        "Endpoint=sb://[your namespace].servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[your secret]";
    //Create the connection string
    ServiceBusConnectionStringBuilder builder = 
        new ServiceBusConnectionStringBuilder(connectionString)
    {
        TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
    };
    //Create the Eventhubs sender
    string eventHubName = "[name]";
    return EventHubClient.CreateFromConnectionString(builder.ToString(), eventHubName);
}

public static EventData createEventData(string json) 
{
    EventData eventData = new EventData(Encoding.UTF8.GetBytes(json));
    //PertitionKey
    eventData.PartitionKey = "0";
    return eventData;
}

  • 送るときはこんな感じ。
createEventHubClient().Send(createEventData(json));

createEventHubClient().SendAsync(createEventData(json));
  • 結果
2017-07-20T07:27:29.774 C# Event Hub trigger function processed event: 0
2017-07-20T07:27:29.774 Function completed (Success, Id=f622872a-・・・・・

指定できている(当然だが)。

所感

せめてoutputの場合もEventData型にバインドできるようにしてほしい。何故inputだけなのか・・・
MSさん。お願いします。。。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?