0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Service BusのSession対応Queueで迅速にSessionを切り替えたい (Azure Python SDK)

Posted at

はじめに

Azure Service Bus のセッション対応キューを使っていると、セッション内のメッセージがなくなったときにすぐにセッションを離脱してNEXT_AVAILABLE_SESSIONで新しいセッションを掴みにいきたいけど、新しいセッションが見つからないときには待機時間を長くしてポーリングのし過ぎを防ぎたい、という状況に直面することがあります。この記事では、そんなときの工夫について紹介します。​

TL;DR

ServiceBusClientmax_wait_timeget_queue_receiverを呼んだときにだけ設定できるわけじゃなく、receive_messagesでも上書きできるので、セッションの取得とメッセージの受信で異なる待機時間を設定することが可能です。

セッションって何?

Service Bus のセッションは、同じ session_id を持つメッセージをグループ化して、順序を保って処理するための機能です。FIFO(先入れ先出し)や、関連するメッセージの一括処理、排他処理に便利です。
Azure Service Bus のメッセージ セッション - Microsoft Learn

状態遷移図で見るセッション管理

セッションを使用しない場合

Microsoft Learnで提案されているようなの利用法の場合、以下のような状態遷移が発生します:

ここで、NEXT_AVAILABLE_SESSIONを利用する場合はOperationTimeoutErrorの取得により、特定のセッションにはメッセージがない、という情報を取得し、次のセッションへ移行するという動作が行われますが、max_wait_timeが30秒などの長い場合にはセッションへの移行が遅れ、0.5秒などの短い場合にはポーリングが起きすぎるという問題が発生し得ます。

具体的な解決策

以下のように、 セッション管理を最適化できます。
これを解決するために、セッションがない初期状態では長めの待機時間をmax_wait_timeに設定し、セッションがある状態では短めの待機時間でメッセージを受信することで、効率的なセッション管理が可能になります。

import asyncio
from azure.servicebus.aio import ServiceBusClient, NEXT_AVAILABLE_SESSION

async def process_messages():
    servicebus_client = ServiceBusClient.from_connection_string("YourConnectionString")
    async with servicebus_client:
        while True:
            # receiverに短めの待機時間を設定
            async with servicebus_client.get_queue_receiver(
                queue_name="your_queue_name",
                session_id=NEXT_AVAILABLE_SESSION,
                max_wait_time=0.5
            ) as receiver:
                # セッション内のメッセージを受信する際は、長めの待機時間を別途設定
                try:
                    messages = await receiver.receive_messages(max_wait_time=30)
                except OperationTimeoutError:
                    continue
                if not messages:
                    # メッセージがなければセッションを離脱
                    continue
                for message in messages:
                    print(f"Received: {message}")
                    await receiver.complete_message(message)
                #残りを短い待機時間で処理し、メッセージがなくなった場合には早く離脱できるようにする
                async for msg in receiver:
                    print(f"Received: {message}")
                    
asyncio.run(process_messages())
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?