Help us understand the problem. What is going on with this article?

Go でチャネルに手軽に優先度を付けたいとき

More than 3 years have passed since last update.

バッチ処理でキューを処理していて、特定の送信元からの処理は優先度高くしたい!っていう要件が発生することありますよね?あります。

Goでチャネルをキューとして使い、ワーカーを別 gorutine で作ってタスクを処理するというのは、よくある実装です。そこへ優先度の高いタスクは割り込ませたい、みたいなことが起こった時に、さくっと解決するニッチなテクニックをご紹介します。

問題のプログラム

チャネルからタスクを消費するようなプログラムですが、途中で優先度の高いタスクが発生するとします。

package main

import (
    "fmt"
    "time"
)

type task struct {
    name string
}

var queue = make(chan task)

func startConsumer() {
    for {
        select {
        case t := <-queue:
            // 100ms ごとに実行するワーカー
            fmt.Printf("consumed %s\n", t.name)
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    go startConsumer()

    for i := 0; i < 10; i++ {
        go func(i int) {
            queue <- task{fmt.Sprintf("normal task : %d", i)}
        }(i)
    }

    // 0.3秒後に優先度の高いタスクが3つ発生した!!
    time.Sleep(300 * time.Millisecond)
    for i := 0; i < 3; i++ {
        go func(i int) {
            queue <- task{fmt.Sprintf("high priority task : %d", i)}
        }(i)
    }

    // 全部のgorutineが待機状態になったらエラーで落ちる
    select {}
}

Go Playground で実行

gorutineの後処理を端折っているので、最後までキューが処理されるとデッドロックで終了します。
環境変数 GOMAXPROCS が1の場合(1.4 までのデフォルト)とそれ以上の場合で、処理順が異なりますが、だいたい以下のようになるはずです。

consumed normal task : 0
consumed normal task : 1
consumed normal task : 6
consumed normal task : 7
consumed normal task : 3
consumed normal task : 8
consumed normal task : 2
consumed normal task : 5
consumed normal task : 9
consumed normal task : 4
consumed high priority task : 0
consumed high priority task : 1
consumed high priority task : 2
fatal error: all goroutines are asleep - deadlock!

Go のチャネルは、先に送信されたものは先に受信されるようになっているため( https://golang.org/ref/mem ) 後からきたタスクは最後に処理されます。

寄り道1: Go の select は複数チャネルが処理待ちの場合はランダムで処理する

ところで、GoのSelectは、case の条件が複数あり、それぞれのチャネルに送受信できる状態のとき、ランダムにその中から1つの処理を選択します。
https://golang.org/ref/spec#Select_statements

試して見ましょう

package main

import "fmt"

var calls = [3]int{}

func test() {
    chs := [3]chan struct{}{}
    for i := 0; i < 3; i++ {
        chs[i] = make(chan struct{}, 1)
        chs[i] <- struct{}{}
    }
    select {
    case <-chs[0]:
        calls[0]++
    case <-chs[1]:
        calls[1]++
    case <-chs[2]:
        calls[2]++
    }
}

func main() {
    for i := 0; i < 300000; i++ {
        test()
    }
    for i := range calls {
        fmt.Printf("%d : %d times\n", i, calls[i])
    }
}

Go Playground で実行

だいたい以下のような結果になると思います。

0 : 100155 times
1 : 100090 times
2 : 99755 times

均等に処理されていますね。

寄り道2: select の default の挙動

select 文に default が定義されている場合、送受信できるチャネルがない場合には、即座に default の内容が実行されます。default がなければ、ブロックし続けます。

select {
   case <- ch1:
     ...
   case ch2 <- "hoge":
     ...
   default:
     // 全部がブロックされるようなときに実行される
}

3つの優先度をもつチャネル(ぽいもの)を作る

現実的なプロダクトでは、優先度なんていうものは、10段階つけようがあんまり意味がなくて、普通、優先度高、優先度低 があればことたります!(いいすぎ?)
つまり、3つのチャネルがあって、優先度高い方から順に消費するようなチャネル(のようなもの)が作れればいいわけです。

そこで、前述の 2つの select の性質を念頭に、以下のようなチャネルもどきを実装します。

type PriorityChannel struct {
    Out    chan interface{}
    High   chan interface{}
    Normal chan interface{}
    Low    chan interface{}
    stopCh chan struct{}
}

func NewPriorityChannel(closeCh <-chan struct{}) *PriorityChannel {
    pc := PriorityChannel{}
    pc.Out = make(chan interface{})
    pc.High = make(chan interface{})
    pc.Normal = make(chan interface{})
    pc.Low = make(chan interface{})
    pc.stopCh = make(chan struct{})

    pc.start()
    return &pc
}

func (pc *PriorityChannel) Close() {
    close(pc.stopCh)
    close(pc.High)
    close(pc.Normal)
    close(pc.Low)
    close(pc.Out)
}

func (pc *PriorityChannel) start() {
    go func() {
        for {
            select {
            case s := <-pc.High:
                pc.Out <- s
                continue
            case <-pc.stopCh:
                return
            default:
            }

            select {
            case s := <-pc.High:
                pc.Out <- s
                continue
            case s := <-pc.Normal:
                pc.Out <- s
                continue
            case <-pc.stopCh:
                return
            default:
            }

            select {
            case s := <-pc.High:
                pc.Out <- s
            case s := <-pc.Normal:
                pc.Out <- s
            case s := <-pc.Low:
                pc.Out <- s
            case <-pc.stopCh:
                return
            }
        }
    }()
}

start() の中身がミソです。

  1. High だけチェックして、受信できれば Out に送る
  2. High, Normal だけチェックして受信できれば Out に送る
  3. High, Normal, Low を同時に待ち受けてブロックする

という挙動になります。
頭の体操的な感じですね!早速さっきの例で使ってみましょう

// PriorityChannel の定義...

var queue = NewPriorityChannel()

func startConsumer() {
    for {
        select {
        case t := <-queue.Out:
                        // 変更 1
            fmt.Printf("consumed %s\n", t.(task).name)   
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    for i := 0; i < 10; i++ {
        go func(i int) {
                        // 変更 2
            queue.Normal <- task{fmt.Sprintf("normal task : %d", i)}
        }(i)
    }

    // 0.3秒後に優先度の高いタスクが3つ発生した!!
    time.Sleep(300 * time.Millisecond)
    for i := 0; i < 3; i++ {
        go func(i int) {
                        // 変更 3
            queue.High <- task{fmt.Sprintf("high priority task : %d", i)}
        }(i)
    }

    go startConsumer()

    // 全部のgorutineが待機状態になったら落ちる
    select {}
}

Go Playgroundで実行

以下のような出力になると思います。

consumed normal task : 0
consumed normal task : 5
consumed high priority task : 0
consumed high priority task : 2
consumed high priority task : 1
consumed normal task : 1
consumed normal task : 2
consumed normal task : 8
consumed normal task : 3
consumed normal task : 4
consumed normal task : 9
consumed normal task : 7
consumed normal task : 6
fatal error: all goroutines are asleep - deadlock!

だいたい0.3秒後の優先度高にしたいタスクが優先的に処理されていますね!

補足: 実質 buffer = 1 のチャネルになる

一回中身で受信して変数に入れたものを Out に送っているため、バッファが1のチャネルと似た挙動をしますので気をつけましょう。

まとめ

既存のチャネルを使ったコードに、少し手を加えるだけで、優先度付きのチャネルを作る例を紹介しました。
コード的にちょっとおもしろく、十分実用的です。実際 wacul のプロダクトでもこれに似たコードで実稼働しています。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away