Posted at

ServiceBus Queue と Storage Queue と戯れる

More than 1 year has passed since last update.

自分の書いているライブラリで、Storage Queue と Service Bus Queue のそれぞれのプログラミングの基本的な方法を学びたかったのでスパイクのコードを書いてみた。


Storage Queue

これは、とてもシンプルなキューで、下記のコードが大体の使い方。WindowsAzure.Storage の nuget パッケージをインポートして使う。

サンプルでは、App.config に下記に書いた設定を入れる必要がある。

APIが素直なので読めばわかると思う。

StorageQueueSample.cs

using System;

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage;
using System.Configuration;

namespace QueueEmitSample
{
class StorageQueueSample
{
/// <summary>
/// Storage Queue sample progam
/// https://docs.microsoft.com/en-us/azure/storage/queues/storage-dotnet-how-to-use-queues
/// </summary>
/// <returns></returns>
public async Task ExecuteAsync()
{
// Configuration
var storageAccount = CloudStorageAccount.Parse(
ConfigurationManager.AppSettings.Get("Storage:ConnectionString"));
var queueClient = storageAccount.CreateCloudQueueClient();
// Create a new queue
var guid = Guid.NewGuid();
var queueName = $"testosterone{guid}";
var queue = queueClient.GetQueueReference(queueName);
await queue.CreateIfNotExistsAsync();

var noMessagequeue = queueClient.GetQueueReference("nomessage");
await noMessagequeue.CreateIfNotExistsAsync();

// Emitt queue
var message = new CloudQueueMessage("hello world");

await queue.AddMessageAsync(message);
// Prompt
Console.WriteLine($"We create {queueName} please check it out");
Console.ReadLine();

// Peek at the next message
var peekedMessage = await queue.PeekMessageAsync();
Console.WriteLine($"Peeked message: {peekedMessage.AsString}");

// In case there is no message, Storage Queue returns null
var noMessage = await noMessagequeue.PeekMessageAsync();
Console.WriteLine($"No message: {noMessage}");

// Delete Queue
await queue.DeleteAsync();
await noMessagequeue.DeleteAsync();
}
}
}

App.config

<?xml version="1.0" encoding="utf-8"?>

<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
</startup>
<appSettings>
<add key="Storage:ConnectionString" value="YOUR_STORAGE_CONNECTION_STRING" />
<add key="ServiceBus:ConnectionString" value="YOUR_SERVICE_BUS_CONNECTION_STRING"/>
</appSettings>
:


Queue にメッセージが無かったら?

一点だけポイントは、Queue にメッセージが無かった時に

            var peekedMessage = await queue.PeekMessageAsync();

を読んだらどうなるか?ということだ。単純に null が返る。つまり、Queue が来たことを待ちたかったら、自分でポーリングする必要がある。


Service Bus Queue

次にService Bus Queue こちらは少し複雑なので少し解説したい。

ServiceBusQueueSample.cs

using System;

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.Azure.ServiceBus;
using System.Configuration;
using Microsoft.ServiceBus.Messaging;
using System.Threading;

namespace QueueEmitSample
{
class ServiceBusQueueSample
{
/// <summary>
/// Sample for create a queue and send message.
/// https://blogs.msdn.microsoft.com/brunoterkaly/2014/08/07/learn-how-to-create-a-queue-place-and-read-a-message-using-azure-service-bus-queues-in-5-minutes/
/// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
/// For creating new queue and use namespace manager, we need WidowsAzure.SerivceBus package. for others Microsoft.Azure.ServiceBus.
/// </summary>
/// <returns></returns>
public async Task ExecAsync()
{
// create a new queue
const string SERVICE_BUS_CONNECTIONSTRING = "ServiceBus:ConnectionString";
var namespaceManager = NamespaceManager.CreateFromConnectionString(ConfigurationManager.AppSettings.Get(SERVICE_BUS_CONNECTIONSTRING));
var guid = Guid.NewGuid();
var queueName = $"testosterone{guid}";
if (await namespaceManager.QueueExistsAsync(queueName))
{
await namespaceManager.DeleteQueueAsync(queueName);
}
await namespaceManager.CreateQueueAsync(queueName);

// send message

// Note. QueueClient is resiside not only Microsoft.Azure.ServiceBus. Also Microsoft.ServiceBus
queueClient = new Microsoft.Azure.ServiceBus.QueueClient(ConfigurationManager.AppSettings.Get(SERVICE_BUS_CONNECTIONSTRING), queueName);
var message = new Message(Encoding.UTF8.GetBytes("hello world from service bus"));
await queueClient.SendAsync(message);

Console.WriteLine($"We create a queue named {queueName} and send message to ServiceBus. Please check it out");
Console.ReadLine();

// peek queue
// Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHander)
{
// Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,

// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
AutoComplete = false
};
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

// Wait for callback
Console.ReadKey();

// Close QueueClient
await queueClient.CloseAsync();

// Delete the Queue
await namespaceManager.DeleteQueueAsync(queueName);
}

async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Process the message
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

// Complete the message so that it is not received again.
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
await queueClient.CompleteAsync(message.SystemProperties.LockToken);

// Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
// If queueClient has already been Closed, you may chose to not call CompleteAsync() or AbandonAsync() etc. calls
// to avoid unnecessary exceptions.
}

Task ExceptionReceivedHander(Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
private Microsoft.Azure.ServiceBus.QueueClient queueClient;
}
}


uget package

やりたかったことは、Queue 自体を作成して、メッセージを送付後、メッセージを消すということだ。 このプログラムを動かすために、2つのNugetパッケージをインポートしている。


  • Microsoft.Azure.ServiceBus

  • WindowsAzure.ServiceBus

Queueの操作に使うのは、Microsoft.Azure.ServiceBus の方。一方、Queue自体を作りたいという場合は、NamespaceManagerを使うのだが、こちらは、WindowsAzure.SerbiceBus の方にしかいない。`QueueClient は両方のネームスペースにあるので、ダサいけど、ネームスペースを指定する必要がある。


Queue 自体の作成、削除

NamespaceManager を使ってシンプルにできる。Create() ではなく CreateFromConnectionString() を使っている理由は、Create() は、デフォルトコネクションストリング名を使うため。ServiceBusConenctionString か何か。違うのをつけたかったので、自分でコンフィグしている。

var namespaceManager = NamespaceManager.CreateFromConnectionString(ConfigurationManager.AppSettings.Get(SERVICE_BUS_CONNECTIONSTRING));

var guid = Guid.NewGuid();
var queueName = $"testosterone{guid}";
if (await namespaceManager.QueueExistsAsync(queueName))
{
await namespaceManager.DeleteQueueAsync(queueName);
}
await namespaceManager.CreateQueueAsync(queueName);


Message の送信

特に難しさはないだろう。

            queueClient = new Microsoft.Azure.ServiceBus.QueueClient(ConfigurationManager.AppSettings.Get(SERVICE_BUS_CONNECTIONSTRING), queueName);

var message = new Message(Encoding.UTF8.GetBytes("hello world from service bus"));
await queueClient.SendAsync(message);


Message の受信

こちらは、StorageQueue との違いになっていて、コールバック形式になっている。StorageQueue がポーリングしないといけないのに対して、こちらは、ロングポーリング形式で、コールバックを待ってればいいことになる。だから、Queue が無い場合にタイムアウトする処理などを書いてあげる必要があるだろう。(もしくは設定できるかも)ServiceBus の Queue はかなり指定が細かくなっている。並列で沢山読めるようになていたり、コールバックが終わったら自動的にキューを終了させるかのオプションを指定している。他にも細かいのがたくさんあった。

デフォルトのタイムアウトは、1分で、経過したら TimeoutException がスローされる。ServiceBus でスローされる例外は Service Bus messaging exceptions試していないが、その設定を変えるためにはwhere do I set Azure ServiceBus timeout

を参照すればよい。

            // peek queue

var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHander)
{

MaxConcurrentCalls = 1,
AutoComplete = false
};
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
// Wait for callback
Console.ReadKey();
}

async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
// Process the message
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}

Task ExceptionReceivedHander(Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
private Microsoft.Azure.ServiceBus.QueueClient queueClient;


クロージング

ServiceBus の場合は、ちゃんとクライアントをクローズしないとけいない。using 句がつかえないのは残念だが、ちゃんと閉じる。

            // Close QueueClient

await queueClient.CloseAsync();


おわりに

StorageQueue と ServiceBus Queue を見てみると、似ているようで結構違う。Service Bus は銀行でのトランザクションとか沿いうのにも使えるので相当がっつりしている。

をざっと読んでも、各種の設定、プリフェッチ、プリフェッチのロックの長さの変更、ハイパフォーマンスシナリオなどが記述されている。また、比較資料もある。

自分には今回ぐらいので十分なので、本来StorageQueue でもよかったのだろう。

に今回のスパイクのソースを置いておいた。