19
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

workqueue

Last updated at Posted at 2017-12-19

client-goのworkqueueパッケージはgoroutine safeなキューの実装を提供しています。非同期な処理をgoroutineで並列化するときに非常に便利です。

workqueue.Interface

基本的なキューのインターフェースは:

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

で、それを実装するもっとも簡単なキューはworkqueue.Typeで、workqueue.New()で作成します。

Addでアイテムをキューに追加します。Kubernetesのコントローラを実装する場合は、リソースの名前空間/名前 (e.g., default/foo)のStringを追加することが多いです。

Getでキューからアイテムを取り出します。Getは完全にアイテムをキューから取り除くことはしません。完全に取り除くには、Doneを呼び出します。なぜこのような実装になっているかを説明しましょう。あるgoroutine AでGetでアイテムXを取り出したとします。AがXについて処理をしている間に、アイテムXが再びキューに追加されると、複数のgoroutineで並列化を行なっている場合、別のgoroutine BがGetでアイテムXを取り出して処理を開始する可能性があります。workqueueはこのような状況を回避するために、Doneを呼ばない限り、同じアイテムを別のgoroutineがGetで得るようなことが起こらないように保証しています。

Shutdownを呼び出すとAddでアイテムを追加できなくなります。Getは、アイテムがキューにある限り、それを返します。キューが空になると、Getは(nil, true)を返しますので全て処理したことがわかるようになっています。

GetとDoneのセマンティクスから、Kubernetesのコントローラでは以下のような実装でキューを使っています:

q := workqueue.New()

...

work := func() bool {
    k, quit := q.Get()
    if quit {
        // ShutDown開始してから、qが空になったら
        return true
    }
    defer q.Done(k)
    // k について何かする
    doSomething(k)
    return false
}

for {
    if quit := work(); quit {
        break
    }
}

workqueue.Interfaceの派生タイプ

workqueue.Typeはもっとも基本的なキューでした。このキューをベースにいくつか派生があります。おそらく実際に使うのは派生タイプの方が多いだろうと思います。いくつか紹介します。

workqueue.DelayingInterface

workqueue.DelayingInterfaceはキューへ追加するまでの待ち時間を指定することができるインターフェースを追加したキューです。

type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

AddAfterの呼び出しはすぐに返りますが、実際にitemがキューに追加されるのは、durationで指定した時間の後になります。実行を遅延させたいときに使うと便利なキューです。

workqueue.RateLimitingInterface

もう一つは、workqueue.RateLimitingInterfaceです。これは同じアイテムをキューに追加するタイミングを回数に比例して遅延させるキューです。
Kubernetesは非同期な分散システムなので、エラーが頻繁に起こります。そのような場合、再試行を行うことになりますが、遅延なしに再試行するとビジーループのような状況が起こる可能性があります。そのような場合に、このキューを使って追加を遅延させるとビジーループを避けることができます。

type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate limiter says its ok
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

AddRateLimitedで追加するアイテムが遅延対象になります。最初に追加する時は遅延がありません。重要なのがForgetです。Forgetを呼ばずに繰り返しAddRateLimitedを呼ぶと、その都度遅延が延長されます。Forgetを呼ぶと、キューがアイテムのことを忘れ、次回のAddRateLimitedでは遅延なしとなります。
よってアイテムの処理が失敗した場合は、Forgetを呼ばずに、AddRateLimitedを呼んで再度キューに追加する、アイテムの処理が成功した場合は、Forgetを忘れずに呼ぶ、というような使い方になります。

まとめ

workqueueパッケージが提供するgoroutine safeなキューについて簡単に紹介しました。このような実装が必要な場合は、利用するといいでしょう。

19
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?