10
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

WACULAdvent Calendar 2015

Day 11

Go でチャネルに手軽に優先度を付けたいとき

Last updated at Posted at 2015-12-10

バッチ処理でキューを処理していて、特定の送信元からの処理は優先度高くしたい!っていう要件が発生することありますよね?あります。

Goでチャネルをキューとして使い、ワーカーを別 gorutine で作ってタスクを処理するというのは、よくある実装です。そこへ優先度の高いタスクは割り込ませたい、みたいなことが起こった時に、さくっと解決するニッチなテクニックをご紹介します。

問題のプログラム

チャネルからタスクを消費するようなプログラムですが、途中で優先度の高いタスクが発生するとします。

package main

import (
	"fmt"
	"time"
)

type task struct {
	name string
}

var queue = make(chan task)

func startConsumer() {
	for {
		select {
		case t := <-queue:
			// 100ms ごとに実行するワーカー
			fmt.Printf("consumed %s\n", t.name)
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func main() {
	go startConsumer()

	for i := 0; i < 10; i++ {
		go func(i int) {
			queue <- task{fmt.Sprintf("normal task : %d", i)}
		}(i)
	}

	// 0.3秒後に優先度の高いタスクが3つ発生した!!
	time.Sleep(300 * time.Millisecond)
	for i := 0; i < 3; i++ {
		go func(i int) {
			queue <- task{fmt.Sprintf("high priority task : %d", i)}
		}(i)
	}

	// 全部のgorutineが待機状態になったらエラーで落ちる
	select {}
}

Go Playground で実行

gorutineの後処理を端折っているので、最後までキューが処理されるとデッドロックで終了します。
環境変数 GOMAXPROCS が1の場合(1.4 までのデフォルト)とそれ以上の場合で、処理順が異なりますが、だいたい以下のようになるはずです。

consumed normal task : 0
consumed normal task : 1
consumed normal task : 6
consumed normal task : 7
consumed normal task : 3
consumed normal task : 8
consumed normal task : 2
consumed normal task : 5
consumed normal task : 9
consumed normal task : 4
consumed high priority task : 0
consumed high priority task : 1
consumed high priority task : 2
fatal error: all goroutines are asleep - deadlock!

Go のチャネルは、先に送信されたものは先に受信されるようになっているため( https://golang.org/ref/mem ) 後からきたタスクは最後に処理されます。

寄り道1: Go の select は複数チャネルが処理待ちの場合はランダムで処理する

ところで、GoのSelectは、case の条件が複数あり、それぞれのチャネルに送受信できる状態のとき、ランダムにその中から1つの処理を選択します。
https://golang.org/ref/spec#Select_statements

試して見ましょう

package main

import "fmt"

var calls = [3]int{}

func test() {
	chs := [3]chan struct{}{}
	for i := 0; i < 3; i++ {
		chs[i] = make(chan struct{}, 1)
		chs[i] <- struct{}{}
	}
	select {
	case <-chs[0]:
		calls[0]++
	case <-chs[1]:
		calls[1]++
	case <-chs[2]:
		calls[2]++
	}
}

func main() {
	for i := 0; i < 300000; i++ {
		test()
	}
	for i := range calls {
		fmt.Printf("%d : %d times\n", i, calls[i])
	}
}

Go Playground で実行

だいたい以下のような結果になると思います。

0 : 100155 times
1 : 100090 times
2 : 99755 times

均等に処理されていますね。

寄り道2: select の default の挙動

select 文に default が定義されている場合、送受信できるチャネルがない場合には、即座に default の内容が実行されます。default がなければ、ブロックし続けます。

select {
   case <- ch1:
     ...
   case ch2 <- "hoge":
     ...
   default:
     // 全部がブロックされるようなときに実行される
}

3つの優先度をもつチャネル(ぽいもの)を作る

現実的なプロダクトでは、優先度なんていうものは、10段階つけようがあんまり意味がなくて、普通、優先度高、優先度低 があればことたります!(いいすぎ?)
つまり、3つのチャネルがあって、優先度高い方から順に消費するようなチャネル(のようなもの)が作れればいいわけです。

そこで、前述の 2つの select の性質を念頭に、以下のようなチャネルもどきを実装します。

type PriorityChannel struct {
	Out    chan interface{}
	High   chan interface{}
	Normal chan interface{}
	Low    chan interface{}
	stopCh chan struct{}
}

func NewPriorityChannel(closeCh <-chan struct{}) *PriorityChannel {
	pc := PriorityChannel{}
	pc.Out = make(chan interface{})
	pc.High = make(chan interface{})
	pc.Normal = make(chan interface{})
	pc.Low = make(chan interface{})
	pc.stopCh = make(chan struct{})

	pc.start()
	return &pc
}

func (pc *PriorityChannel) Close() {
	close(pc.stopCh)
	close(pc.High)
	close(pc.Normal)
	close(pc.Low)
	close(pc.Out)
}

func (pc *PriorityChannel) start() {
	go func() {
		for {
			select {
			case s := <-pc.High:
				pc.Out <- s
				continue
			case <-pc.stopCh:
				return
			default:
			}

			select {
			case s := <-pc.High:
				pc.Out <- s
				continue
			case s := <-pc.Normal:
				pc.Out <- s
				continue
			case <-pc.stopCh:
				return
			default:
			}

			select {
			case s := <-pc.High:
				pc.Out <- s
			case s := <-pc.Normal:
				pc.Out <- s
			case s := <-pc.Low:
				pc.Out <- s
			case <-pc.stopCh:
				return
			}
		}
	}()
}

start() の中身がミソです。

  1. High だけチェックして、受信できれば Out に送る
  2. High, Normal だけチェックして受信できれば Out に送る
  3. High, Normal, Low を同時に待ち受けてブロックする

という挙動になります。
頭の体操的な感じですね!早速さっきの例で使ってみましょう

// PriorityChannel の定義...

var queue = NewPriorityChannel()

func startConsumer() {
	for {
		select {
		case t := <-queue.Out:
                        // 変更 1
			fmt.Printf("consumed %s\n", t.(task).name)   
			time.Sleep(100 * time.Millisecond)
		}
	}
}

func main() {
	for i := 0; i < 10; i++ {
		go func(i int) {
                        // 変更 2
			queue.Normal <- task{fmt.Sprintf("normal task : %d", i)}
		}(i)
	}

	// 0.3秒後に優先度の高いタスクが3つ発生した!!
	time.Sleep(300 * time.Millisecond)
	for i := 0; i < 3; i++ {
		go func(i int) {
                        // 変更 3
			queue.High <- task{fmt.Sprintf("high priority task : %d", i)}
		}(i)
	}

	go startConsumer()

	// 全部のgorutineが待機状態になったら落ちる
	select {}
}

Go Playgroundで実行

以下のような出力になると思います。

consumed normal task : 0
consumed normal task : 5
consumed high priority task : 0
consumed high priority task : 2
consumed high priority task : 1
consumed normal task : 1
consumed normal task : 2
consumed normal task : 8
consumed normal task : 3
consumed normal task : 4
consumed normal task : 9
consumed normal task : 7
consumed normal task : 6
fatal error: all goroutines are asleep - deadlock!

だいたい0.3秒後の優先度高にしたいタスクが優先的に処理されていますね!

補足: 実質 buffer = 1 のチャネルになる

一回中身で受信して変数に入れたものを Out に送っているため、バッファが1のチャネルと似た挙動をしますので気をつけましょう。

まとめ

既存のチャネルを使ったコードに、少し手を加えるだけで、優先度付きのチャネルを作る例を紹介しました。
コード的にちょっとおもしろく、十分実用的です。実際 wacul のプロダクトでもこれに似たコードで実稼働しています。

10
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?