はじめに
この記事はGo Advent Calendar 2016 の17日目の記事です。
並行処理をする時にはロックが重要になります。
GoではchannelとMutexという仕組みを提供しています。
この記事では色々なロックを作りたいと思います。
tl;dr
channelでロックを作ることは有意義です。
ただ、Mutexのほうが良いこともあります。
良い方法を選びましょう
ロック無し
ロックの意味を確かめるためにも、まずはロック無しのコードから始めます。
func main() {
counter := 0
for i := 0; i < 1000; i++ {
go func() {
counter++
fmt.Print("*")
}()
}
time.Sleep(3 * time.Second)
fmt.Printf("\n%d\n", counter)
}
この関数を実行すると、*
の数は1000個出力されますが、counter
の値が1000以下になることがあることが確認できます。
ロックしたいですね
Mutexでロックする
ロックといえば、Mutexですね。これを使えば簡単にロックできます。
func main() {
var mu sync.Mutex
counter := 0
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
defer mu.Unlock()
counter++
}()
}
time.Sleep(3 * time.Second)
fmt.Println(counter)
}
Mutexはロックされたら、開放されるまで他の処理にロックを渡さないので、count++
が順番に行われます。そのため、何回実行してもcounter
の値が1000になります。
WaitGroupによる待機
ロックはできましたが、sleepがあると何秒か待つ必要があり面倒なので、sync.WaitGroupを使って待ち時間を少なくします。
func main() {
var mu sync.Mutex
wg := sync.WaitGroup{}
counter := 0
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
mu.Lock()
defer mu.Unlock()
counter++
wg.Done()
}()
}
wg.Wait()
fmt.Println(counter)
}
WaitGroupは「Add
した回数Done
されるまで待ち続ける」という命令を書けるようにした仕組みです。
WaitGroupのおかげで1000回Done
されたらすぐにPrintln
が実行できるようになり、時間を直打ちする汚さから開放されました。
WaitGroupはどうやって実装されているか
さて、WaitGroupを使うとDoneされた回数を数えることができますが、なぜきちんと動作するのでしょうか。
sync/waitgroup.goを見ると、atomicを使用しています。
コードの中では、WaitGroup#Add
の時に値を追加し、WaitGroup#Done
で1減らし、WaitGroup#Wait
により、値が0になるまで待機します。
先ほどのコードに直接書くと、以下のようになります。
func main() {
var mu sync.Mutex
var waitCounter int64 = 0
counter := 0
for i := 0; i < 1000; i++ {
atomic.AddInt64(&waitCounter, 1)
go func() {
mu.Lock()
defer mu.Unlock()
counter++
atomic.AddInt64(&waitCounter, -1)
}()
}
for {
i := atomic.LoadInt64(&waitCounter)
if i == 0 {
break
}
}
fmt.Println(counter)
}
簡単ですね。
更にatomicはなぜ動くのかを見たいところですが、Githubにあるとおりアセンブラです。
僕はアセンブラが説明できないので、これ以上は興味があれば読んでみてください。
以降でatomicを使用してもいいのですが、atomicを使用すると長くなるので、以降はWaitGroupを使用します。
channelによるロック
さて、Mutexによって並行処理は問題なく行えるようになりましたが、
「Share memory by communicating, don't communicate by sharing memory」
というモットーがあるように、メモリ空間を共有すると依存関係が複雑になるので、各処理には依存関係が少なくしたいところです。
channelを使ってロックします。
func main() {
wg := sync.WaitGroup{}
ch := make(chan int)
counter := 0
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
ch <- 1
}()
}
go func() {
for {
i := <-ch
counter += i
wg.Done()
}
}()
wg.Wait()
fmt.Println(counter)
}
channelは次のchannel受信(i := <-ch
)まで待機するのでcounter
は順番に加算されます。
複数の命令を順次実行する
channelを使用してロックしましたが、複数の命令を同時に実行しないようにすることが出来ると便利ですね。
channelを外出して、実装します。
func main() {
dc := NewDoubleCounter()
for i := 0; i < 1000; i++ {
go func(t int) {
dc.Add(t)
}(i % 2)
}
dc.Wait()
dc.Print()
}
type DoubleCounter struct {
valueA int
valueB int
commandCh chan int
wg sync.WaitGroup
}
func NewDoubleCounter() *DoubleCounter {
dc := &DoubleCounter{
valueA: 0,
valueB: 0,
commandCh: make(chan int),
}
dc.wg.Add(1000)
go dc.start()
return dc
}
func (dc *DoubleCounter) start() {
for {
target := <- dc.commandCh
if target == 0 {
dc.valueA++
} else {
dc.valueB++
}
dc.wg.Done()
}
}
func (dc *DoubleCounter) Add(t int) {
dc.commandCh <- t
}
func (dc *DoubleCounter) Wait() {
dc.wg.Wait()
}
func (dc *DoubleCounter) Print() {
fmt.Printf("A: %d, B: %d\n", dc.valueA, dc.valueB)
}
この処理ではDoubleCounter内に1つのchannelをもたせ、受信し続けています。
Add
に渡す内容で処理を分岐することでAddを通して呼び出されるすべての処理は同時に一つ実行されます。
この実装ではcommandChはintを取っていますが、処理内容を保持する構造体を渡すことで、色々な処理が順次実行出来るようになります。
(WaitGroupで終了を待っていますがこれはサンプルの都合で、channelを他に用意して待機することもできます)
複数ロックを一つにまとめる
先程は複数の処理を一つづつ処理しましたが、それぞれをロックしたいこともありますね。
ひとつの構造体の中でchannelを複数持てばできます。
func main() {
ml := NewMultiLocker()
for i := 0; i < 1000; i++ {
go func() {
ml.AddA()
}()
go func() {
ml.AddB()
}()
}
ml.Wait()
ml.Print()
}
type MultiCounter struct {
valueA int
valueB int
chA chan int
chB chan int
wg sync.WaitGroup
}
func NewMultiLocker() *MultiCounter {
mc := &MultiCounter{
valueA: 0,
valueB: 0,
chA: make(chan int),
chB: make(chan int),
}
mc.wg.Add(2000)
mc.start()
return mc
}
func (mc *MultiCounter) start() {
go func() {
for {
<- mc.chA
mc.valueA++
mc.wg.Done()
}
}()
go func() {
for {
<-mc.chB
mc.valueB++
mc.wg.Done()
}
}()
}
func (mc *MultiCounter) AddA() {
mc.chA <- 1
}
func (mc *MultiCounter) AddB() {
mc.chB <- 1
}
func (mc *MultiCounter) Wait() {
mc.wg.Wait()
}
func (mc *MultiCounter) Print() {
fmt.Printf("A: %d, B: %d\n", mc.valueA, mc.valueB)
}
それぞれのcounterが1000になっていることが確認できます。
Mutexで書く
これまでchannelを書いてきましたが、長いコードになってきました。
実際に使うコードで複数channelを使うようにすると、結構長くなり見通しが悪くなります。
そこで最後に、Mutexを使用して見やすくします。
func main() {
mc := NewMutexCounter()
for i := 0; i < 1000; i++ {
go func() {
mc.AddA()
}()
go func() {
mc.AddB()
}()
}
mc.Wait()
mc.Print()
}
type MutexCounter struct {
valueA int
valueB int
muA sync.Mutex
muB sync.Mutex
wg sync.WaitGroup
}
func NewMutexCounter() *MutexCounter {
mc := &MutexCounter{
valueA: 0,
valueB: 0,
}
mc.wg.Add(2000)
return mc
}
func (mc *MutexCounter) AddA() {
mc.muA.Lock()
defer mc.muA.Unlock()
mc.valueA++
mc.wg.Done()
}
func (mc *MutexCounter) AddB() {
mc.muB.Lock()
defer mc.muB.Unlock()
mc.valueB++
mc.wg.Done()
}
func (mc *MutexCounter) Wait() {
mc.wg.Wait()
}
func (ml *MutexCounter) Print() {
fmt.Printf("A: %d, B: %d\n", ml.valueA, ml.valueB)
}
チャネル部分のコードが減ってすっきりしました。
このように、channelよりもMutexを使用するほうが見た目が良いこともあるので、こういう状況になった場合channelである必要がなければMutexを使用しています。
終わりに
Goでのロックする書き方をchannelとMutexを使用して色々と書きました。
Goには「Share memory by communicating, don't communicate by sharing memory.」という考え方がありchannelを推奨していますが
golang/go のwiki(MutexOrChannel)にも
A common Go newbie mistake is to over-use channels and goroutines just because it's possible, and/or because it's fun. Don't be afraid to use a sync.Mutex if that fits your problem best.
とあるように、Mutexを使ったほうが楽なこともあるのでその時はMutexを使いましょう。