本日はAzure Service Bus
を導入していきます。Service Bus
とはざっくり言うとキューシステムです。数多くの処理を複数のユーザーから受けるときに一旦リクエストを受け、キューに入れて処理する際に使います。 今回はLineから送られてくる、レシート処理のリクエストを一旦キューに入れ、順に処理をするような利用をします。
Azure Service Bus
とは
詳細は「またえこうじ@ぴたデジの人」さんにて、こちらの記事にまとめられていますので、ご参照ください。
https://note.com/koji_matae/n/nb4335177b512
Service Busのサービスページはこちら
https://azure.microsoft.com/ja-jp/products/service-bus
詳細はこちらを参照
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues?tabs=passwordless
Service Bus
またはStorage Queues
どちらが適しているのか悩んでいる方はこちらのドキュメントを参照ください。
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted
基本Service Bus
の方が多くの機能を備えていますので、今後複雑なキュー処理を想定しているなら、Service Bus
、シンプルにメッセージを処理するだけなら、Storage Queues
でいいかと思います。(それだけではないけど)
導入はこちらのドキュメントを参照しました。
https://learn.microsoft.com/en-us/dotnet/api/overview/azure/service-bus?preserve-view=true&view=azure-dotnet
準備
では、さっそく準備に入ります。必要なライブラリーをインストールします。
PM > Install-Package Azure.Messaging.ServiceBus
簡単なサンプルコード、メッセージを送る、受け取る。
サンプルコードが公開されていますので、まずはそちらを見て理解していきます。キュー内のメッセージの送信と受信は下記です。
サンプルコードはこちら参照
https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample01_SendReceive.md
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");
// send the message
await sender.SendMessageAsync(message);
// create a receiver that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);
// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);
メッセージを送る側、受ける側はTopics
, Subscriptions
という単位でキューを分けることができます。Queue
とTopic
の差分はこちらでAshish Patelさんによってまとまっていますので、参照ください。
https://medium.com/awesome-azure/azure-difference-between-azure-service-bus-queues-and-topics-comparison-azure-servicebus-queue-vs-topic-4cc97770b65#:~:text=Queues%20and%20Topics%20are%20similar,topic%20can%20have%20multiple%20subscribers.
↑で使われている図を拝借。
Queue処理
Topic処理
https://miro.medium.com/v2/resize:fit:1400/format:webp/1*NxCtGLUMbZPAdPzPh98nLg.png
サンプルコードは下記です。
string connectionString = "<connection_string>";
string topicName = "<topic_name>";
string subscriptionName = "<subscription_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender that we will use to send to our topic
ServiceBusSender sender = client.CreateSender(topicName);
// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");
// send the message
await sender.SendMessageAsync(message);
// create a receiver for our subscription that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(topicName, subscriptionName);
// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);
メッセージを受取る側は受け取る際に、settling
をします。settling
とは、メッセージを受けったので、ほかの人が受け取らないようにロックする行為。
サンプルコードは下記です。
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");
// send the message
await sender.SendMessageAsync(message);
// create a receiver that we can use to receive and settle the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);
// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// complete the message, thereby deleting it from the service
await receiver.CompleteMessageAsync(receivedMessage);
キュー処理、管理のオペレーション
次はキューなどのオペレーションを作成、管理します。
こちらのコードを参照しました。
https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample07_CrudOperations.md
まずは、キューを作成します。キュー作成のサンプルコードはこちらです。
ここでの作業はポータルからも可能なので、複雑な利用でなければポータルで作成・管理でもいいかもしれません。
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
var client = new ServiceBusAdministrationClient(connectionString);
var options = new CreateQueueOptions(queueName)
{
AutoDeleteOnIdle = TimeSpan.FromDays(7),
DefaultMessageTimeToLive = TimeSpan.FromDays(2),
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
EnableBatchedOperations = true,
DeadLetteringOnMessageExpiration = true,
EnablePartitioning = false,
ForwardDeadLetteredMessagesTo = null,
ForwardTo = null,
LockDuration = TimeSpan.FromSeconds(45),
MaxDeliveryCount = 8,
MaxSizeInMegabytes = 2048,
RequiresDuplicateDetection = true,
RequiresSession = true,
UserMetadata = "some metadata"
};
options.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
"allClaims",
new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));
QueueProperties createdQueue = await client.CreateQueueAsync(options);
キューの取得
QueueProperties queue = await client.GetQueueAsync(queueName);
キューの更新
queue.LockDuration = TimeSpan.FromSeconds(60);
QueueProperties updatedQueue = await client.UpdateQueueAsync(queue);
キューの削除
await client.DeleteQueueAsync(queueName);
キューの削除
await client.DeleteQueueAsync(queueName);
Topc
Subscription
の作成
string connectionString = "<connection_string>";
string topicName = "<topic_name>";
var client = new ServiceBusAdministrationClient(connectionString);
var topicOptions = new CreateTopicOptions(topicName)
{
AutoDeleteOnIdle = TimeSpan.FromDays(7),
DefaultMessageTimeToLive = TimeSpan.FromDays(2),
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
EnableBatchedOperations = true,
EnablePartitioning = false,
MaxSizeInMegabytes = 2048,
RequiresDuplicateDetection = true,
UserMetadata = "some metadata"
};
topicOptions.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
"allClaims",
new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));
TopicProperties createdTopic = await client.CreateTopicAsync(topicOptions);
string subscriptionName = "<subscription_name>";
var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
{
AutoDeleteOnIdle = TimeSpan.FromDays(7),
DefaultMessageTimeToLive = TimeSpan.FromDays(2),
EnableBatchedOperations = true,
UserMetadata = "some metadata"
};
SubscriptionProperties createdSubscription = await client.CreateSubscriptionAsync(subscriptionOptions);
Topic
の取得
TopicProperties topic = await client.GetTopicAsync(topicName);
Subscription
の取得
SubscriptionProperties subscription = await client.GetSubscriptionAsync(topicName, subscriptionName);
Topic
の更新
topic.UserMetadata = "some metadata";
TopicProperties updatedTopic = await client.UpdateTopicAsync(topic);
Subscription
の取得
topic.UserMetadata = "some metadata";
TopicProperties updatedTopic = await client.UpdateTopicAsync(topic);
Topic
の削除
await client.DeleteSubscriptionAsync(topicName, subscriptionName);
Subscription
の削除
await client.DeleteTopicAsync(topicName);
実際のコード、Lineのメッセージを処理
今回はLineからきたメッセージを処理してService Bus
にてキューに送ります。
下記のコードはLineからメッセージを受取り、Service Bus
のキューにメッセージ保存しています。(一部関係ないので削除しています。)
/// <summary>
/// Receive line message, wh.
/// </summary>
/// <returns></returns>
[HttpPost, Route("{YOUR ROUTE}")]
public async Task<IActionResult> Receiver()
{
// Set stream reader
using var reader = new StreamReader(HttpContext.Request.Body);
// Read to the end
var body = await reader.ReadToEndAsync();
// Map to the object
var message = JsonSerializer.Deserialize<LineReceiver>(body);
// Get signature
var signature = Request.Headers["X-Line-Signature"];
// Set raw body content for validation
var text = body;
// Let's make sure the request is valid.
if (IsSingatureOk(signature, text, _config.Value.LineOfficialChannel.ChannelSecret))
{
foreach (var eventItem in message.events)
{
//
switch (eventItem.type)
{
case "message":
// Get the user
var user = _accountServices.GetUserByMid(eventItem.source.userId);
switch (eventItem.message.type)
{
case "image":
// Sned message to the que.
var jsonResult = await SendMessageToServiceBusAsync(message);
break;
}
return Ok();
default:
return Ok();
}
}
// Not OK, but OK for demo.
return Ok();
}
// Bad signature.
return BadRequest();
}
/// <summary>
/// Sends a serialized LineReceiver object to an Azure Service Bus queue using Azure.Messaging.ServiceBus library and returns the serialized JSON string.
/// </summary>
/// <param name="lineMessage"></param>
/// <returns></returns>
private async Task<string> SendMessageToServiceBusAsync(LineReceiver lineMessage)
{
var lineReceiverJsonString = JsonSerializer.Serialize(lineMessage);
// using Azure.Messaging.ServiceBus;
string connectionString = _config.Value.AzureServiceBus.ConnectionString;
string queueName = "line-messages";
var serviceBusConnectionLog = new Dictionary<string, string>();
serviceBusConnectionLog.Add("service-bus-connection", connectionString);
_telemetry.TrackEvent("SendMessageToServiceBusAsync", serviceBusConnectionLog);
// Because ServiceBusClient implements IAsyncDisposable, we'll create it
// with "await using" so that it is automatically disposed for us.
await using var client = new ServiceBusClient(connectionString);
// The sender is responsible for publishing messages to the queue.
ServiceBusSender sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage(lineReceiverJsonString);
await sender.SendMessageAsync(message);
return lineReceiverJsonString;
}
一旦メッセージをキューにいれた段階でOK()を返しています。そのあと下記のファンクションはキューにメッセージが入った時に発火されるものです。(Service Bus Trigger)
[FunctionName("{FUNCTION NAME HERE}")]
public async Task Run([ServiceBusTrigger("{QUEUE NAME HERE}")]string myQueueItem, ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
// ここで処理
}
LINEから画像を受取るとFunctionが正しく稼働され、処理されていることを確認しました。
この設計で多量の処理リクエストがあった際にもLINEに200を返すことができ、エラーなく処理できるようになりました。
最後に
もっと複雑な処理や利用方法がありますので、是非オフィシャルドキュメントの方を参照して活用ください。
紙レシート削減目指し、絶賛採用中、+是非下記のURLからサービスご利用ください。今回記載した仕組みが実際に搭載されています。そして絶賛採用中です(フルリモート、C#、AZURE)。