Go でロックする

  • 39
    Like
  • 0
    Comment

はじめに

この記事はGo Advent Calendar 2016 の17日目の記事です。

並行処理をする時にはロックが重要になります。
GoではchannelとMutexという仕組みを提供しています。
この記事では色々なロックを作りたいと思います。

tl;dr

channelでロックを作ることは有意義です。
ただ、Mutexのほうが良いこともあります。
良い方法を選びましょう

ロック無し

ロックの意味を確かめるためにも、まずはロック無しのコードから始めます。

func main() {
    counter := 0
    for i := 0; i < 1000; i++ {
        go func() {
            counter++
            fmt.Print("*")
        }()
    }

    time.Sleep(3 * time.Second)
    fmt.Printf("\n%d\n", counter)
}

この関数を実行すると、*の数は1000個出力されますが、counterの値が1000以下になることがあることが確認できます。
ロックしたいですね

Mutexでロックする

ロックといえば、Mutexですね。これを使えば簡単にロックできます。

func main() {
    var mu sync.Mutex
    counter := 0
    for i := 0; i < 1000; i++ {
        go func() {
            mu.Lock()
            defer mu.Unlock()
            counter++
        }()
    }

    time.Sleep(3 * time.Second)
    fmt.Println(counter)
}

Mutexはロックされたら、開放されるまで他の処理にロックを渡さないので、count++が順番に行われます。そのため、何回実行してもcounterの値が1000になります。

WaitGroupによる待機

ロックはできましたが、sleepがあると何秒か待つ必要があり面倒なので、sync.WaitGroupを使って待ち時間を少なくします。

func main() {
    var mu sync.Mutex
    wg := sync.WaitGroup{}
    counter := 0
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            mu.Lock()
            defer mu.Unlock()
            counter++
            wg.Done()
        }()
    }

    wg.Wait()
    fmt.Println(counter)
}

WaitGroupは「Addした回数Doneされるまで待ち続ける」という命令を書けるようにした仕組みです。
WaitGroupのおかげで1000回DoneされたらすぐにPrintlnが実行できるようになり、時間を直打ちする汚さから開放されました。

WaitGroupはどうやって実装されているか

さて、WaitGroupを使うとDoneされた回数を数えることができますが、なぜきちんと動作するのでしょうか。
sync/waitgroup.goを見ると、atomicを使用しています。
コードの中では、WaitGroup#Addの時に値を追加し、WaitGroup#Doneで1減らし、WaitGroup#Waitにより、値が0になるまで待機します。
先ほどのコードに直接書くと、以下のようになります。

func main() {
    var mu sync.Mutex
    var waitCounter int64 = 0
    counter := 0
    for i := 0; i < 1000; i++ {
        atomic.AddInt64(&waitCounter, 1)
        go func() {
            mu.Lock()
            defer mu.Unlock()
            counter++
            atomic.AddInt64(&waitCounter, -1)
        }()
    }

    for {
        i := atomic.LoadInt64(&waitCounter)
        if i == 0 {
            break
        }
    }

    fmt.Println(counter)
}

簡単ですね。
更にatomicはなぜ動くのかを見たいところですが、Githubにあるとおりアセンブラです。
僕はアセンブラが説明できないので、これ以上は興味があれば読んでみてください。

以降でatomicを使用してもいいのですが、atomicを使用すると長くなるので、以降はWaitGroupを使用します。

channelによるロック

さて、Mutexによって並行処理は問題なく行えるようになりましたが、
「Share memory by communicating, don't communicate by sharing memory」
というモットーがあるように、メモリ空間を共有すると依存関係が複雑になるので、各処理には依存関係が少なくしたいところです。
channelを使ってロックします。

func main() {
    wg := sync.WaitGroup{}
    ch := make(chan int)

    counter := 0
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            ch <- 1
        }()
    }

    go func() {
        for {
            i := <-ch
            counter += i
            wg.Done()
        }
    }()

    wg.Wait()
    fmt.Println(counter)
}

channelは次のchannel受信(i := <-ch)まで待機するのでcounterは順番に加算されます。

複数の命令を順次実行する

channelを使用してロックしましたが、複数の命令を同時に実行しないようにすることが出来ると便利ですね。
channelを外出して、実装します。

func main() {
    dc := NewDoubleCounter()
    for i := 0; i < 1000; i++ {
        go func(t int) {
            dc.Add(t)
        }(i % 2)
    }

    dc.Wait()
    dc.Print()
}

type DoubleCounter struct {
    valueA int
    valueB int

    commandCh chan int
    wg sync.WaitGroup
}

func NewDoubleCounter() *DoubleCounter {
    dc := &DoubleCounter{
        valueA: 0,
        valueB: 0,
        commandCh: make(chan int),
    }
    dc.wg.Add(1000)
    go dc.start()
    return dc
}

func (dc *DoubleCounter) start() {
    for {
        target := <- dc.commandCh
        if target == 0 {
            dc.valueA++
        } else {
            dc.valueB++
        }
        dc.wg.Done()
    }
}

func (dc *DoubleCounter) Add(t int) {
    dc.commandCh <- t
}

func (dc *DoubleCounter) Wait() {
    dc.wg.Wait()
}

func (dc *DoubleCounter) Print() {
    fmt.Printf("A: %d, B: %d\n", dc.valueA, dc.valueB)
}

この処理ではDoubleCounter内に1つのchannelをもたせ、受信し続けています。
Add に渡す内容で処理を分岐することでAddを通して呼び出されるすべての処理は同時に一つ実行されます。
この実装ではcommandChはintを取っていますが、処理内容を保持する構造体を渡すことで、色々な処理が順次実行出来るようになります。
(WaitGroupで終了を待っていますがこれはサンプルの都合で、channelを他に用意して待機することもできます)

複数ロックを一つにまとめる

先程は複数の処理を一つづつ処理しましたが、それぞれをロックしたいこともありますね。
ひとつの構造体の中でchannelを複数持てばできます。

func main() {
    ml := NewMultiLocker()
    for i := 0; i < 1000; i++ {
        go func() {
            ml.AddA()
        }()
        go func() {
            ml.AddB()
        }()
    }
    ml.Wait()
    ml.Print()
}


type MultiCounter struct {
    valueA int
    valueB int

    chA chan int
    chB chan int
    wg sync.WaitGroup
}

func NewMultiLocker() *MultiCounter {
    mc := &MultiCounter{
        valueA: 0,
        valueB: 0,
        chA: make(chan int),
        chB: make(chan int),
    }
    mc.wg.Add(2000)
    mc.start()
    return mc
}

func (mc *MultiCounter) start() {
    go func() {
        for {
            <- mc.chA
            mc.valueA++
            mc.wg.Done()
        }
    }()

    go func() {
        for {
            <-mc.chB
            mc.valueB++
            mc.wg.Done()
        }
    }()
}

func (mc *MultiCounter) AddA() {
    mc.chA <- 1
}

func (mc *MultiCounter) AddB() {
    mc.chB <- 1
}

func (mc *MultiCounter) Wait() {
    mc.wg.Wait()
}

func (mc *MultiCounter) Print() {
    fmt.Printf("A: %d, B: %d\n", mc.valueA, mc.valueB)
}

それぞれのcounterが1000になっていることが確認できます。

Mutexで書く

これまでchannelを書いてきましたが、長いコードになってきました。
実際に使うコードで複数channelを使うようにすると、結構長くなり見通しが悪くなります。
そこで最後に、Mutexを使用して見やすくします。

func main() {
    mc := NewMutexCounter()
    for i := 0; i < 1000; i++ {
        go func() {
            mc.AddA()
        }()
        go func() {
            mc.AddB()
        }()
    }
    mc.Wait()
    mc.Print()
}

type MutexCounter struct {
    valueA int
    valueB int

    muA sync.Mutex
    muB sync.Mutex

    wg sync.WaitGroup
}

func NewMutexCounter() *MutexCounter {
    mc := &MutexCounter{
        valueA: 0,
        valueB: 0,
    }
    mc.wg.Add(2000)
    return mc
}

func (mc *MutexCounter) AddA() {
    mc.muA.Lock()
    defer mc.muA.Unlock()
    mc.valueA++
    mc.wg.Done()
}

func (mc *MutexCounter) AddB() {
    mc.muB.Lock()
    defer mc.muB.Unlock()
    mc.valueB++
    mc.wg.Done()
}

func (mc *MutexCounter) Wait() {
    mc.wg.Wait()
}

func (ml *MutexCounter) Print() {
    fmt.Printf("A: %d, B: %d\n", ml.valueA, ml.valueB)
}

チャネル部分のコードが減ってすっきりしました。
このように、channelよりもMutexを使用するほうが見た目が良いこともあるので、こういう状況になった場合channelである必要がなければMutexを使用しています。

終わりに

Goでのロックする書き方をchannelとMutexを使用して色々と書きました。

Goには「Share memory by communicating, don't communicate by sharing memory.」という考え方がありchannelを推奨していますが
golang/go のwiki(MutexOrChannel)にも

A common Go newbie mistake is to over-use channels and goroutines just because it's possible, and/or because it's fun. Don't be afraid to use a sync.Mutex if that fits your problem best.

とあるように、Mutexを使ったほうが楽なこともあるのでその時はMutexを使いましょう。