Keda へのコントリビュートに際して、あまり触ってこなかった Service Bus の挙動について知る必要があったので、簡単に調査してみた。
ライブラリは下記のものを使用している。
* Microsoft Azure Service Bus Client for Golang
* GodDoc servicebus
Queue Client
Queueのオペレーションができるように認証を行う。ServiceBus と Queue は事前に作っている。接続のためには、ServiceBus namespaceのConnectionString
が必要なので、取得して、環境変数から渡すようにしている。NameSpace
のインスタンスをConnectionString
を渡して作成し、NameSpace
から NewQueue
メソッドで Queue のクライアントが生成される。とても単純でいい感じだ。
func main() {
fmt.Println("Azure ServiceBus Queue Sender")
connectionString := os.Getenv("ConnectionString")
queueName := os.Getenv("queueName")
if len(os.Args) != 2 {
log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
}
count, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatalf("count should be integer : %s", os.Args[1])
}
// Create a client to communicate with a Service Bus Namespace
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
log.Fatal("Cannot create a client for the Service Bus Namespace", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Create a client to communicate with the queue
q, err := ns.NewQueue(queueName)
if err != nil {
log.Fatal("Cannot create a client for the queue", err)
}
Send Message
メッセージの送信は、Queue のオブジェクトに対して、Send
メソッドを使用すればよい。一括で送信したい場合は、func (q *Queue) SendBatch(ctx context.Context, iterator BatchIterator) error
が存在するのでそちらを使えばよいだろう。
for i := 0; i < count; i++ {
err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
fmt.Printf("Send Hello %d \n", i)
if err != nil {
log.Fatal(err)
}
}
Receive Message
メッセージの受信もとても単純で、コールバックのハンドラを渡しておくと、メッセージが来たらハンドラが実行される。一瞬非同期に見えるが、ReceiveOne
は、ハンドラに制御が戻ってきて終わるまでブロックする挙動になっている。ポイントは最後の message.Complete()
メソッド。デフォルトでは、Service Bus Queue は PeekLockという方式になっている。Receive
で Message を取得した時点で、メッセージがロックされてほかのクライアントから見えなくなる。そして、処理がうまくいったら、Complete
を発行した時点で Message が削除される。うまくいかなかったら Abandon
を発行すると、Message の実行が失敗したとみなされる。所定の回数失敗すると、DeadQueue にメッセージが自動で転送される。
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(string(message.Data))
return message.Complete(ctx)
}))
DeadLetter
DeadLetter へのMessageの送信は、次の場合に自動で DeadLetter に転送される。Moving messages to the DLQ。
- ヘッダサイズの超過
- TTLの有効期限切れ
- SessionIDがnull
- MaxTransferHopCount Queueの間の転送上限。4
- MaxDeliveryCountExceeded Message の受信が何回も失敗したケース デフォルト 10
しかし、それ以外にも自分のアプリケーションの都合で、DeadLetter に送りたくなるケースがあるだろう。その場合は、Message の DeadLetter()
を使うと DeadLetter に転送される。この挙動は、PeekLock
のモードのみで動作して、後ほど説明する ReceiveAndDelete
のモードでは使用することができない。
個人的には DeadLetter()
よりも、DeadLetterWithInfo()
のほうがよさげだ。なぜかというと、カスタムのプロパティを渡して、例えば、DeadLetter に入った理由などをメタデータとして持たせることができる。ほかにも error
や servicebus.ErrorInternalError
といったプロパティも渡しているのだが、どこでこのメタデータを見れるのかは今はまだわかっていない。少なくとも負荷情報を追加できるこちらの方が良いだろう。
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(message.LockToken.String()) // It will be null when the ReceiveAndDelete mode
fmt.Println(string(message.Data))
fmt.Println("Waiting...")
time.Sleep(time.Second * 10)
fmt.Println("Done.")
m := make(map[string]string)
m["rootCause"] = "hagechabin"
return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete
}))
Service Bus Explorer で見ると、しっかりとメタデータを読み取ることができる。
DeadLetterOnMessageExpiration
メッセージがExpireしたら自動でDeadLetterに行くように設定できるのだが、デフォルトではない。QueueManager
というインスタンスに対して設定してあげると良い。ただし、これは2回実行すると、Conflict
のエラーになる。実際のところは、ServiceBus を作成する時点で、ARM Template で Queue を作るようにするとよさそう。このAPIはどうやらARMのAPIらしく、Create
を見てみると、BODYを持っている。おそらく、ARMTemplateをBodyに渡すのだろう。ちなみに、Putの方は、Bodyは無いが、なかったら作ってくれる挙動なので、テストを書くときはこちらの方がよさそう。削除して、Putで作るなら、Conflict
のエラーは発生しないと思う。
qm := ns.NewQueueManager()
qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
if err != nil {
fmt.Println("error :", err.Error())
} else {
fmt.Println("endity anme: ", qe.Name)
}
設定すると、次のように設定が変更されていることがわかる。
ReceiveAndDelete Mode
PeekLock
は、最初にロックをメッセージにかけて、Complete
で削除という挙動だが、ReceiveAndDelete
は受け取った時点で、Queue から Message を削除する。であるので、DeadLetter に自分で転送することはできない。DeadLetteのQueueのAPIも用意されているが、読み込み専用なので、作者の意図として、DeadLetterに自分で送る処理は、ReceiveAndDelete
では書いてほしくないのだと思う。どうしてもやりたかったらカスタムのQueueを作ってそちらに流す方がいいだろう。設定の変更は簡単で次のようなオプションをQueueのクライアントを作るときに指定するだけだ。
q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())
Active Message Count
実はすべての今回の調査はメッセージカウントの調査をするためだった。*entity.CountDetails.ActiveMessageCount
を使うと、Queueのメッセージ数が取得されるのだが、問題は、現在ロック中のメッセージの数もカウントされてしまうことだ。
私は KEDA の Scaler を書いているので、現在処理中以外のメッセージの数が必要だ。どうしたらいいだろう?残念ながら方法は現在のところ見つかっていない。Peek
を使って、そこに、LockToken
というのがあったので、これが nil
出ないもの数に限定するというのもやってみたが、どうも、受け取り側でロックがかかったものとの数が違うので、ちょっと違うもののようだ。
自分のカスタムのアプリだと、メッセージを取得した時点で、IDをMapに書いておいて、Completeや、Abandanで消すようにしておいて、それを、Peekでとってきたときに参照する。という手は使えるが、私の書いているアプリケーションの構造は、抽象化されており、スケールロジックと、個別のリソースに対するスケーラーは、抽象化されているのと、StorageQueueとかだと、カウントがちゃんと取れるので問題にならない。
対策としては、Messageを受け取る側で、 ReceiveAndDelete
にしてあげると、カウントの数は、妥当なものになるが、通常は、PeekLock
で運用したいと思うので微妙である。継続して調査して、分かったらこのブログを更新してみたい。
m := ns.NewQueueManager()
ctx2 := context.Background()
for {
entity, _ := m.Get(ctx2, queueName)
fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
ctx := context.Background()
iterator, _ := q.Peek(ctx)
var c int
for {
item, _ := iterator.Next(ctx)
if item != nil && item.LockToken != nil {
fmt.Printf("lockToken: %s \n", item.LockToken.String())
} else {
if item != nil {
fmt.Println("lockToken: nil")
c++
} else {
fmt.Println("item is nil")
}
}
if item != nil {
body, _ := json.Marshal(item)
fmt.Println(string(body))
}
if item == nil {
iterator.Done()
break
}
}
fmt.Println("Count: ", c)
time.Sleep(time.Second * 2)
}
まとめ
調査の目的だったロックされているメッセージ以外のカウントを取得する方法の検証は出来なかったが、ServiceBusの周辺知識と、Go SDK の挙動はいろいろ確認できたのでブログにまとめてみた。ソースコードは、リソースのところに置いておいたので、もしよかったら試してもらっても結構だ。