Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

AzureServiceBusを色々と試してみた

More than 1 year has passed since last update.

はじめに

アーキテクチャに弾力性を持たせるためにはキューの存在は必須なわけですがAzureにもAWS SQSと同じようなサービスが存在します。それがAzureServiceBusです。
今回も仕事で使うためにAzureServiceBusについて調べてみました。
似たものとしてBlobStorageQueueがあるのですが、そのあたりの違いについては公式のドキュメントをご覧ください。

ServiceBusの各プランの機能と料金についてはこちらをご覧ください。

本記事で作ったソースはこちらにあります。

準備

サンプルプログラムを試すためのServiceBusとキューはAzurePortalから予め作っておいてください。
キューの名前は何でも構いませんがサンプルプログラム内ではqueueという名前で作成されたものを使っています。

また、キューにアクセスするためのポリシーの作成と接続文字列はプログラム内で使いますのでメモしておいてください。
私は「管理」「送信」「リッスン」それぞれのポリシーを作っておきました。

image.png

簡単なメッセージの送信

簡単なプログラムを使ってメッセージを送ってみます。

send-message.js
const {ServiceBusClient} = require("@azure/service-bus");

const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const queueName = process.env.QUEUE_NAME;

const main = async () => {

    const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
    const queueClient = sbClient.createQueueClient(queueName);
    const sender = queueClient.createSender();

    const message = {
        body: 'Hello. Service Bus',
        label: 'test',
        userProperties: {
            testPropertyName: 'property value'
        }
    };
    console.log('send message start');
    try {
        await sender.send(message);
        await queueClient.close();
        console.log('send message end');
    } finally {
        await sbClient.close();
    }
};

main().catch(err => {
    console.log(err);
});

実行してみましょう

set SERVICE_BUS_CONNECTION_STRING=<ConnectionString>
set QUEUE_NAME=<QueueName>
C:\Users\uzres\products\azure-servicebus-transaction>node send-message.js
send message start
send message end

ServiceBus Explorerで確認

こちらからServiceBus Explorerをダウンロードします

File → Connectを選択し、ConnectionStringを入力します。
ConnectionStringのポリシーは「管理」である必要があります。

image.png

左メニューのqueueを選択し、Message→OKをクリックするとメッセージを見ることができます。

image.png

カスタムプロパティもみることができますね。

image.png

メッセージの受信

今度はメッセージを受信してみましょう。

receive-message.js
const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus");

const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const queueName = process.env.QUEUE_NAME;

const main = async () => {

    const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
    const queueClient = sbClient.createQueueClient(queueName);
    const receiver = queueClient.createReceiver(ReceiveMode.receiveAndDelete)

    console.log('receive message start');
    try {
        const messages = await receiver.receiveMessages(5);
        console.log(messages.map(message => message.body));

        await queueClient.close();
        console.log('receive message end');
    } finally {
        await sbClient.close();
    }
};

main().catch(err => {
    console.log(err);
});

実行するためには、「リッスン」権限が必要です。

C:\Users\uzres\products\azure-servicebus-transaction>node receive-message.js
receive message start
[ 'Hello. Service Bus' ]
receive message end

実行してみるとわかりますが結構待たされます。
これはメッセージを受信するときに60秒間待つからです。
短くするにはreceveMessagesの2番目の引数に待つ秒数を指定します。

const messages = await receiver.receiveMessages(5, 1);

Azure Functionで受信

先ほどの例ではとあるタイミングでプログラムを実行しQueueからメッセージを受信しましたが、Queueに入ったタイミングでFunctionを動かすことができます。
これでQueueをポーリングする手間がなくなりますね。素晴らしい・・・。

関数の引数であるmySbMsgにメッセージが入ってきます。

module.exports = async function(context, mySbMsg) {
    context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
    context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
    context.log('DeliveryCount =', context.bindingData.deliveryCount);
    context.log('MessageId =', context.bindingData.messageId);
};
functions.json
{
  "bindings": [
    {
      "name": "mySbMsg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "queue",
      "connection": ""
    }
  ]
}

Functionを設定するときの注意点

  • connectionのところを空にした場合は、アプリケーション設定の追加で、AzureWebJobsServiceBusという名前でQueueへの接続文字列を設定しておきます。
  • function.jsonにqueueNameを指定しているので接続文字列からはEntityPath=queueを削除しておかないとエラーになります。
  • このへんの設定はトリガーとバインド ServiceBusを確認してください。

Azure Functionで送信

Bindingという機能を使うとServiceBusにメッセージを送ることができます。
先ほど作ったfuctionで受け取ったメッセージをoutputqueueというQueueに投入してみたいと思います。

まずはfunction.jsonにdirectionがoutの要素を追記します。

function.json
{
  "bindings": [
    {
      "name": "mySbMsg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "queue",
      "connection": ""
    },
    {
      "name": "output",
      "type": "serviceBus",
      "queueName": "outputqueue",
      "connection": "OutputConnection",
      "direction": "out"
    }
  ]
}

プログラムも少し変えましょう。受信したメッセージにoutput を付けて、outputqueueにメッセージを入れてみます。
bindingsの後の文字列はfunction.jsonのnameで指定した文字列になります。

index.js
module.exports = async function(context, mySbMsg) {
    context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
    context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
    context.log('DeliveryCount =', context.bindingData.deliveryCount);
    context.log('MessageId =', context.bindingData.messageId);

    const outputMessage = 'output' + mySbMsg;
    context.bindings.output=outputMessage;
};

今回はfunction.jsonのconnectionにOutputConnectionを指定したので、Functionの環境変数にっはAzureWebJobsOutputConnectionという名前で接続文字列を指定します。

image.png

実行してoutputqueueの中身を見るとメッセージが配信されていることがわかりますね。

image.png

PeekLockとReceiveAndDelete

AzureFunctionでメッセージを受信したとき既定の動作はPeekLockです。
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-service-bus?tabs=csharp#trigger---peeklock-behavior

処理が正常に終了すれば、FunctionからCompleteを呼び出しQueueからメッセージが削除されるようになっています。ReceiveAndDeleteに設定することもできますが、これは受信した瞬間にメッセージが削除されるモードです。これはエラー発生時にメッセージがロストしてしまう可能性があります。

DeadLetter

MaxDeliveryCountに指定されている回数分処理が失敗することや、TimeToLiveの超過が起こるとDeadLetterQueue(DLQ)にメッセージが送信されます。
これはアプリケーションで明示的に指定することも可能です。

DLQの中身は自動的にクリーンアップされないため、DLQに移ったメッセージはログに出力するなどして確実に処理する必要があります。

早速試してみましょう。

DeadLetterQueueにメッセージを移す簡単な方法は、先ほど作ったFunctionの出力先であるoutputqueueを削除したあとにメッセージを送りましょう。

少し経ってFunctionのログを見ると10回失敗していることがわかります。

image.png

ExplorerでDeadLetterQueueに移っていることも確認できます。

image.png

DLQのメッセージを取得してみたいと思います。

receive-DLQ-message.js
const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus");

const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING;
const queueName = process.env.QUEUE_NAME + "/$DeadLetterQueue";

const main = async () => {

    const sbClient = ServiceBusClient.createFromConnectionString(connectionString);
    const queueClient = sbClient.createQueueClient(queueName);
    const receiver = queueClient.createReceiver(ReceiveMode.peekLock)

    console.log('receive DLQ message start');
    try {
        const messages = await receiver.receiveMessages(5, 1);
        for (let i = 0; i < messages.length; i++) {
            const message = messages[i];
            console.log(message.body)
            await message.complete();
        }
        await queueClient.close();
        console.log('receive DLQ message end');
    } finally {
        await sbClient.close();
    }
};

main().catch(err => {
    console.log(err);
});

DLQはQueueNameの後ろに/$DeadLetterQueueを付けただけです。
今回は、PeekLockで取得するのでメッセージ取得後complete()を呼び出しDLQからメッセージを削除しています。

おわりに

シンプルなメッセージのやり取りからFunctionを使った送受信、DLQまで試してみましたがいかがだったでしょうか。割と簡単に処理できることがお分かりいただけたかなと思います。

uzresk
触ったプロダクトのメモです。
https://github.com/uzresk
tis
創業40年超のSIerです。
https://www.tis.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away