LoginSignup
0
0

More than 3 years have passed since last update.

Service Bus Queueを Azure SDK for Go で学んでみる (1)

Last updated at Posted at 2020-10-04

Keda へのコントリビュートに際して、あまり触ってこなかった Service Bus の挙動について知る必要があったので、簡単に調査してみた。
ライブラリは下記のものを使用している。
* Microsoft Azure Service Bus Client for Golang
* GodDoc servicebus

Queue Client

Queueのオペレーションができるように認証を行う。ServiceBus と Queue は事前に作っている。接続のためには、ServiceBus namespaceのConnectionString が必要なので、取得して、環境変数から渡すようにしている。NameSpaceのインスタンスをConnectionString を渡して作成し、NameSpace から NewQueue メソッドで Queue のクライアントが生成される。とても単純でいい感じだ。

func main() {
    fmt.Println("Azure ServiceBus Queue Sender")
     connectionString := os.Getenv("ConnectionString")
     queueName := os.Getenv("queueName")
     if len(os.Args) != 2 {
        log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
     }
     count, err := strconv.Atoi(os.Args[1])
     if err != nil {
        log.Fatalf("count should be integer : %s", os.Args[1])
     }
    // Create a client to communicate with a Service Bus Namespace
    ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
    if err != nil {
        log.Fatal("Cannot create a client for the Service Bus Namespace", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    // Create a client to communicate with the queue

    q, err := ns.NewQueue(queueName)
    if err != nil {
        log.Fatal("Cannot create a client for the queue", err)
    }

Send Message

メッセージの送信は、Queue のオブジェクトに対して、Send メソッドを使用すればよい。一括で送信したい場合は、func (q *Queue) SendBatch(ctx context.Context, iterator BatchIterator) error が存在するのでそちらを使えばよいだろう。

for i := 0; i < count; i++ {
    err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
    fmt.Printf("Send Hello %d \n", i)
    if err != nil {
        log.Fatal(err)
    }
}

Receive Message

メッセージの受信もとても単純で、コールバックのハンドラを渡しておくと、メッセージが来たらハンドラが実行される。一瞬非同期に見えるが、ReceiveOne は、ハンドラに制御が戻ってきて終わるまでブロックする挙動になっている。ポイントは最後の message.Complete() メソッド。デフォルトでは、Service Bus Queue は PeekLockという方式になっている。Receive で Message を取得した時点で、メッセージがロックされてほかのクライアントから見えなくなる。そして、処理がうまくいったら、Complete を発行した時点で Message が削除される。うまくいかなかったら Abandon を発行すると、Message の実行が失敗したとみなされる。所定の回数失敗すると、DeadQueue にメッセージが自動で転送される。

err = q.ReceiveOne(
    ctx,
    servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
        fmt.Println(string(message.Data))
        return message.Complete(ctx)
    }))

DeadLetter

DeadLetter へのMessageの送信は、次の場合に自動で DeadLetter に転送される。Moving messages to the DLQ

  • ヘッダサイズの超過
  • TTLの有効期限切れ
  • SessionIDがnull
  • MaxTransferHopCount Queueの間の転送上限。4
  • MaxDeliveryCountExceeded Message の受信が何回も失敗したケース デフォルト 10

しかし、それ以外にも自分のアプリケーションの都合で、DeadLetter に送りたくなるケースがあるだろう。その場合は、Message の DeadLetter() を使うと DeadLetter に転送される。この挙動は、PeekLock のモードのみで動作して、後ほど説明する ReceiveAndDelete のモードでは使用することができない。

個人的には DeadLetter() よりも、DeadLetterWithInfo() のほうがよさげだ。なぜかというと、カスタムのプロパティを渡して、例えば、DeadLetter に入った理由などをメタデータとして持たせることができる。ほかにも errorservicebus.ErrorInternalError といったプロパティも渡しているのだが、どこでこのメタデータを見れるのかは今はまだわかっていない。少なくとも負荷情報を追加できるこちらの方が良いだろう。

err = q.ReceiveOne(
    ctx,
    servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {

        fmt.Println(message.LockToken.String())  // It will be null when the ReceiveAndDelete mode
        fmt.Println(string(message.Data))
        fmt.Println("Waiting...")
        time.Sleep(time.Second * 10)
        fmt.Println("Done.")
        m := make(map[string]string)
        m["rootCause"] = "hagechabin"
        return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete

    }))

Service Bus Explorer で見ると、しっかりとメタデータを読み取ることができる。
image.png

DeadLetterOnMessageExpiration

メッセージがExpireしたら自動でDeadLetterに行くように設定できるのだが、デフォルトではない。QueueManager というインスタンスに対して設定してあげると良い。ただし、これは2回実行すると、Conflict のエラーになる。実際のところは、ServiceBus を作成する時点で、ARM Template で Queue を作るようにするとよさそう。このAPIはどうやらARMのAPIらしく、Createを見てみると、BODYを持っている。おそらく、ARMTemplateをBodyに渡すのだろう。ちなみに、Putの方は、Bodyは無いが、なかったら作ってくれる挙動なので、テストを書くときはこちらの方がよさそう。削除して、Putで作るなら、Conflict のエラーは発生しないと思う。

    qm := ns.NewQueueManager()
    qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
    if err != nil {
        fmt.Println("error :", err.Error())
    } else {
        fmt.Println("endity anme: ", qe.Name)
    }

設定すると、次のように設定が変更されていることがわかる。

image.png

ReceiveAndDelete Mode

PeekLock は、最初にロックをメッセージにかけて、Complete で削除という挙動だが、ReceiveAndDelete は受け取った時点で、Queue から Message を削除する。であるので、DeadLetter に自分で転送することはできない。DeadLetteのQueueのAPIも用意されているが、読み込み専用なので、作者の意図として、DeadLetterに自分で送る処理は、ReceiveAndDelete では書いてほしくないのだと思う。どうしてもやりたかったらカスタムのQueueを作ってそちらに流す方がいいだろう。設定の変更は簡単で次のようなオプションをQueueのクライアントを作るときに指定するだけだ。

q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())

Active Message Count

実はすべての今回の調査はメッセージカウントの調査をするためだった。*entity.CountDetails.ActiveMessageCount を使うと、Queueのメッセージ数が取得されるのだが、問題は、現在ロック中のメッセージの数もカウントされてしまうことだ。
私は KEDA の Scaler を書いているので、現在処理中以外のメッセージの数が必要だ。どうしたらいいだろう?残念ながら方法は現在のところ見つかっていない。Peek を使って、そこに、LockToken というのがあったので、これが nil 出ないもの数に限定するというのもやってみたが、どうも、受け取り側でロックがかかったものとの数が違うので、ちょっと違うもののようだ。

自分のカスタムのアプリだと、メッセージを取得した時点で、IDをMapに書いておいて、Completeや、Abandanで消すようにしておいて、それを、Peekでとってきたときに参照する。という手は使えるが、私の書いているアプリケーションの構造は、抽象化されており、スケールロジックと、個別のリソースに対するスケーラーは、抽象化されているのと、StorageQueueとかだと、カウントがちゃんと取れるので問題にならない。

対策としては、Messageを受け取る側で、 ReceiveAndDeleteにしてあげると、カウントの数は、妥当なものになるが、通常は、PeekLock で運用したいと思うので微妙である。継続して調査して、分かったらこのブログを更新してみたい。

    m := ns.NewQueueManager()
    ctx2 := context.Background()

    for {
        entity, _ := m.Get(ctx2, queueName)
        fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
        ctx := context.Background()
        iterator, _ := q.Peek(ctx)
        var c int
        for {
            item, _ := iterator.Next(ctx)
            if item != nil && item.LockToken != nil {
                fmt.Printf("lockToken: %s \n", item.LockToken.String())
            } else {
                if item != nil {
                    fmt.Println("lockToken: nil")
                    c++
                } else {
                    fmt.Println("item is nil")
                }
            }
            if item != nil {
                body, _ := json.Marshal(item)
                fmt.Println(string(body))
            }

            if item == nil {
                iterator.Done()
                break
            }
        }
        fmt.Println("Count: ", c)
        time.Sleep(time.Second * 2)
    }

まとめ

調査の目的だったロックされているメッセージ以外のカウントを取得する方法の検証は出来なかったが、ServiceBusの周辺知識と、Go SDK の挙動はいろいろ確認できたのでブログにまとめてみた。ソースコードは、リソースのところに置いておいたので、もしよかったら試してもらっても結構だ。

Resource

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0