Edited at
Z LabDay 20

workqueue

More than 1 year has passed since last update.

client-goのworkqueueパッケージはgoroutine safeなキューの実装を提供しています。非同期な処理をgoroutineで並列化するときに非常に便利です。


workqueue.Interface

基本的なキューのインターフェースは:

type Interface interface {

Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}

で、それを実装するもっとも簡単なキューはworkqueue.Typeで、workqueue.New()で作成します。

Addでアイテムをキューに追加します。Kubernetesのコントローラを実装する場合は、リソースの名前空間/名前 (e.g., default/foo)のStringを追加することが多いです。

Getでキューからアイテムを取り出します。Getは完全にアイテムをキューから取り除くことはしません。完全に取り除くには、Doneを呼び出します。なぜこのような実装になっているかを説明しましょう。あるgoroutine AでGetでアイテムXを取り出したとします。AがXについて処理をしている間に、アイテムXが再びキューに追加されると、複数のgoroutineで並列化を行なっている場合、別のgoroutine BがGetでアイテムXを取り出して処理を開始する可能性があります。workqueueはこのような状況を回避するために、Doneを呼ばない限り、同じアイテムを別のgoroutineがGetで得るようなことが起こらないように保証しています。

Shutdownを呼び出すとAddでアイテムを追加できなくなります。Getは、アイテムがキューにある限り、それを返します。キューが空になると、Getは(nil, true)を返しますので全て処理したことがわかるようになっています。

GetとDoneのセマンティクスから、Kubernetesのコントローラでは以下のような実装でキューを使っています:

q := workqueue.New()

...

work := func() bool {
k, quit := q.Get()
if quit {
// ShutDown開始してから、qが空になったら
return true
}
defer q.Done(k)
// k について何かする
doSomething(k)
return false
}

for {
if quit := work(); quit {
break
}
}


workqueue.Interfaceの派生タイプ

workqueue.Typeはもっとも基本的なキューでした。このキューをベースにいくつか派生があります。おそらく実際に使うのは派生タイプの方が多いだろうと思います。いくつか紹介します。


workqueue.DelayingInterface

workqueue.DelayingInterfaceはキューへ追加するまでの待ち時間を指定することができるインターフェースを追加したキューです。

type DelayingInterface interface {

Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}

AddAfterの呼び出しはすぐに返りますが、実際にitemがキューに追加されるのは、durationで指定した時間の後になります。実行を遅延させたいときに使うと便利なキューです。


workqueue.RateLimitingInterface

もう一つは、workqueue.RateLimitingInterfaceです。これは同じアイテムをキューに追加するタイミングを回数に比例して遅延させるキューです。

Kubernetesは非同期な分散システムなので、エラーが頻繁に起こります。そのような場合、再試行を行うことになりますが、遅延なしに再試行するとビジーループのような状況が起こる可能性があります。そのような場合に、このキューを使って追加を遅延させるとビジーループを避けることができます。

type RateLimitingInterface interface {

DelayingInterface

// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
AddRateLimited(item interface{})

// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})

// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}

AddRateLimitedで追加するアイテムが遅延対象になります。最初に追加する時は遅延がありません。重要なのがForgetです。Forgetを呼ばずに繰り返しAddRateLimitedを呼ぶと、その都度遅延が延長されます。Forgetを呼ぶと、キューがアイテムのことを忘れ、次回のAddRateLimitedでは遅延なしとなります。

よってアイテムの処理が失敗した場合は、Forgetを呼ばずに、AddRateLimitedを呼んで再度キューに追加する、アイテムの処理が成功した場合は、Forgetを忘れずに呼ぶ、というような使い方になります。


まとめ

workqueueパッケージが提供するgoroutine safeなキューについて簡単に紹介しました。このような実装が必要な場合は、利用するといいでしょう。