はじめに
アーキテクチャに弾力性を持たせるためにはキューの存在は必須なわけですがAzureにもAWS SQSと同じようなサービスが存在します。それがAzureServiceBusです。
今回も仕事で使うためにAzureServiceBusについて調べてみました。
似たものとしてBlobStorageQueueがあるのですが、そのあたりの違いについては公式のドキュメントをご覧ください。
ServiceBusの各プランの機能と料金についてはこちらをご覧ください。
本記事で作ったソースはこちらにあります。
準備
サンプルプログラムを試すためのServiceBusとキューはAzurePortalから予め作っておいてください。
キューの名前は何でも構いませんがサンプルプログラム内ではqueueという名前で作成されたものを使っています。
また、キューにアクセスするためのポリシーの作成と接続文字列はプログラム内で使いますのでメモしておいてください。
私は「管理」「送信」「リッスン」それぞれのポリシーを作っておきました。
簡単なメッセージの送信
簡単なプログラムを使ってメッセージを送ってみます。
const {ServiceBusClient} = require("@azure/service-bus");
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const queueName = process.env.QUEUE_NAME;
const main = async () => {
const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
const queueClient = sbClient.createQueueClient(queueName);
const sender = queueClient.createSender();
const message = {
body: 'Hello. Service Bus',
label: 'test',
userProperties: {
testPropertyName: 'property value'
}
};
console.log('send message start');
try {
await sender.send(message);
await queueClient.close();
console.log('send message end');
} finally {
await sbClient.close();
}
};
main().catch(err => {
console.log(err);
});
実行してみましょう
set SERVICE_BUS_CONNECTION_STRING=<ConnectionString>
set QUEUE_NAME=<QueueName>
C:\Users\uzres\products\azure-servicebus-transaction>node send-message.js
send message start
send message end
ServiceBus Explorerで確認
こちらからServiceBus Explorerをダウンロードします
File → Connectを選択し、ConnectionStringを入力します。
ConnectionStringのポリシーは「管理」である必要があります。
左メニューのqueueを選択し、Message→OKをクリックするとメッセージを見ることができます。
カスタムプロパティもみることができますね。
メッセージの受信
今度はメッセージを受信してみましょう。
const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus");
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const queueName = process.env.QUEUE_NAME;
const main = async () => {
const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
const queueClient = sbClient.createQueueClient(queueName);
const receiver = queueClient.createReceiver(ReceiveMode.receiveAndDelete)
console.log('receive message start');
try {
const messages = await receiver.receiveMessages(5);
console.log(messages.map(message => message.body));
await queueClient.close();
console.log('receive message end');
} finally {
await sbClient.close();
}
};
main().catch(err => {
console.log(err);
});
実行するためには、「リッスン」権限が必要です。
C:\Users\uzres\products\azure-servicebus-transaction>node receive-message.js
receive message start
[ 'Hello. Service Bus' ]
receive message end
実行してみるとわかりますが結構待たされます。
これはメッセージを受信するときに60秒間待つからです。
短くするにはreceveMessagesの2番目の引数に待つ秒数を指定します。
const messages = await receiver.receiveMessages(5, 1);
Azure Functionで受信
先ほどの例ではとあるタイミングでプログラムを実行しQueueからメッセージを受信しましたが、Queueに入ったタイミングでFunctionを動かすことができます。
これでQueueをポーリングする手間がなくなりますね。素晴らしい・・・。
関数の引数であるmySbMsgにメッセージが入ってきます。
module.exports = async function(context, mySbMsg) {
context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
context.log('DeliveryCount =', context.bindingData.deliveryCount);
context.log('MessageId =', context.bindingData.messageId);
};
{
"bindings": [
{
"name": "mySbMsg",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "queue",
"connection": ""
}
]
}
Functionを設定するときの注意点
- connectionのところを空にした場合は、アプリケーション設定の追加で、AzureWebJobsServiceBusという名前でQueueへの接続文字列を設定しておきます。
- function.jsonにqueueNameを指定しているので接続文字列からはEntityPath=queueを削除しておかないとエラーになります。
- このへんの設定はトリガーとバインド ServiceBusを確認してください。
Azure Functionで送信
Bindingという機能を使うとServiceBusにメッセージを送ることができます。
先ほど作ったfuctionで受け取ったメッセージをoutputqueueというQueueに投入してみたいと思います。
まずはfunction.jsonにdirectionがoutの要素を追記します。
{
"bindings": [
{
"name": "mySbMsg",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "queue",
"connection": ""
},
{
"name": "output",
"type": "serviceBus",
"queueName": "outputqueue",
"connection": "OutputConnection",
"direction": "out"
}
]
}
プログラムも少し変えましょう。受信したメッセージにoutput を付けて、outputqueueにメッセージを入れてみます。
bindingsの後の文字列はfunction.jsonのnameで指定した文字列になります。
module.exports = async function(context, mySbMsg) {
context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
context.log('DeliveryCount =', context.bindingData.deliveryCount);
context.log('MessageId =', context.bindingData.messageId);
const outputMessage = 'output' + mySbMsg;
context.bindings.output=outputMessage;
};
今回はfunction.jsonのconnectionにOutputConnectionを指定したので、Functionの環境変数にっはAzureWebJobsOutputConnectionという名前で接続文字列を指定します。
実行してoutputqueueの中身を見るとメッセージが配信されていることがわかりますね。
PeekLockとReceiveAndDelete
AzureFunctionでメッセージを受信したとき既定の動作はPeekLockです。
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-service-bus?tabs=csharp#trigger---peeklock-behavior
処理が正常に終了すれば、FunctionからCompleteを呼び出しQueueからメッセージが削除されるようになっています。ReceiveAndDeleteに設定することもできますが、これは受信した瞬間にメッセージが削除されるモードです。これはエラー発生時にメッセージがロストしてしまう可能性があります。
DeadLetter
MaxDeliveryCountに指定されている回数分処理が失敗することや、TimeToLiveの超過が起こるとDeadLetterQueue(DLQ)にメッセージが送信されます。
これはアプリケーションで明示的に指定することも可能です。
DLQの中身は自動的にクリーンアップされないため、DLQに移ったメッセージはログに出力するなどして確実に処理する必要があります。
早速試してみましょう。
DeadLetterQueueにメッセージを移す簡単な方法は、先ほど作ったFunctionの出力先であるoutputqueueを削除したあとにメッセージを送りましょう。
少し経ってFunctionのログを見ると10回失敗していることがわかります。
ExplorerでDeadLetterQueueに移っていることも確認できます。
DLQのメッセージを取得してみたいと思います。
const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus");
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const queueName = process.env.QUEUE_NAME + "/$DeadLetterQueue";
const main = async () => {
const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
const queueClient = sbClient.createQueueClient(queueName);
const receiver = queueClient.createReceiver(ReceiveMode.peekLock)
console.log('receive DLQ message start');
try {
const messages = await receiver.receiveMessages(5, 1);
for (let i = 0; i < messages.length; i++) {
const message = messages[i];
console.log(message.body)
await message.complete();
}
await queueClient.close();
console.log('receive DLQ message end');
} finally {
await sbClient.close();
}
};
main().catch(err => {
console.log(err);
});
DLQはQueueNameの後ろに/$DeadLetterQueueを付けただけです。
今回は、PeekLockで取得するのでメッセージ取得後complete()を呼び出しDLQからメッセージを削除しています。
おわりに
シンプルなメッセージのやり取りからFunctionを使った送受信、DLQまで試してみましたがいかがだったでしょうか。割と簡単に処理できることがお分かりいただけたかなと思います。