第37回勉強会(2014/08/09) - 長岡 IT開発者 勉強会(NDS)
Goにおける並行性基礎
ゴルーチン
並行性のための基本機能。スレッドのようなもので、とても軽量。
goステートメントで生成される。
- 関数
 
go logging("Hello, %s\n", name)
- メソッド
 
go logger.Printf("Hello, %s\n", name)
- クロージャ
 
go func() { // HL1
  log.Printf("Hello, %s\n", name)
}() // HL1
ゴルーチン: Example
package main
import (
	"log"
	"os"
	"time"
)
func logging(fmt string, args ...interface{}) {
	log.Printf(fmt, args...)
}
func main() {
	name := "Gopher"
	go logging("Hello, %s\n", name)
	logger := log.New(os.Stdout, "", log.Ldate | log.Ltime)
	go logger.Printf("Hello, %s\n", name)
	go func() {
		log.Printf("Hello, %s\n", name)
	}()
	time.Sleep(100 * time.Millisecond)
}
同期
syncパッケージで提供される基本的な同期機能。
- sync.Mutex (ミューテックス)
 - sync.Once (pthread_once(3))
 - sync.WaitGroup (カウンタセマフォ)
 - sync.Pool (フリーリスト)
 
同期: Example - sync.WaitGroup
package main
import (
	"fmt"
	"sync"
)
func main() {
	name := "Gopher"
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Printf("Hello, %s\n", name)
	}()
	wg.Wait()
}
チャネル
ゴルーチン間の通信を提供。双方向パイプに近い。メッセージ型を指定して利用する。
doneCh := make(chan struct{})
送受信はバッファサイズ分でブロックされる。(デフォルトはバッファなし)
go func() {
  fmt.Printf("Hello, %s\n", name)
  doneCh <- struct{}{} // blocking until ready to read
}()
<-doneCh // blocking until a value is received
引数や戻り値にも使用できる。
チャネル: Example
package main
import "fmt"
func main() {
	name := "Gopher"
	doneCh := make(chan struct{})
	go func() {
		fmt.Printf("Hello, %s\n", name)
		doneCh <- struct{}{} // blocking until ready to read
	}()
	<-doneCh // blocking until a value is received
}
並行性パターン
Bockground jobs
package main
import (
	"fmt"
	"strings"
	"sync"
)
func main() {
	var wg sync.WaitGroup
	args := []string{"foo", "bar", "baz"}
	for _, a := range args {
		wg.Add(1)
		go func(str string) {
			fmt.Println(strings.ToUpper(str))
			wg.Done()
		}(a)
	}
	wg.Wait() // waiting for all goroutines to finish
}
Unbuffered channel
package main
import (
	"fmt"
	"math/rand"
	"time"
)
func init() {
	rand.Seed(time.Now().UnixNano())
}
func from(conCh chan<- int) {
	conCh <- rand.Intn(100) // blocking until ready to read
}
func to(conCh <-chan int) {
	fmt.Printf("Received %d\n", <-conCh) // blocking until a value is received
}
func main() {
	conCh := make(chan int)
	go from(conCh)
	go to(conCh)
	time.Sleep(100 * time.Millisecond)
}
Buffered channel
package main
import (
	"log"
	"math/rand"
	"time"
)
func main() {
	limCh := make(chan struct{}, 3)
	for i := 0; i < 10; i++ {
		limCh <- struct{}{}
		go func(i int) {
			defer func() { <-limCh }()
			log.Println("BEGIN", i)
			time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
			log.Println("END", i)
		}(i)
	}
	for i := 0; i < cap(limCh); i++ {
		limCh <- struct{}{}
	}
}
Reading from multiple channels - select
package main
import "log"
func main() {
	cntCh := make(chan int)
	sumCh := make(chan int)
	go func() {
		var s int
		for i := 0; i < 1000; i++ {
			cntCh <- i
			s += i
		}
		sumCh <- s
	}()
loop:
	for {
		select {
		case c := <-cntCh:
			log.Println(c)
		case s := <-sumCh:
			log.Println("SUM", s)
			break loop
		}
	}
}
Consume all values from a channel - range
package main
import (
	"log"
	"time"
)
func main() {
	ch := make(chan int)
	go func() {
		defer func() {
			close(ch)
		}()
		for i := 0; i < 10; i++ {
			ch <- i
			time.Sleep(time.Second)
		}
	}()
	for v := range ch {
		log.Println(v)
	}
}
Signaling
package main
import (
	"log"
	"sync"
)
func main() {
	var wg sync.WaitGroup
	startCh := make(chan struct{})
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			<-startCh // blocking until the channel is closed
			log.Println(i)
			wg.Done()
		}(i)
	}
	close(startCh) // closing the channel in order to start goroutines
	wg.Wait()
}
NOTE:
チャネルからの読み込みは値とチャネルの状態を返す。
value, isOpen := <- channel
チャネルが閉じられている場合、値はゼロ値、状態はfalseとなり読み込みはブロックされない。
Safe counter
package main
import (
	"log"
	"sync"
)
func counter(ch chan<- int64) {
	go func() {
		var c int64 = 1
		for {
			ch <- c // HL
			c++
		}
	}()
}
func main() {
	var wg sync.WaitGroup
	cntCh := make(chan int64)
	counter(cntCh)
	start := make(chan struct{})
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			<-start
			log.Printf("#%03d: % 4d\n", i, <-cntCh)
			wg.Done()
		}(i)
	}
	close(start)
	wg.Wait()
}
NOTE:
単純なカウンタであれば sync.atomic パッケージでも実現可能。
Timer
package main
import (
	"container/ring"
	"fmt"
	"log"
	"time"
)
var aa []string = []string{
	"ヽ(゚∀゚)ノ",
	"( ゚∀)ノ",
	"(  ゚)ノ",
	"ヽ(  )ノ",
	"ヽ(゚  )",
	"ヽ(∀゚ )ノ",
}
func main() {
	ch := make(chan string)
	go func() {
		r := ring.New(len(aa))
		for _, s := range aa {
			r.Value = s
			r = r.Next()
		}
		for {
			r = r.Next()
			ch <- fmt.Sprintf("%v", r.Value)
			time.Sleep(100 * time.Millisecond)
		}
	}()
	timeoutCh := time.After(5 * time.Second)
loop:
	for {
		select {
		case v := <-ch:
			fmt.Print("\x0c") // New Page
			fmt.Println(v)
		case <-timeoutCh:
			log.Println("timed out")
			break loop
		}
	}
}
NOTE:
- time.After
 - time.Tick
 - time.Sleep
 
Fan-out
処理を分散させる
package main
import (
	"log"
	"sync"
)
func main() {
	outCh := make(chan byte)
	go func() {
		defer func() {
			close(outCh)
		}()
		for i := 0; i < 26; i++ {
			outCh <- byte(65 + i) // A..Z
		}
	}()
	var wg sync.WaitGroup
	startCh := make(chan struct{})
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			<-startCh
			for b := range outCh {
				log.Printf("#%d %s", i, string(b))
			}
			wg.Done()
		}(i)
	}
	close(startCh)
	wg.Wait()
}
Fan-in
チャネルを多重化する
package main
import (
	"bytes"
	"log"
	"sync"
)
func fanIn(ch ...<-chan byte) <-chan byte {
	var wg sync.WaitGroup
	outCh := make(chan byte)
	for _, c := range ch {
		wg.Add(1)
		go func(c <-chan byte) {
			for b := range c {
				outCh <- b
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(outCh)
	}()
	return outCh
}
func main() {
	ch1 := make(chan byte)
	ch2 := make(chan byte)
	go func() {
		defer func() {
			close(ch1)
			close(ch2)
		}()
		for i := 0; i < 26; i++ {
			ch1 <- byte(65 + i) // A..Z
			ch2 <- byte(97 + i) // a..z
		}
	}()
	var b bytes.Buffer
	outCh := fanIn(ch1, ch2)
	for v := range outCh {
		b.WriteByte(v)
	}
	log.Println(b.String())
}
Coroutine
package main
import "log"
func hello(yield chan<- string) {
	go func() {
		yield <- "Hello"
		yield <- "Gopher"
		yield <- "Hello NDS"
	}()
}
func main() {
	co := make(chan string)
	hello(co)
	log.Println(<-co, "World")
	log.Println("Hello", <-co)
	log.Println(<-co)
}
Generator
package main
import (
	"log"
	"math/rand"
	"time"
)
func init() {
	rand.Seed(time.Now().UnixNano())
}
func generator(n int) <-chan int {
	yield := make(chan int)
	go func() {
		for i := 0; i < n; i++ {
			yield <- rand.Int()
		}
		close(yield)
	}()
	return yield
}
func main() {
	for v := range generator(100) {
		log.Println(v)
	}
}
Future
package main
import (
	"fmt"
	"log"
	"strconv"
	"sync"
)
type futureUint64 struct {
	value uint64
	ch    chan uint64
	once  sync.Once
}
func (f *futureUint64) String() string {
	f.once.Do(func() { f.value = <-f.ch })
	return strconv.FormatUint(f.value, 10)
}
func Fibonacci(n uint64) fmt.Stringer {
	f := new(futureUint64)
	f.ch = make(chan uint64)
	go func() {
		f.ch <- fib(n)
	}()
	return f
}
func fib(n uint64) uint64 {
	switch n {
	case 0:
		return 0
	case 1, 2:
		return 1
	default:
		return fib(n-1) + fib(n-2)
	}
}
func main() {
	f := Fibonacci(20)
	log.Println("The 20th Fibonacci number is ", f)
}
Actor model
package main
import "log"
type actor chan<- interface{}
type PutReq struct {
	key, val string
}
type GetReq struct {
	key string
	ack chan<- string
}
func NewMapActor() actor {
	a := make(chan interface{})
	go func() {
		m := make(map[string]string)
		for {
			switch r := (<-a).(type) {
			case PutReq: // HL
				m[r.key] = r.val
			case GetReq: // HL
				r.ack <- m[r.key]
			}
		}
	}()
	return actor(a)
}
func main() {
	a := NewMapActor()
	a <- PutReq{key: "hoge", val: "HOGEHOGE"}
	ack := make(chan string)
	a <- GetReq{key: "hoge", ack: ack}
	log.Println(<-ack)
}
Appendix
並列度をあげる
デフォルトは1。変更は runtime.GOMAXPROCS(int) で行う。
runtime.GOMAXPROCS(2)
CPUコア数は runtime.NumCPU() で取得する。
runtime.GOMAXPROCS(runtime.NumCPU())
現在のゴルーチン数を取得
package main
import (
	"log"
	"runtime"
)
func main() {
	log.Printf("running %d goroutines", runtime.NumGoroutine())
}
- 現在のメモリ情報を取得
 
package main
import (
	"encoding/json"
	"fmt"
	"runtime"
)
func main() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	b, _ := json.MarshalIndent(m, "", "  ")
	fmt.Println(string(b))
}