client-goのworkqueueパッケージはgoroutine safeなキューの実装を提供しています。非同期な処理をgoroutineで並列化するときに非常に便利です。
workqueue.Interface
基本的なキューのインターフェースは:
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
で、それを実装するもっとも簡単なキューはworkqueue.Typeで、workqueue.New()
で作成します。
Add
でアイテムをキューに追加します。Kubernetesのコントローラを実装する場合は、リソースの名前空間/名前
(e.g., default/foo
)のStringを追加することが多いです。
Get
でキューからアイテムを取り出します。Get
は完全にアイテムをキューから取り除くことはしません。完全に取り除くには、Done
を呼び出します。なぜこのような実装になっているかを説明しましょう。あるgoroutine AでGet
でアイテムXを取り出したとします。AがXについて処理をしている間に、アイテムXが再びキューに追加されると、複数のgoroutineで並列化を行なっている場合、別のgoroutine BがGet
でアイテムXを取り出して処理を開始する可能性があります。workqueueはこのような状況を回避するために、Done
を呼ばない限り、同じアイテムを別のgoroutineがGet
で得るようなことが起こらないように保証しています。
Shutdown
を呼び出すとAdd
でアイテムを追加できなくなります。Get
は、アイテムがキューにある限り、それを返します。キューが空になると、Get
は(nil, true)を返しますので全て処理したことがわかるようになっています。
GetとDoneのセマンティクスから、Kubernetesのコントローラでは以下のような実装でキューを使っています:
q := workqueue.New()
...
work := func() bool {
k, quit := q.Get()
if quit {
// ShutDown開始してから、qが空になったら
return true
}
defer q.Done(k)
// k について何かする
doSomething(k)
return false
}
for {
if quit := work(); quit {
break
}
}
workqueue.Interfaceの派生タイプ
workqueue.Type
はもっとも基本的なキューでした。このキューをベースにいくつか派生があります。おそらく実際に使うのは派生タイプの方が多いだろうと思います。いくつか紹介します。
workqueue.DelayingInterface
workqueue.DelayingInterfaceはキューへ追加するまでの待ち時間を指定することができるインターフェースを追加したキューです。
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
AddAfter
の呼び出しはすぐに返りますが、実際にitemがキューに追加されるのは、durationで指定した時間の後になります。実行を遅延させたいときに使うと便利なキューです。
workqueue.RateLimitingInterface
もう一つは、workqueue.RateLimitingInterfaceです。これは同じアイテムをキューに追加するタイミングを回数に比例して遅延させるキューです。
Kubernetesは非同期な分散システムなので、エラーが頻繁に起こります。そのような場合、再試行を行うことになりますが、遅延なしに再試行するとビジーループのような状況が起こる可能性があります。そのような場合に、このキューを使って追加を遅延させるとビジーループを避けることができます。
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
AddRateLimited
で追加するアイテムが遅延対象になります。最初に追加する時は遅延がありません。重要なのがForget
です。Forget
を呼ばずに繰り返しAddRateLimited
を呼ぶと、その都度遅延が延長されます。Forget
を呼ぶと、キューがアイテムのことを忘れ、次回のAddRateLimited
では遅延なしとなります。
よってアイテムの処理が失敗した場合は、Forget
を呼ばずに、AddRateLimited
を呼んで再度キューに追加する、アイテムの処理が成功した場合は、Forget
を忘れずに呼ぶ、というような使い方になります。
まとめ
workqueueパッケージが提供するgoroutine safeなキューについて簡単に紹介しました。このような実装が必要な場合は、利用するといいでしょう。