1
3

More than 1 year has passed since last update.

[Azure] Service Busを介してLINEからのメッセージを処理する。

Posted at

本日はAzure Service Busを導入していきます。Service Busとはざっくり言うとキューシステムです。数多くの処理を複数のユーザーから受けるときに一旦リクエストを受け、キューに入れて処理する際に使います。 今回はLineから送られてくる、レシート処理のリクエストを一旦キューに入れ、順に処理をするような利用をします。

Azure Service Busとは

詳細は「またえこうじ@ぴたデジの人」さんにて、こちらの記事にまとめられていますので、ご参照ください。
https://note.com/koji_matae/n/nb4335177b512

Service Busのサービスページはこちら
https://azure.microsoft.com/ja-jp/products/service-bus

詳細はこちらを参照
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues?tabs=passwordless

Service BusまたはStorage Queuesどちらが適しているのか悩んでいる方はこちらのドキュメントを参照ください。
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted

基本Service Busの方が多くの機能を備えていますので、今後複雑なキュー処理を想定しているなら、Service Bus、シンプルにメッセージを処理するだけなら、Storage Queuesでいいかと思います。(それだけではないけど)

導入はこちらのドキュメントを参照しました。
https://learn.microsoft.com/en-us/dotnet/api/overview/azure/service-bus?preserve-view=true&view=azure-dotnet

準備

では、さっそく準備に入ります。必要なライブラリーをインストールします。

PM > Install-Package Azure.Messaging.ServiceBus

簡単なサンプルコード、メッセージを送る、受け取る。

サンプルコードが公開されていますので、まずはそちらを見て理解していきます。キュー内のメッセージの送信と受信は下記です。

サンプルコードはこちら参照
https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample01_SendReceive.md

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);

メッセージを送る側、受ける側はTopics, Subscriptionsという単位でキューを分けることができます。QueueTopicの差分はこちらでAshish Patelさんによってまとまっていますので、参照ください。
https://medium.com/awesome-azure/azure-difference-between-azure-service-bus-queues-and-topics-comparison-azure-servicebus-queue-vs-topic-4cc97770b65#:~:text=Queues%20and%20Topics%20are%20similar,topic%20can%20have%20multiple%20subscribers.

↑で使われている図を拝借。

Queue処理

image.png

Topic処理

image.png
https://miro.medium.com/v2/resize:fit:1400/format:webp/1*NxCtGLUMbZPAdPzPh98nLg.png

サンプルコードは下記です。

string connectionString = "<connection_string>";
string topicName = "<topic_name>";
string subscriptionName = "<subscription_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender that we will use to send to our topic
ServiceBusSender sender = client.CreateSender(topicName);

// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver for our subscription that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(topicName, subscriptionName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);

メッセージを受取る側は受け取る際に、settlingをします。settlingとは、メッセージを受けったので、ほかの人が受け取らないようにロックする行為。

サンプルコードは下記です。

サンプルコードはこちら参照
https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample02_MessageSettlement.md

string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);

// create the sender
ServiceBusSender sender = client.CreateSender(queueName);

// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");

// send the message
await sender.SendMessageAsync(message);

// create a receiver that we can use to receive and settle the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

// complete the message, thereby deleting it from the service
await receiver.CompleteMessageAsync(receivedMessage);

キュー処理、管理のオペレーション

次はキューなどのオペレーションを作成、管理します。

こちらのコードを参照しました。
https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample07_CrudOperations.md

まずは、キューを作成します。キュー作成のサンプルコードはこちらです。

ここでの作業はポータルからも可能なので、複雑な利用でなければポータルで作成・管理でもいいかもしれません。


string connectionString = "<connection_string>";
string queueName = "<queue_name>";
var client = new ServiceBusAdministrationClient(connectionString);
var options = new CreateQueueOptions(queueName)
{
    AutoDeleteOnIdle = TimeSpan.FromDays(7),
    DefaultMessageTimeToLive = TimeSpan.FromDays(2),
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
    EnableBatchedOperations = true,
    DeadLetteringOnMessageExpiration = true,
    EnablePartitioning = false,
    ForwardDeadLetteredMessagesTo = null,
    ForwardTo = null,
    LockDuration = TimeSpan.FromSeconds(45),
    MaxDeliveryCount = 8,
    MaxSizeInMegabytes = 2048,
    RequiresDuplicateDetection = true,
    RequiresSession = true,
    UserMetadata = "some metadata"
};

options.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
    "allClaims",
    new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));

QueueProperties createdQueue = await client.CreateQueueAsync(options);

キューの取得

QueueProperties queue = await client.GetQueueAsync(queueName);

キューの更新

queue.LockDuration = TimeSpan.FromSeconds(60);
QueueProperties updatedQueue = await client.UpdateQueueAsync(queue);

キューの削除

await client.DeleteQueueAsync(queueName);

キューの削除

await client.DeleteQueueAsync(queueName);

Topc Subscriptionの作成

string connectionString = "<connection_string>";
string topicName = "<topic_name>";
var client = new ServiceBusAdministrationClient(connectionString);
var topicOptions = new CreateTopicOptions(topicName)
{
    AutoDeleteOnIdle = TimeSpan.FromDays(7),
    DefaultMessageTimeToLive = TimeSpan.FromDays(2),
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
    EnableBatchedOperations = true,
    EnablePartitioning = false,
    MaxSizeInMegabytes = 2048,
    RequiresDuplicateDetection = true,
    UserMetadata = "some metadata"
};

topicOptions.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
    "allClaims",
    new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));

TopicProperties createdTopic = await client.CreateTopicAsync(topicOptions);

string subscriptionName = "<subscription_name>";
var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
{
    AutoDeleteOnIdle = TimeSpan.FromDays(7),
    DefaultMessageTimeToLive = TimeSpan.FromDays(2),
    EnableBatchedOperations = true,
    UserMetadata = "some metadata"
};
SubscriptionProperties createdSubscription = await client.CreateSubscriptionAsync(subscriptionOptions);

Topicの取得

TopicProperties topic = await client.GetTopicAsync(topicName);

Subscriptionの取得

SubscriptionProperties subscription = await client.GetSubscriptionAsync(topicName, subscriptionName);

Topicの更新

topic.UserMetadata = "some metadata";
TopicProperties updatedTopic = await client.UpdateTopicAsync(topic);

Subscriptionの取得

topic.UserMetadata = "some metadata";
TopicProperties updatedTopic = await client.UpdateTopicAsync(topic);

Topicの削除

await client.DeleteSubscriptionAsync(topicName, subscriptionName);

Subscriptionの削除

await client.DeleteTopicAsync(topicName);

実際のコード、Lineのメッセージを処理

今回はLineからきたメッセージを処理してService Busにてキューに送ります。

下記のコードはLineからメッセージを受取り、Service Busのキューにメッセージ保存しています。(一部関係ないので削除しています。)


/// <summary>
        /// Receive line message, wh.
        /// </summary>
        /// <returns></returns>
        [HttpPost, Route("{YOUR ROUTE}")]
   public async Task<IActionResult> Receiver()
        {
            // Set stream reader
            using var reader = new StreamReader(HttpContext.Request.Body);

            // Read to the end
            var body = await reader.ReadToEndAsync();

            // Map to the object
            var message = JsonSerializer.Deserialize<LineReceiver>(body);

            // Get signature
            var signature = Request.Headers["X-Line-Signature"];

            // Set raw body content for validation
            var text = body;

            // Let's make sure the request is valid.
            if (IsSingatureOk(signature, text, _config.Value.LineOfficialChannel.ChannelSecret))
            {

                foreach (var eventItem in message.events)
                {
                    // 
                    switch (eventItem.type)
                    {
                     
                        case "message":

                            // Get the user
                            var user = _accountServices.GetUserByMid(eventItem.source.userId);

                            switch (eventItem.message.type)
                            {
                                case "image":

                                    // Sned message to the que.
                                    var jsonResult = await SendMessageToServiceBusAsync(message);

                                    break;
                            }

                            return Ok();

                        default:
                            return Ok();
                    }
                }

                // Not OK, but OK for demo.
                return Ok();
            }

            // Bad signature.
            return BadRequest();

        }


   /// <summary>
        /// Sends a serialized LineReceiver object to an Azure Service Bus queue using Azure.Messaging.ServiceBus library and returns the serialized JSON string.
        /// </summary>
        /// <param name="lineMessage"></param>
        /// <returns></returns>
 private async Task<string> SendMessageToServiceBusAsync(LineReceiver lineMessage)
        {
            var lineReceiverJsonString = JsonSerializer.Serialize(lineMessage);

            // using Azure.Messaging.ServiceBus;
            string connectionString = _config.Value.AzureServiceBus.ConnectionString;
            string queueName = "line-messages";

            var serviceBusConnectionLog = new Dictionary<string, string>();
            serviceBusConnectionLog.Add("service-bus-connection", connectionString);
            _telemetry.TrackEvent("SendMessageToServiceBusAsync", serviceBusConnectionLog);

            // Because ServiceBusClient implements IAsyncDisposable, we'll create it 
            // with "await using" so that it is automatically disposed for us.
            await using var client = new ServiceBusClient(connectionString);

            // The sender is responsible for publishing messages to the queue.
            ServiceBusSender sender = client.CreateSender(queueName);
            ServiceBusMessage message = new ServiceBusMessage(lineReceiverJsonString);

            await sender.SendMessageAsync(message);

            return lineReceiverJsonString;
        }

一旦メッセージをキューにいれた段階でOK()を返しています。そのあと下記のファンクションはキューにメッセージが入った時に発火されるものです。(Service Bus Trigger)

こちらを参照しました。
https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=in-process%2Cextensionv5&pivots=programming-language-csharp


  [FunctionName("{FUNCTION NAME HERE}")]
        public async Task Run([ServiceBusTrigger("{QUEUE NAME HERE}")]string myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");

// ここで処理
        }

LINEから画像を受取るとFunctionが正しく稼働され、処理されていることを確認しました。
image.png

この設計で多量の処理リクエストがあった際にもLINEに200を返すことができ、エラーなく処理できるようになりました。

最後に

もっと複雑な処理や利用方法がありますので、是非オフィシャルドキュメントの方を参照して活用ください。

紙レシート削減目指し、絶賛採用中、+是非下記のURLからサービスご利用ください。今回記載した仕組みが実際に搭載されています。そして絶賛採用中です(フルリモート、C#、AZURE)。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3