第37回勉強会(2014/08/09) - 長岡 IT開発者 勉強会(NDS)
Goにおける並行性基礎
ゴルーチン
並行性のための基本機能。スレッドのようなもので、とても軽量。
goステートメントで生成される。
- 関数
go logging("Hello, %s\n", name)
- メソッド
go logger.Printf("Hello, %s\n", name)
- クロージャ
go func() { // HL1
log.Printf("Hello, %s\n", name)
}() // HL1
ゴルーチン: Example
package main
import (
"log"
"os"
"time"
)
func logging(fmt string, args ...interface{}) {
log.Printf(fmt, args...)
}
func main() {
name := "Gopher"
go logging("Hello, %s\n", name)
logger := log.New(os.Stdout, "", log.Ldate | log.Ltime)
go logger.Printf("Hello, %s\n", name)
go func() {
log.Printf("Hello, %s\n", name)
}()
time.Sleep(100 * time.Millisecond)
}
同期
syncパッケージで提供される基本的な同期機能。
- sync.Mutex (ミューテックス)
- sync.Once (pthread_once(3))
- sync.WaitGroup (カウンタセマフォ)
- sync.Pool (フリーリスト)
同期: Example - sync.WaitGroup
package main
import (
"fmt"
"sync"
)
func main() {
name := "Gopher"
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("Hello, %s\n", name)
}()
wg.Wait()
}
チャネル
ゴルーチン間の通信を提供。双方向パイプに近い。メッセージ型を指定して利用する。
doneCh := make(chan struct{})
送受信はバッファサイズ分でブロックされる。(デフォルトはバッファなし)
go func() {
fmt.Printf("Hello, %s\n", name)
doneCh <- struct{}{} // blocking until ready to read
}()
<-doneCh // blocking until a value is received
引数や戻り値にも使用できる。
チャネル: Example
package main
import "fmt"
func main() {
name := "Gopher"
doneCh := make(chan struct{})
go func() {
fmt.Printf("Hello, %s\n", name)
doneCh <- struct{}{} // blocking until ready to read
}()
<-doneCh // blocking until a value is received
}
並行性パターン
Bockground jobs
package main
import (
"fmt"
"strings"
"sync"
)
func main() {
var wg sync.WaitGroup
args := []string{"foo", "bar", "baz"}
for _, a := range args {
wg.Add(1)
go func(str string) {
fmt.Println(strings.ToUpper(str))
wg.Done()
}(a)
}
wg.Wait() // waiting for all goroutines to finish
}
Unbuffered channel
package main
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func from(conCh chan<- int) {
conCh <- rand.Intn(100) // blocking until ready to read
}
func to(conCh <-chan int) {
fmt.Printf("Received %d\n", <-conCh) // blocking until a value is received
}
func main() {
conCh := make(chan int)
go from(conCh)
go to(conCh)
time.Sleep(100 * time.Millisecond)
}
Buffered channel
package main
import (
"log"
"math/rand"
"time"
)
func main() {
limCh := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
limCh <- struct{}{}
go func(i int) {
defer func() { <-limCh }()
log.Println("BEGIN", i)
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
log.Println("END", i)
}(i)
}
for i := 0; i < cap(limCh); i++ {
limCh <- struct{}{}
}
}
Reading from multiple channels - select
package main
import "log"
func main() {
cntCh := make(chan int)
sumCh := make(chan int)
go func() {
var s int
for i := 0; i < 1000; i++ {
cntCh <- i
s += i
}
sumCh <- s
}()
loop:
for {
select {
case c := <-cntCh:
log.Println(c)
case s := <-sumCh:
log.Println("SUM", s)
break loop
}
}
}
Consume all values from a channel - range
package main
import (
"log"
"time"
)
func main() {
ch := make(chan int)
go func() {
defer func() {
close(ch)
}()
for i := 0; i < 10; i++ {
ch <- i
time.Sleep(time.Second)
}
}()
for v := range ch {
log.Println(v)
}
}
Signaling
package main
import (
"log"
"sync"
)
func main() {
var wg sync.WaitGroup
startCh := make(chan struct{})
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
<-startCh // blocking until the channel is closed
log.Println(i)
wg.Done()
}(i)
}
close(startCh) // closing the channel in order to start goroutines
wg.Wait()
}
NOTE:
チャネルからの読み込みは値とチャネルの状態を返す。
value, isOpen := <- channel
チャネルが閉じられている場合、値はゼロ値、状態はfalseとなり読み込みはブロックされない。
Safe counter
package main
import (
"log"
"sync"
)
func counter(ch chan<- int64) {
go func() {
var c int64 = 1
for {
ch <- c // HL
c++
}
}()
}
func main() {
var wg sync.WaitGroup
cntCh := make(chan int64)
counter(cntCh)
start := make(chan struct{})
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
<-start
log.Printf("#%03d: % 4d\n", i, <-cntCh)
wg.Done()
}(i)
}
close(start)
wg.Wait()
}
NOTE:
単純なカウンタであれば sync.atomic パッケージでも実現可能。
Timer
package main
import (
"container/ring"
"fmt"
"log"
"time"
)
var aa []string = []string{
"ヽ(゚∀゚)ノ",
"( ゚∀)ノ",
"( ゚)ノ",
"ヽ( )ノ",
"ヽ(゚ )",
"ヽ(∀゚ )ノ",
}
func main() {
ch := make(chan string)
go func() {
r := ring.New(len(aa))
for _, s := range aa {
r.Value = s
r = r.Next()
}
for {
r = r.Next()
ch <- fmt.Sprintf("%v", r.Value)
time.Sleep(100 * time.Millisecond)
}
}()
timeoutCh := time.After(5 * time.Second)
loop:
for {
select {
case v := <-ch:
fmt.Print("\x0c") // New Page
fmt.Println(v)
case <-timeoutCh:
log.Println("timed out")
break loop
}
}
}
NOTE:
- time.After
- time.Tick
- time.Sleep
Fan-out
処理を分散させる
package main
import (
"log"
"sync"
)
func main() {
outCh := make(chan byte)
go func() {
defer func() {
close(outCh)
}()
for i := 0; i < 26; i++ {
outCh <- byte(65 + i) // A..Z
}
}()
var wg sync.WaitGroup
startCh := make(chan struct{})
for i := 0; i < 3; i++ {
wg.Add(1)
go func(i int) {
<-startCh
for b := range outCh {
log.Printf("#%d %s", i, string(b))
}
wg.Done()
}(i)
}
close(startCh)
wg.Wait()
}
Fan-in
チャネルを多重化する
package main
import (
"bytes"
"log"
"sync"
)
func fanIn(ch ...<-chan byte) <-chan byte {
var wg sync.WaitGroup
outCh := make(chan byte)
for _, c := range ch {
wg.Add(1)
go func(c <-chan byte) {
for b := range c {
outCh <- b
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(outCh)
}()
return outCh
}
func main() {
ch1 := make(chan byte)
ch2 := make(chan byte)
go func() {
defer func() {
close(ch1)
close(ch2)
}()
for i := 0; i < 26; i++ {
ch1 <- byte(65 + i) // A..Z
ch2 <- byte(97 + i) // a..z
}
}()
var b bytes.Buffer
outCh := fanIn(ch1, ch2)
for v := range outCh {
b.WriteByte(v)
}
log.Println(b.String())
}
Coroutine
package main
import "log"
func hello(yield chan<- string) {
go func() {
yield <- "Hello"
yield <- "Gopher"
yield <- "Hello NDS"
}()
}
func main() {
co := make(chan string)
hello(co)
log.Println(<-co, "World")
log.Println("Hello", <-co)
log.Println(<-co)
}
Generator
package main
import (
"log"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func generator(n int) <-chan int {
yield := make(chan int)
go func() {
for i := 0; i < n; i++ {
yield <- rand.Int()
}
close(yield)
}()
return yield
}
func main() {
for v := range generator(100) {
log.Println(v)
}
}
Future
package main
import (
"fmt"
"log"
"strconv"
"sync"
)
type futureUint64 struct {
value uint64
ch chan uint64
once sync.Once
}
func (f *futureUint64) String() string {
f.once.Do(func() { f.value = <-f.ch })
return strconv.FormatUint(f.value, 10)
}
func Fibonacci(n uint64) fmt.Stringer {
f := new(futureUint64)
f.ch = make(chan uint64)
go func() {
f.ch <- fib(n)
}()
return f
}
func fib(n uint64) uint64 {
switch n {
case 0:
return 0
case 1, 2:
return 1
default:
return fib(n-1) + fib(n-2)
}
}
func main() {
f := Fibonacci(20)
log.Println("The 20th Fibonacci number is ", f)
}
Actor model
package main
import "log"
type actor chan<- interface{}
type PutReq struct {
key, val string
}
type GetReq struct {
key string
ack chan<- string
}
func NewMapActor() actor {
a := make(chan interface{})
go func() {
m := make(map[string]string)
for {
switch r := (<-a).(type) {
case PutReq: // HL
m[r.key] = r.val
case GetReq: // HL
r.ack <- m[r.key]
}
}
}()
return actor(a)
}
func main() {
a := NewMapActor()
a <- PutReq{key: "hoge", val: "HOGEHOGE"}
ack := make(chan string)
a <- GetReq{key: "hoge", ack: ack}
log.Println(<-ack)
}
Appendix
並列度をあげる
デフォルトは1。変更は runtime.GOMAXPROCS(int) で行う。
runtime.GOMAXPROCS(2)
CPUコア数は runtime.NumCPU() で取得する。
runtime.GOMAXPROCS(runtime.NumCPU())
現在のゴルーチン数を取得
package main
import (
"log"
"runtime"
)
func main() {
log.Printf("running %d goroutines", runtime.NumGoroutine())
}
- 現在のメモリ情報を取得
package main
import (
"encoding/json"
"fmt"
"runtime"
)
func main() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
b, _ := json.MarshalIndent(m, "", " ")
fmt.Println(string(b))
}