Edited at

ゴルーチンと並行性パターン

More than 3 years have passed since last update.

第37回勉強会(2014/08/09) - 長岡 IT開発者 勉強会(NDS)

@hayajo


Goにおける並行性基礎


ゴルーチン

並行性のための基本機能。スレッドのようなもので、とても軽量。

goステートメントで生成される。


  • 関数

go logging("Hello, %s\n", name)


  • メソッド

go logger.Printf("Hello, %s\n", name)


  • クロージャ

go func() { // HL1

log.Printf("Hello, %s\n", name)
}() // HL1


ゴルーチン: Example

package main

import (
"log"
"os"
"time"
)

// logging forwards its format string and arguments unchanged to log.Printf.
// Note that the parameter name fmt shadows the fmt package inside this
// function body.
func logging(fmt string, args ...interface{}) {
log.Printf(fmt, args...)
}

// main prints the same greeting from three goroutines, each started with a
// different form of the go statement: a function call, a method call and a
// function literal.
func main() {
	const name = "Gopher"

	// Plain function call.
	go logging("Hello, %s\n", name)

	// Method call on a dedicated logger that writes to stdout.
	stdout := log.New(os.Stdout, "", log.LstdFlags)
	go stdout.Printf("Hello, %s\n", name)

	// Function literal (closure).
	go func() {
		log.Printf("Hello, %s\n", name)
	}()

	// Pause for 100ms before returning so the goroutines get a chance to run.
	<-time.After(100 * time.Millisecond)
}


同期

syncパッケージで提供される基本的な同期機能。


  • sync.Mutex (ミューテックス)

  • sync.Once (pthread_once(3))

  • sync.WaitGroup (カウンタセマフォ)

  • sync.Pool (フリーリスト)


同期: Example - sync.WaitGroup

package main

import (
"fmt"
"sync"
)

// main prints a greeting from a goroutine and uses a sync.WaitGroup to block
// until that goroutine has finished.
func main() {
	wg := new(sync.WaitGroup)
	wg.Add(1)

	go func(name string) {
		defer wg.Done()
		fmt.Printf("Hello, %s\n", name)
	}("Gopher")

	wg.Wait()
}


チャネル

ゴルーチン間の通信を提供。双方向パイプに近い。メッセージ型を指定して利用する。

doneCh := make(chan struct{})

送受信はバッファサイズ分でブロックされる。(デフォルトはバッファなし)

go func() {

fmt.Printf("Hello, %s\n", name)
doneCh <- struct{}{} // blocking until ready to read
}()

<-doneCh // blocking until a value is received

引数や戻り値にも使用できる。


チャネル: Example

package main

import "fmt"

// main prints a greeting from a goroutine and waits for it by receiving a
// completion signal over an unbuffered channel.
func main() {
	finished := make(chan struct{})

	go func(name string) {
		fmt.Printf("Hello, %s\n", name)
		finished <- struct{}{} // blocks until main is ready to receive
	}("Gopher")

	<-finished // blocks until the goroutine has sent its signal
}


並行性パターン


Background jobs

package main

import (
"fmt"
"strings"
"sync"
)

// main upper-cases each word in its own goroutine and waits until every
// goroutine has printed its result.
func main() {
	words := []string{"foo", "bar", "baz"}

	var pending sync.WaitGroup
	pending.Add(len(words))

	for i := range words {
		go func(word string) {
			defer pending.Done()
			fmt.Println(strings.ToUpper(word))
		}(words[i])
	}

	pending.Wait() // returns once all goroutines have called Done
}


Unbuffered channel

package main

import (
"fmt"
"math/rand"
"time"
)

// init seeds math/rand's default source with the current time in nanoseconds
// so that each run produces different random numbers.
// NOTE(review): rand.Seed is documented as deprecated since Go 1.20 — confirm
// against the toolchain this example targets.
func init() {
rand.Seed(time.Now().UnixNano())
}

// from sends one random integer in [0, 100) on conCh. The send blocks until
// the channel can accept the value.
func from(conCh chan<- int) {
	n := rand.Intn(100)
	conCh <- n
}

// to receives one integer from conCh, blocking until a value is available,
// and prints it.
func to(conCh <-chan int) {
	v := <-conCh
	fmt.Printf("Received %d\n", v)
}

// main connects a producer goroutine (from) and a consumer goroutine (to)
// through an unbuffered channel and returns once the consumer has finished.
func main() {
	conCh := make(chan int)
	done := make(chan struct{})

	go from(conCh)
	go func() {
		// Closing done signals main that the value has been received and
		// printed; this replaces waiting for an arbitrary amount of time.
		defer close(done)
		to(conCh)
	}()

	<-done
}


Buffered channel

package main

import (
"log"
"math/rand"
"time"
)

// main runs ten jobs while allowing at most three of them to execute at the
// same time. A buffered channel serves as the limiter: a job takes a slot
// before it starts and gives it back when it ends.
func main() {
	const maxConcurrent = 3
	slots := make(chan struct{}, maxConcurrent)

	for job := 0; job < 10; job++ {
		slots <- struct{}{} // blocks while all slots are taken

		go func(id int) {
			defer func() { <-slots }() // release the slot
			log.Println("BEGIN", id)
			pause := time.Duration(rand.Intn(5)) * time.Second
			time.Sleep(pause)
			log.Println("END", id)
		}(job)
	}

	// Taking every slot succeeds only after all running jobs have released
	// theirs, so this waits for the remaining jobs.
	for remaining := cap(slots); remaining > 0; remaining-- {
		slots <- struct{}{}
	}
}


Reading from multiple channels - select

package main

import "log"

// main receives from two channels with select: it logs every counter value
// sent by the producer and stops after logging the final sum.
func main() {
	counts := make(chan int)
	total := make(chan int)

	go func() {
		sum := 0
		for n := 0; n < 1000; n++ {
			counts <- n
			sum += n
		}
		total <- sum
	}()

	for {
		select {
		case n := <-counts:
			log.Println(n)
		case sum := <-total:
			log.Println("SUM", sum)
			return
		}
	}
}


Consume all values from a channel - range

package main

import (
"log"
"time"
)

// main logs every value produced on a channel until the producer closes it.
// The producer sends 0 through 9, sleeping one second after each send.
func main() {
	values := make(chan int)

	go func() {
		defer close(values)

		for n := 0; n < 10; n++ {
			values <- n
			time.Sleep(time.Second)
		}
	}()

	for {
		v, ok := <-values
		if !ok {
			// Channel closed and drained: nothing more to consume.
			return
		}
		log.Println(v)
	}
}


Signaling

package main

import (
"log"
"sync"
)

// main starts 1000 goroutines that all block on the same channel, releases
// them together by closing that channel, and waits for all of them to log
// their index.
func main() {
	const workers = 1000

	gate := make(chan struct{})

	var done sync.WaitGroup
	done.Add(workers)

	for id := 0; id < workers; id++ {
		go func(id int) {
			defer done.Done()
			<-gate // unblocks once the channel is closed
			log.Println(id)
		}(id)
	}

	close(gate) // closing the channel releases every waiting goroutine

	done.Wait()
}

NOTE:

チャネルからの読み込みは値とチャネルの状態を返す。

value, isOpen := <- channel

チャネルが閉じられている場合、値はゼロ値、状態はfalseとなり読み込みはブロックされない。


Safe counter

package main

import (
"log"
"sync"
)

// counter starts a goroutine that sends the sequence 1, 2, 3, ... on ch,
// one value per receive. The goroutine owns the counter value, so receivers
// never share it. The goroutine has no exit condition and runs for as long
// as the program does.
func counter(ch chan<- int64) {
	go func() {
		for next := int64(1); ; next++ {
			ch <- next
		}
	}()
}

// main has 1000 goroutines each draw one number from the shared counter
// channel at the same time and log it with their own index.
func main() {
	const readers = 1000

	numbers := make(chan int64)
	counter(numbers)

	gate := make(chan struct{})

	var done sync.WaitGroup
	done.Add(readers)

	for id := 0; id < readers; id++ {
		go func(id int) {
			defer done.Done()
			<-gate // wait for the common start signal
			n := <-numbers
			log.Printf("#%03d: % 4d\n", id, n)
		}(id)
	}

	close(gate)

	done.Wait()
}

NOTE:

単純なカウンタであれば sync/atomic パッケージでも実現可能。


Timer

package main

import (
"container/ring"
"fmt"
"log"
"time"
)

// aa holds the frames of the ASCII-art animation, in display order.
var aa = []string{
	"ヽ(゚∀゚)ノ",
	"( ゚∀)ノ",
	"(  ゚)ノ",
	"ヽ(  )ノ",
	"ヽ(゚  )",
	"ヽ(∀゚ )ノ",
}

// main plays the ASCII-art animation for five seconds. A goroutine cycles
// through the frames stored in a ring and sends one every 100ms; main prints
// each frame until the timeout channel fires.
func main() {
	frames := make(chan string)

	go func() {
		// Load every frame into a ring so the animation can loop endlessly.
		r := ring.New(len(aa))
		for i := 0; i < len(aa); i++ {
			r.Value = aa[i]
			r = r.Next()
		}

		for {
			r = r.Next()
			frames <- fmt.Sprint(r.Value)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	deadline := time.After(5 * time.Second)

	for {
		select {
		case frame := <-frames:
			fmt.Print("\x0c") // form feed (new page)
			fmt.Println(frame)
		case <-deadline:
			log.Println("timed out")
			return
		}
	}
}

NOTE:


  • time.After

  • time.Tick

  • time.Sleep


Fan-out

処理を分散させる

package main

import (
"log"
"sync"
)

// main distributes the letters A..Z from one channel across three worker
// goroutines. Each letter is handled by exactly one worker, which logs it
// together with the worker's index.
func main() {
	letters := make(chan byte)

	go func() {
		defer close(letters)

		for c := byte('A'); c <= 'Z'; c++ {
			letters <- c
		}
	}()

	const workers = 3

	gate := make(chan struct{})

	var done sync.WaitGroup
	done.Add(workers)

	for id := 0; id < workers; id++ {
		go func(id int) {
			defer done.Done()
			<-gate // wait for the common start signal
			for b := range letters {
				log.Printf("#%d %s", id, string(b))
			}
		}(id)
	}

	close(gate)

	done.Wait()
}


Fan-in

チャネルを多重化する

package main

import (
"bytes"
"log"
"sync"
)

// fanIn merges any number of input channels into a single output channel.
// Each input is drained by its own goroutine; the returned channel is closed
// after every input has been closed and fully forwarded. The relative order
// of values coming from different inputs is not defined.
func fanIn(ch ...<-chan byte) <-chan byte {
	merged := make(chan byte)

	pending := new(sync.WaitGroup)
	pending.Add(len(ch))

	forward := func(src <-chan byte) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}
	for i := range ch {
		go forward(ch[i])
	}

	// Close the output only after all forwarders are done, so that nobody
	// sends on a closed channel.
	go func() {
		defer close(merged)
		pending.Wait()
	}()

	return merged
}

// main produces upper-case and lower-case letters on two separate channels,
// merges them with fanIn, collects everything into a buffer and logs the
// result.
func main() {
	upper := make(chan byte)
	lower := make(chan byte)

	go func() {
		defer func() {
			close(upper)
			close(lower)
		}()
		for off := byte(0); off < 26; off++ {
			upper <- 'A' + off // A..Z
			lower <- 'a' + off // a..z
		}
	}()

	var collected bytes.Buffer

	for v := range fanIn(upper, lower) {
		collected.WriteByte(v)
	}

	log.Println(collected.String())
}


Coroutine

コルーチン - Wikipedia

package main

import "log"

// hello starts a goroutine that sends three strings on yield, in order.
// Each send blocks until the receiver asks for the next value, which gives
// the goroutine coroutine-like behavior.
func hello(yield chan<- string) {
	lines := [...]string{"Hello", "Gopher", "Hello NDS"}

	go func() {
		for _, line := range lines {
			yield <- line
		}
	}()
}

// main resumes the hello coroutine three times, weaving each value it yields
// into a separate log line.
func main() {
	co := make(chan string)

	hello(co)

	first := <-co
	log.Println(first, "World")

	second := <-co
	log.Println("Hello", second)

	third := <-co
	log.Println(third)
}


Generator

ジェネレータ (プログラミング) - Wikipedia

package main

import (
"log"
"math/rand"
"time"
)

// init seeds math/rand's default source with the current time in nanoseconds
// so that the generator yields different numbers on each run.
// NOTE(review): rand.Seed is documented as deprecated since Go 1.20 — confirm
// against the toolchain this example targets.
func init() {
rand.Seed(time.Now().UnixNano())
}

// generator returns a channel that yields n random non-negative integers and
// is then closed. The values are produced by a background goroutine, one per
// receive. For n <= 0 the channel is closed without yielding anything.
func generator(n int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for remaining := n; remaining > 0; remaining-- {
			out <- rand.Int()
		}
	}()

	return out
}

// main logs the 100 values yielded by the generator.
func main() {
	numbers := generator(100)
	for v := range numbers {
		log.Println(v)
	}
}


Future

future - Wikipedia

package main

import (
"fmt"
"log"
"strconv"
"sync"
)

// futureUint64 is a placeholder for a uint64 that is computed in the
// background and resolved lazily the first time it is needed.
type futureUint64 struct {
	value uint64      // cached result; set by the first call to String
	ch    chan uint64 // carries the result from the producing goroutine
	once  sync.Once   // ensures the result is received from ch only once
}

// String blocks until the result is available, caches it, and returns it
// formatted in base 10. Later calls return the cached value immediately.
func (f *futureUint64) String() string {
	f.once.Do(func() { f.value = <-f.ch })
	return strconv.FormatUint(f.value, 10)
}

// Fibonacci starts computing the n-th Fibonacci number in a new goroutine and
// immediately returns a future for it. The computation result is obtained by
// calling String on the returned value, which blocks until it is ready.
func Fibonacci(n uint64) fmt.Stringer {
	f := &futureUint64{
		// One slot of buffer lets the producer deliver its result and exit
		// even if the future is never read, so the goroutine cannot leak.
		ch: make(chan uint64, 1),
	}
	go func() {
		f.ch <- fib(n)
	}()
	return f
}

// fib returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
// It iterates n times instead of recursing. The result wraps around modulo
// 2^64 for n > 93, the largest index whose value fits in a uint64.
func fib(n uint64) uint64 {
	var a, b uint64 = 0, 1
	for ; n > 0; n-- {
		a, b = b, a+b
	}
	return a
}

// main requests the 20th Fibonacci number as a future and logs it.
func main() {
	f := Fibonacci(20)
	// Println formats f through its String method and already separates its
	// operands with a space, so the message must not end with one.
	log.Println("The 20th Fibonacci number is", f)
}


Actor model

アクターモデル - Wikipedia

package main

import "log"

// actor is the send-only mailbox of an actor goroutine. Messages of any type
// can be sent to it; the actor decides how to handle each one.
type actor chan<- interface{}

// PutReq asks a map actor to store val under key.
type PutReq struct {
	key, val string
}

// GetReq asks a map actor for the value stored under key. The value (the
// empty string when the key is absent) is sent back on ack.
type GetReq struct {
	key string
	ack chan<- string
}

// NewMapActor starts a goroutine that owns a string map and returns the
// mailbox used to talk to it. The map is only ever touched by that goroutine,
// so no locking is needed. Messages other than PutReq and GetReq are ignored.
// Closing the returned channel shuts the actor down.
func NewMapActor() actor {
	mailbox := make(chan interface{})

	go func() {
		m := make(map[string]string)

		// Ranging over the mailbox ends the loop when the channel is closed.
		// A bare receive in an endless loop would instead spin forever on
		// the nil values a closed channel yields.
		for msg := range mailbox {
			switch r := msg.(type) {
			case PutReq:
				m[r.key] = r.val
			case GetReq:
				r.ack <- m[r.key]
			}
		}
	}()

	return actor(mailbox)
}

// main stores one key/value pair in a map actor, reads it back through a
// reply channel and logs the value.
func main() {
	store := NewMapActor()

	store <- PutReq{key: "hoge", val: "HOGEHOGE"}

	reply := make(chan string)
	store <- GetReq{key: "hoge", ack: reply}

	got := <-reply
	log.Println(got)
}


Appendix


並列度をあげる

Go 1.5 以降のデフォルトはCPUコア数(Go 1.4 以前は1)。変更は runtime.GOMAXPROCS(int) で行う。

runtime.GOMAXPROCS(2)

CPUコア数は runtime.NumCPU() で取得する。

runtime.GOMAXPROCS(runtime.NumCPU())


現在のゴルーチン数を取得

package main

import (
"log"
"runtime"
)

// main logs the number of goroutines that currently exist, as reported by
// runtime.NumGoroutine.
func main() {
	n := runtime.NumGoroutine()
	log.Printf("running %d goroutines", n)
}


  • 現在のメモリ情報を取得

package main

import (
"encoding/json"
"fmt"
"runtime"
)

// main reads the runtime's current memory statistics and prints them as
// indented JSON.
func main() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	b, err := json.MarshalIndent(m, "", " ")
	if err != nil {
		// Fail loudly instead of printing an empty line when encoding fails.
		panic(fmt.Errorf("marshaling runtime.MemStats: %w", err))
	}
	fmt.Println(string(b))
}