はじめに
仕事でIoTHub, EventHubs, ServiceBusと触ってきましたが、それぞれ利用用途が異なります。AWSもそうですがたくさんのキューサービスが存在します。
今回はEventGridを触っていきたいと思います。
EventGridとはなにか?
このサービスはEventメッセージを扱うサービスで、送る側は後にどんな処理が行われるかについては関知しません。このへんはIoTHub,EventHubsと同じですね。
逆にメッセージを扱うサービスはServiceBusです。受け取った側が最後まで責任をもってキューからデータを削除してあげる必要があります。
ServiceBusについては先日まとめたエントリーをご参照ください。
イベントソースはこちらにある通りたくさんのサービスに対応しています。
- Container Registry
- EventHubs
- IoTHubs
- Key Vault
- MediaService
- ServiceBus
- ストレージ
- AppConfiguration
- SignalR
- MachineLearning
トピックというのはイベントのエンドポイントのことです。イベントのソースはトピックに対してイベントを送信します。
サブスクリプションには、どのトピックから受信してどのイベントハンドラ起動させるかを設定しておきます。
イベントハンドラはイベントの送信先です。こちらにある通り色々なサービスに対応しています
- AzureAutomation
- AzureFunctions
- EventHubs
- LogicApps
- ServiceBus
- QueueStorage
- WebHook
AzureFunctionsにつなげば超強力なBinding機能でCosmosにもつなげますし使いどころはたくさんありそうです。
EventGridドメインを作る
イベントスキーマに何を設定すればよいのか悩むと思いますが、「クラウドイベントスキーマ v1.0」というものがあります。
これはCNCFが策定するクラウドイベントの標準仕様でAWSやOracle,MSが策定に参画しており、イベントのクラウド間相互運用性や互換性が高まることを狙っているものです。
現在ではFunctionがネイティブサポートされていないため、CloudEventsではなく、「イベントグリッドスキーマ」を使っていきます。
Functionの作成
イベントサブスクリプションを定義していきたいところですが、先にFunctionを作っておかないといけないので作っておきます。
いつも通りAzure Functions Core Toolsで作っていきます。
func new
した後に「JavaScript」→「6. Azure Event Grid trigger」を選択して関数のひな型を作りAzureFunctionsをデプロイしておきます。
このへんの操作は詳しくは下記のエントリを参照ください
今回は、1つのイベントソースで2つのFunctionを起動させてみたいと思いますので2つのFunctionをデプロイしました。
それぞれソースはこんな感じです
module.exports = async function (context, eventGridEvent) {
context.log("#1")
context.log(eventGridEvent);
};
module.exports = async function (context, eventGridEvent) {
context.log("#2")
context.log(eventGridEvent);
};
トピックと紐づける
それではサブスクリプションを登録していきましょう。
特に難しいところはないはずです。ドメイントピックの名前は今回はtopicという名前にしました。
同じトピックに2つのサブスクリプションを紐付けましょう
イベントを送る
Azure Event Grid libraries for JavaScript Example codeのままだとエラーが出てしまいます。
eventsには、eventTimeとtopicが必要なので忘れずに設定しましょう。
const EventGridClient = require("azure-eventgrid");
const msRestAzure = require("ms-rest-azure");
const uuid = require("uuid").v4;
const moment = require("moment");
const topicCred = new msRestAzure.TopicCredentials(process.env.TOPIC_KEY);
const EGClient = new EventGridClient(topicCred);
const topicEndpoint = process.env.TOPIC_ENDPOINT;
const events = [
{
id: uuid(),
subject: 'TestSubject',
topic: 'topic',
dataVersion: '1.0',
eventType: 'Microsoft.MockPublisher.TestEvent',
eventTime: new Date().toISOString(),
data: {
name: 'uzresk',
message: 'hello!'
}
}
];
return EGClient.publishEvents(topicEndpoint, events).then((result) => {
return Promise.resolve(console.log('publish successfully.'))
}).catch((err) =>{
console.log("ERROR-------------------------");
console.log(err);
});
TOPIC_ENDPOINTですが、ドメインの画面にある「ドメインエンドポイント」をそのまま設定してはいけません。
https://xxxxxxx.japaneast-1.eventgrid.azure.net/api/events
ではなく↓を設定します。
xxxxxxx.japaneast-1.eventgrid.azure.net
では実行してみます。
$ node index.js
publish successfully.
少し経つと、EventGridドメインにある画面で1 Published Event, 2 Matched Eventとなっていれば成功です。
Functionsのログも確認しておきましょう
Dead Letter
イベントハンドラに渡せなかったりした場合はDeadLetterにデータが渡ります。
ServiceBusの場合はDeadLetterQueueというキューにデータが渡りましたがEventGridの場合はストレージに入るようです。
イベントの最大試行回数分動いてからDeadLetterにデータが渡るため実験する場合は短めに設定しましょう。
対象の関数を停止しておきわざと失敗させます。
配信不能だったデータは以下のパスに保管されます。(日付はUTCです)
ストレージアカウント/トピック名/サブスクリプション名/年/月/日/xxxxxx.json
[
{
"id": "xxxxxxxxxx-3b29-4bfc-a634-xxxxxxxxxx",
"eventTime": "2020-02-26T23:22:02.0000000Z",
"eventType": "Microsoft.MockPublisher.TestEvent",
"dataVersion": "1.0",
"metadataVersion": "1",
"topic": "/subscriptions/xxxxxxxxxx/resourceGroups/rg-xxxxxxxx/providers/Microsoft.EventGrid/domains/evg-tisesk/topics/topic",
"subject": "TestSubject",
"deadLetterReason": "MaxDeliveryAttemptsExceeded",
"deliveryAttempts": 2,
"lastDeliveryOutcome": "Forbidden",
"publishTime": "2020-02-26T23:22:03.8286689Z",
"lastDeliveryAttemptTime": "2020-02-26T23:22:04.1762595Z",
"data": {
"name": "uzresk",
"message": "hello!"
}
}
]
DeadLetterとなったデータはBlobStorage Trigger Functionを使って再試行や通知を行うことで対応ができるかと思います。
.Netのサンプルは「.NETのAzure Event Grid Dead Letterサンプル」にありますが、普通のTrigger Functionsと同じです。
DeadLetterに入る時点で時間が経っているのであまり気にする状況はないかと思いますが、BlobTriggerは最大10分間遅延することは忘れないでおきましょう。
関数アプリを従量課金プランで使用しているときに、関数アプリがアイドル状態になっている場合、新しい BLOB の処理が最大で 10 分間遅延する可能性があります。 この待機時間を避けるには、Always On が有効な App Service プランに切り替えることができます。 BLOB ストレージ アカウントで Event Grid トリガーを使用することもできます。
ServiceBusとつないでみる
同じようにサブスクリプションを作って、イベントハンドラをServiceBusにします。
ServiceBusExplorerで確認してみると確かにデータが入っていますね。
EventGridの後ろにServiceBusを挟むのはなぜか?
Event Grid は、持続性のある配信を提供します。 各サブスクリプションに対して、最低 1 回は各メッセージを配信します。 イベントは、直ちに各サブスクリプションの登録済みのエンドポイントに送信されます。 エンドポイントがイベントの受信を確認しない場合、Event Grid はイベントの配信を再試行します。
ここに記載の通り最低1回、2回送信されることもありうる訳です。
EventGridのバックエンドの処理が冪等にできない場合や、1回しか処理できない場合はEventGridのバックエンドにServiceBusを配置し、ServiceBusのキューの設定で重複排除しておくことで1回しか処理されないようなメッセージを配信することが可能になるわけです。