はじめに
Azure Service Bus のセッション対応キューを使っていると、セッション内のメッセージがなくなったときにすぐにセッションを離脱してNEXT_AVAILABLE_SESSION
で新しいセッションを掴みにいきたいけど、新しいセッションが見つからないときには待機時間を長くしてポーリングのし過ぎを防ぎたい、という状況に直面することがあります。この記事では、そんなときの工夫について紹介します。
TL;DR
ServiceBusClient
のmax_wait_time
はget_queue_receiver
を呼んだときにだけ設定できるわけじゃなく、receive_messages
でも上書きできるので、セッションの取得とメッセージの受信で異なる待機時間を設定することが可能です。
セッションって何?
Service Bus のセッションは、同じ session_id
を持つメッセージをグループ化して、順序を保って処理するための機能です。FIFO(先入れ先出し)や、関連するメッセージの一括処理、排他処理に便利です。
Azure Service Bus のメッセージ セッション - Microsoft Learn
状態遷移図で見るセッション管理
セッションを使用しない場合
Microsoft Learnで提案されているようなの利用法の場合、以下のような状態遷移が発生します:
ここで、NEXT_AVAILABLE_SESSION
を利用する場合はOperationTimeoutError
の取得により、特定のセッションにはメッセージがない、という情報を取得し、次のセッションへ移行するという動作が行われますが、max_wait_time
が30秒などの長い場合にはセッションへの移行が遅れ、0.5秒などの短い場合にはポーリングが起きすぎるという問題が発生し得ます。
具体的な解決策
以下のように、 セッション管理を最適化できます。
これを解決するために、セッションがない初期状態では長めの待機時間をmax_wait_time
に設定し、セッションがある状態では短めの待機時間でメッセージを受信することで、効率的なセッション管理が可能になります。
import asyncio
from azure.servicebus.aio import ServiceBusClient, NEXT_AVAILABLE_SESSION
async def process_messages():
servicebus_client = ServiceBusClient.from_connection_string("YourConnectionString")
async with servicebus_client:
while True:
# receiverに短めの待機時間を設定
async with servicebus_client.get_queue_receiver(
queue_name="your_queue_name",
session_id=NEXT_AVAILABLE_SESSION,
max_wait_time=0.5
) as receiver:
# セッション内のメッセージを受信する際は、長めの待機時間を別途設定
try:
messages = await receiver.receive_messages(max_wait_time=30)
except OperationTimeoutError:
continue
if not messages:
# メッセージがなければセッションを離脱
continue
for message in messages:
print(f"Received: {message}")
await receiver.complete_message(message)
#残りを短い待機時間で処理し、メッセージがなくなった場合には早く離脱できるようにする
async for msg in receiver:
print(f"Received: {message}")
asyncio.run(process_messages())