バッチ処理でキューを処理していて、特定の送信元からの処理は優先度高くしたい!っていう要件が発生することありますよね?あります。
Goでチャネルをキューとして使い、ワーカーを別 gorutine で作ってタスクを処理するというのは、よくある実装です。そこへ優先度の高いタスクは割り込ませたい、みたいなことが起こった時に、さくっと解決するニッチなテクニックをご紹介します。
問題のプログラム
チャネルからタスクを消費するようなプログラムですが、途中で優先度の高いタスクが発生するとします。
package main
import (
"fmt"
"time"
)
type task struct {
name string
}
var queue = make(chan task)
func startConsumer() {
for {
select {
case t := <-queue:
// 100ms ごとに実行するワーカー
fmt.Printf("consumed %s\n", t.name)
time.Sleep(100 * time.Millisecond)
}
}
}
func main() {
go startConsumer()
for i := 0; i < 10; i++ {
go func(i int) {
queue <- task{fmt.Sprintf("normal task : %d", i)}
}(i)
}
// 0.3秒後に優先度の高いタスクが3つ発生した!!
time.Sleep(300 * time.Millisecond)
for i := 0; i < 3; i++ {
go func(i int) {
queue <- task{fmt.Sprintf("high priority task : %d", i)}
}(i)
}
// 全部のgorutineが待機状態になったらエラーで落ちる
select {}
}
gorutineの後処理を端折っているので、最後までキューが処理されるとデッドロックで終了します。
環境変数 GOMAXPROCS が1の場合(1.4 までのデフォルト)とそれ以上の場合で、処理順が異なりますが、だいたい以下のようになるはずです。
consumed normal task : 0
consumed normal task : 1
consumed normal task : 6
consumed normal task : 7
consumed normal task : 3
consumed normal task : 8
consumed normal task : 2
consumed normal task : 5
consumed normal task : 9
consumed normal task : 4
consumed high priority task : 0
consumed high priority task : 1
consumed high priority task : 2
fatal error: all goroutines are asleep - deadlock!
Go のチャネルは、先に送信されたものは先に受信されるようになっているため( https://golang.org/ref/mem ) 後からきたタスクは最後に処理されます。
寄り道1: Go の select は複数チャネルが処理待ちの場合はランダムで処理する
ところで、GoのSelectは、case の条件が複数あり、それぞれのチャネルに送受信できる状態のとき、ランダムにその中から1つの処理を選択します。
https://golang.org/ref/spec#Select_statements
試して見ましょう
package main
import "fmt"
var calls = [3]int{}
func test() {
chs := [3]chan struct{}{}
for i := 0; i < 3; i++ {
chs[i] = make(chan struct{}, 1)
chs[i] <- struct{}{}
}
select {
case <-chs[0]:
calls[0]++
case <-chs[1]:
calls[1]++
case <-chs[2]:
calls[2]++
}
}
func main() {
for i := 0; i < 300000; i++ {
test()
}
for i := range calls {
fmt.Printf("%d : %d times\n", i, calls[i])
}
}
だいたい以下のような結果になると思います。
0 : 100155 times
1 : 100090 times
2 : 99755 times
均等に処理されていますね。
寄り道2: select の default の挙動
select 文に default が定義されている場合、送受信できるチャネルがない場合には、即座に default の内容が実行されます。default がなければ、ブロックし続けます。
select {
case <- ch1:
...
case ch2 <- "hoge":
...
default:
// 全部がブロックされるようなときに実行される
}
3つの優先度をもつチャネル(ぽいもの)を作る
現実的なプロダクトでは、優先度なんていうものは、10段階つけようがあんまり意味がなくて、普通、優先度高、優先度低 があればことたります!(いいすぎ?)
つまり、3つのチャネルがあって、優先度高い方から順に消費するようなチャネル(のようなもの)が作れればいいわけです。
そこで、前述の 2つの select の性質を念頭に、以下のようなチャネルもどきを実装します。
type PriorityChannel struct {
Out chan interface{}
High chan interface{}
Normal chan interface{}
Low chan interface{}
stopCh chan struct{}
}
func NewPriorityChannel(closeCh <-chan struct{}) *PriorityChannel {
pc := PriorityChannel{}
pc.Out = make(chan interface{})
pc.High = make(chan interface{})
pc.Normal = make(chan interface{})
pc.Low = make(chan interface{})
pc.stopCh = make(chan struct{})
pc.start()
return &pc
}
func (pc *PriorityChannel) Close() {
close(pc.stopCh)
close(pc.High)
close(pc.Normal)
close(pc.Low)
close(pc.Out)
}
func (pc *PriorityChannel) start() {
go func() {
for {
select {
case s := <-pc.High:
pc.Out <- s
continue
case <-pc.stopCh:
return
default:
}
select {
case s := <-pc.High:
pc.Out <- s
continue
case s := <-pc.Normal:
pc.Out <- s
continue
case <-pc.stopCh:
return
default:
}
select {
case s := <-pc.High:
pc.Out <- s
case s := <-pc.Normal:
pc.Out <- s
case s := <-pc.Low:
pc.Out <- s
case <-pc.stopCh:
return
}
}
}()
}
start() の中身がミソです。
- High だけチェックして、受信できれば Out に送る
- High, Normal だけチェックして受信できれば Out に送る
- High, Normal, Low を同時に待ち受けてブロックする
という挙動になります。
頭の体操的な感じですね!早速さっきの例で使ってみましょう
// PriorityChannel の定義...
var queue = NewPriorityChannel()
func startConsumer() {
for {
select {
case t := <-queue.Out:
// 変更 1
fmt.Printf("consumed %s\n", t.(task).name)
time.Sleep(100 * time.Millisecond)
}
}
}
func main() {
for i := 0; i < 10; i++ {
go func(i int) {
// 変更 2
queue.Normal <- task{fmt.Sprintf("normal task : %d", i)}
}(i)
}
// 0.3秒後に優先度の高いタスクが3つ発生した!!
time.Sleep(300 * time.Millisecond)
for i := 0; i < 3; i++ {
go func(i int) {
// 変更 3
queue.High <- task{fmt.Sprintf("high priority task : %d", i)}
}(i)
}
go startConsumer()
// 全部のgorutineが待機状態になったら落ちる
select {}
}
以下のような出力になると思います。
consumed normal task : 0
consumed normal task : 5
consumed high priority task : 0
consumed high priority task : 2
consumed high priority task : 1
consumed normal task : 1
consumed normal task : 2
consumed normal task : 8
consumed normal task : 3
consumed normal task : 4
consumed normal task : 9
consumed normal task : 7
consumed normal task : 6
fatal error: all goroutines are asleep - deadlock!
だいたい0.3秒後の優先度高にしたいタスクが優先的に処理されていますね!
補足: 実質 buffer = 1 のチャネルになる
一回中身で受信して変数に入れたものを Out に送っているため、バッファが1のチャネルと似た挙動をしますので気をつけましょう。
まとめ
既存のチャネルを使ったコードに、少し手を加えるだけで、優先度付きのチャネルを作る例を紹介しました。
コード的にちょっとおもしろく、十分実用的です。実際 wacul のプロダクトでもこれに似たコードで実稼働しています。