2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

キューイング処理をRedisからAzure Service Busに移行した話

Last updated at Posted at 2022-12-08

はじめに

この記事はKLab 2022 Advent Calendarの9日目になります。

社内ツールで実装されていたキューイング処理を、RedisからAzure Service Busに移行した際の知見を共有する目的で記事を作成しました。
あまり深く踏み込んだ話はないですが、キューイングサービスもしくはミドルウェアを比較検討する際などに参考にしていただければ幸いです。

Redisでキューイング処理を行う際の問題点

Redisでメッセージを格納する簡易的なキューを実装する場合、Lists型を用いて作成できます。

ここでの問題点は、メッセージをロストすることがあることです。

RedisのPOPはListからメッセージを取り出すと同時に、メッセージを削除します。
Redis側のメッセージが削除されたことで、メッセージ内容を保持しているのはPOPを実行したレシーバー側だけになります。
そのため、リキューする前にクライアント側で問題が起きた場合キューの内容を失ってしまうことがあります。
たとえば想定外のエラーや、サーバーダウンなどが考えられます。

リトライが必要ない場合など、要件によってはこの仕様でも問題ないですが、今回は以下の要件があったため、別のものに移行を検討することになりました。

  1. ジョブが成功しなかった場合リキューしてリトライできること
    a. Redisでも可能だが、リキューするまでに内容が失われる可能性がある
    b. 失敗したジョブを後でリトライするためにリキューしたい
  2. 数時間かかるジョブを処理できること
  3. ジョブが実行されている環境がダウンしても影響がないこと
    a. スポットVMを利用しているので、たまにVMが停止する
    b. 重たい処理を行うため、VMがクラッシュするかもしれない

Q. じゃあどこに移行するんですか -> A. Azure ServiceBus

いろいろ検討しましたが、最終的にAzure Service Busに移行しました。

理由

1. 要件を満たして問題点を解決できる

これが一番重要です。
Redisでもハンドリングを頑張れば要件を満たすことができそうですが、実装がたいへんなので、ミドルウェアやサービス側で解決できるならそっちのほうが良いと思いました。

詳細については問題点を解消するで解説します。

2. フルマネージドサービスなのでサーバーの保守が楽

候補としてRabbitMQも考えましたが、自前で構築して保守するのは手間だったため、このようなクラウドサービスを検討することにしました。

3. 社内ツールがAzureで動いていたため実装、保守コストが軽かった

Azureの認証処理をそのまま使えるため、実装コストを抑えることができました。

4. 使いたい機能がかなり格安で使える

キューの機能を使うためには最低限のプラン(Basic)でよく、$0.05/100万操作と格安で使うことができます。

今回のケースでは、このプランで十分でした。

5. 通信手段としてAMQPを使用しているため汎用性が高い

AMQP(Advanced Message Queuing Protocol)はRabbitMQなどでも使用されています。
汎用性が高く、もしServiceBusから移行することになってもあまり処理を変えることなく移行できそうに見えました。

また、MicrosoftはAMQPのようなオープン標準の利点として以下を挙げています。

Service Bus での Advanced Message Queuing Protocol (AMQP) 1.0 のサポート

オープン標準の利点としては、次のような点がよく挙げられます。

  • ベンダー ロックインの可能性が下がる
  • 相互運用性
  • ライブラリとツールを広範に利用できる
  • 陳腐化を予防できる
  • 知識が豊富なスタッフを利用できる
  • リスクが低く扱いやすい

問題点を解消する

さて、Azure ServiceBusを採用した理由について解説しましたので、次は移行してどう変わったかを図とコードを用いて解説します。

Azureではいろいろな言語向けのSDKが提供されているので、コード上から操作したい場合はそれらを利用するのがおすすめです。公式が出しているものはだいたいこちらから確認できます。
今回はクライアントをGoで実装していたため、Azure SDK for Goを利用して実装しました。

キューのロストを解決してリトライできるようにする

Service Busを使って、Redisで作成したような構成を考えてみます。

Service Busはデキューした際、メッセージを削除せずロックします。
キューから削除されるタイミングはレシーバーからACKが送信されたときです。
他のレシーバーが存在する場合はロック中のメッセージをスキップし、その後ろに積まれたメッセージを受信しにいきます。
また、NACKが送信されるService Bus側で設定したロック期間を超過した場合は、メッセージは削除されず、ロックが解除されます。レシーバーは再度このメッセージをデキューできるようになります。
つまり、成功する以外でメッセージは削除されないため、Redisで問題だったキューのロストを解決できます。

数時間かかるジョブを処理できるようにする

ジョブが成功したときにメッセージを削除するため、ジョブが終了するまでメッセージをロックしたいです。
しかし、Service Busが一度にロックできる期間は最長5分です。要件としてジョブの完了まで数時間かかることがわかっているので、なんとかする必要があります。
これについては、メッセージのロックを更新することで解決できます。

先程の図にロック更新処理を追加しました。
ロック更新処理は非同期で実行され、定期的にメッセージに対してロック更新を行います。
また、ジョブが完了したら実行をキャンセルするようにします。

実際に実装してみる

Azure側のリソース作成

AzureにService Bus名前空間とキューを作成します。

本記事コードの動作確認時の設定値は基本的にデフォルトで問題なかったので、特殊な設定は必要ありません。

また、今回は考慮しませんが、キューに最大配信数という設定があります。
設定された回数配信されたら、配信不能キュー(Dead Letter Queue)という特殊なキューにメッセージが転送されます。
何回かリトライしたら配信不能として扱う、みたいな処理を行いたい場合は、ここの値を変えてください。

配信不能キューにメッセージが転送される条件は他にもあるので、気になる方は以下を参照してください。

認証周り

認証はSAS認証を利用します。
以下が必要なのでAzure Portal上などから確認してください。

  • 作成したService Bus名前空間のホスト名
    • 特に設定を変えてなければ、[作成した名前空間].servicebus.windows.netとなるはず
  • アクセスキー名、アクセスキー
    • Service Bus名前空間から共有アクセスポリシーを確認する
    • 送信とリッスンの権限が必要なので設定する

一通り終わったら先程の図をコードに落としてみます。

コード

Sender
package main

import (
    "context"
    "fmt"
    "os"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    host := os.Getenv("HOST")
    accessKeyName := os.Getenv("ACCESS_KEY_NAME")
    accessKey := os.Getenv("ACCESS_KEY")

    connStr := fmt.Sprintf("Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s", host, accessKeyName, accessKey)

    client, err := azservicebus.NewClientFromConnectionString(connStr, nil)
    if err != nil {
        panic(err)
    }

    queueName := "queue-name"   // 作ったキューの名前に変更する

    sender, err := client.NewSender(
        queueName,
        nil,
    )
    if err != nil {
        panic(err)
    }

    message := "Hello Service Bus!"

    err = sender.SendMessage(context.TODO(), &azservicebus.Message{
        Body: []byte(message),
    }, nil)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Message -> %s\n", message)
}

このコードではメッセージをエンキューしています。

Receiver
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
    host := os.Getenv("HOST")
    accessKeyName := os.Getenv("ACCESS_KEY_NAME")
    accessKey := os.Getenv("ACCESS_KEY")

    connStr := fmt.Sprintf("Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s", host, accessKeyName, accessKey)

    client, err := azservicebus.NewClientFromConnectionString(connStr, nil)
    if err != nil {
        panic(nil)
    }

    queueName := "queue-name"

    receiver, err := client.NewReceiverForQueue(
        queueName,
        nil,
    )
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // 今回は1メッセージだけ受け取る
    messages, err := receiver.ReceiveMessages(ctx,
        1,
        nil,
    )
    if err != nil {
        panic(err)
    }

    message := messages[0]

    fmt.Printf("受信したメッセージ -> %s\n", string(message.Body))

    lockCancel := make(chan struct{})

    go continuousRenewMessageLock(lockCancel, receiver, message, 10*time.Second)
    defer close(lockCancel)

    success := doSomething()

    if success {
        // ACK
        // うまくいくとキューからメッセージが削除される
        err = receiver.CompleteMessage(context.Background(), messages[0], nil)
    } else {
        // NACK
        // メッセージを削除せずロックを解除する
        err = receiver.AbandonMessage(context.Background(), messages[0], nil)
    }
    if err != nil {
        panic(err)
    }
    }

    func doSomething() bool {
    fmt.Println("ジョブ開始")
    endTime := 2
    for i := 1; i <= endTime; i++ {
        time.Sleep(1 * time.Minute) // 何らかの時間がかかるジョブ
        fmt.Printf("ジョブ実行中... 終了まで残り -> %d分\n", endTime-i)
    }
    fmt.Println("ジョブ終了")
    return true // 今回は成功したことにする
}

func continuousRenewMessageLock(cancel chan struct{}, receiver *azservicebus.Receiver,
    message *azservicebus.ReceivedMessage, interval time.Duration) {
    for {
        select {
        case <-cancel:
            fmt.Println("ロック更新処理を停止")
            return
        default:
            time.Sleep(interval)

            fmt.Printf("%d秒おきにロック更新中...\n", int(interval.Seconds()))

            err := receiver.RenewMessageLock(context.Background(), message, nil)
            if err != nil {
                fmt.Printf("ロック更新失敗 error -> %v\n", err)
                return
            }
        }
    }
}

このコードでは、メッセージをデキューしてジョブを実行し、完了したらACK(コード上ではCompleteMessage)を送信しています。
二分で終わるジョブ(time.Sleepするだけ)を実行していますが、メッセージロック期間がデフォルトのまま(30秒)であれば完了前にタイムアウトしてしまうので、ロックの更新をしないとメッセージの処理がうまくいきません。
そのため、continuousRenewMessageLockを非同期で実行し、10秒おきにロックを更新しています。

ジョブに失敗した場合を試したければdoSomethingの戻り値をfalseにしてください。
NACK(コード上ではAbandonMessage)が送信されることで、キューは削除されず、ロックが解除されます。

コードについて補足

Azure SDK for Goにたくさんサンプルコードがあるので見に行ってみるのがおすすめです。

まとめ

Service Busを活用してうまく問題点を解消できました。

もちろん要件に合わせて選定する必要はありますが、より凝ったキューイング処理をしたかったり、無駄な実装をしたくないという方はこのように特化したサービスやミドルウェアなどを検討してみてはいかがでしょうか。

また、この記事で説明されてない仕様についてはMicrosoftの公式ドキュメントを参照してください。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?