Go
golang
kubernetes

client-goのworkqueueパッケージはgoroutine safeなキューの実装を提供しています。非同期な処理をgoroutineで並列化するときに非常に便利です。

workqueue.Interface

基本的なキューのインターフェースは:

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

で、それを実装するもっとも簡単なキューはworkqueue.Typeで、workqueue.New()で作成します。

Addでアイテムをキューに追加します。Kubernetesのコントローラを実装する場合は、リソースの名前空間/名前 (e.g., default/foo)のStringを追加することが多いです。

Getでキューからアイテムを取り出します。Getは完全にアイテムをキューから取り除くことはしません。完全に取り除くには、Doneを呼び出します。なぜこのような実装になっているかを説明しましょう。あるgoroutine AでGetでアイテムXを取り出したとします。AがXについて処理をしている間に、アイテムXが再びキューに追加されると、複数のgoroutineで並列化を行なっている場合、別のgoroutine BがGetでアイテムXを取り出して処理を開始する可能性があります。workqueueはこのような状況を回避するために、Doneを呼ばない限り、同じアイテムを別のgoroutineがGetで得るようなことが起こらないように保証しています。

Shutdownを呼び出すとAddでアイテムを追加できなくなります。Getは、アイテムがキューにある限り、それを返します。キューが空になると、Getは(nil, true)を返しますので全て処理したことがわかるようになっています。

GetとDoneのセマンティクスから、Kubernetesのコントローラでは以下のような実装でキューを使っています:

q := workqueue.New()

...

work := func() bool {
    k, quit := q.Get()
    if quit {
        // ShutDown開始してから、qが空になったら
        return true
    }
    defer q.Done(k)
    // k について何かする
    doSomething(k)
    return false
}

for {
    if quit := work(); quit {
        break
    }
}

workqueue.Interfaceの派生タイプ

workqueue.Typeはもっとも基本的なキューでした。このキューをベースにいくつか派生があります。おそらく実際に使うのは派生タイプの方が多いだろうと思います。いくつか紹介します。

workqueue.DelayingInterface

workqueue.DelayingInterfaceはキューへ追加するまでの待ち時間を指定することができるインターフェースを追加したキューです。

type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

AddAfterの呼び出しはすぐに返りますが、実際にitemがキューに追加されるのは、durationで指定した時間の後になります。実行を遅延させたいときに使うと便利なキューです。

workqueue.RateLimitingInterface

もう一つは、workqueue.RateLimitingInterfaceです。これは同じアイテムをキューに追加するタイミングを回数に比例して遅延させるキューです。
Kubernetesは非同期な分散システムなので、エラーが頻繁に起こります。そのような場合、再試行を行うことになりますが、遅延なしに再試行するとビジーループのような状況が起こる可能性があります。そのような場合に、このキューを使って追加を遅延させるとビジーループを避けることができます。

type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate limiter says its ok
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

AddRateLimitedで追加するアイテムが遅延対象になります。最初に追加する時は遅延がありません。重要なのがForgetです。Forgetを呼ばずに繰り返しAddRateLimitedを呼ぶと、その都度遅延が延長されます。Forgetを呼ぶと、キューがアイテムのことを忘れ、次回のAddRateLimitedでは遅延なしとなります。
よってアイテムの処理が失敗した場合は、Forgetを呼ばずに、AddRateLimitedを呼んで再度キューに追加する、アイテムの処理が成功した場合は、Forgetを忘れずに呼ぶ、というような使い方になります。

まとめ

workqueueパッケージが提供するgoroutine safeなキューについて簡単に紹介しました。このような実装が必要な場合は、利用するといいでしょう。