ゴルーチンと並行性パターン

  • 57
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

第37回勉強会(2014/08/09) - 長岡 IT開発者 勉強会(NDS)

@hayajo

Goにおける並行性基礎

ゴルーチン

並行性のための基本機能。スレッドのようなもので、とても軽量。

goステートメントで生成される。

  • 関数
go logging("Hello, %s\n", name)
  • メソッド
go logger.Printf("Hello, %s\n", name)
  • クロージャ
go func() { // HL1
  log.Printf("Hello, %s\n", name)
}() // HL1

ゴルーチン: Example

package main

import (
    "log"
    "os"
    "time"
)

func logging(fmt string, args ...interface{}) {
    log.Printf(fmt, args...)
}

func main() {
    name := "Gopher"

    go logging("Hello, %s\n", name)

    logger := log.New(os.Stdout, "", log.Ldate | log.Ltime)
    go logger.Printf("Hello, %s\n", name)

    go func() {
        log.Printf("Hello, %s\n", name)
    }()

    time.Sleep(100 * time.Millisecond)
}

同期

syncパッケージで提供される基本的な同期機能。

  • sync.Mutex (ミューテックス)
  • sync.Once (pthread_once(3))
  • sync.WaitGroup (カウンタセマフォ)
  • sync.Pool (フリーリスト)

同期: Example - sync.WaitGroup

package main

import (
    "fmt"
    "sync"
)

func main() {
    name := "Gopher"

    var wg sync.WaitGroup

    wg.Add(1)

    go func() {
        defer wg.Done()
        fmt.Printf("Hello, %s\n", name)
    }()

    wg.Wait()
}

チャネル

ゴルーチン間の通信を提供。双方向パイプに近い。メッセージ型を指定して利用する。

doneCh := make(chan struct{})

送受信はバッファサイズ分でブロックされる。(デフォルトはバッファなし)

go func() {
  fmt.Printf("Hello, %s\n", name)
  doneCh <- struct{}{} // blocking until ready to read
}()

<-doneCh // blocking until a value is received

引数や戻り値にも使用できる。

チャネル: Example

package main

import "fmt"

func main() {
    name := "Gopher"

    doneCh := make(chan struct{})

    go func() {
        fmt.Printf("Hello, %s\n", name)
        doneCh <- struct{}{} // blocking until ready to read
    }()

    <-doneCh // blocking until a value is received
}

並行性パターン

Bockground jobs

package main

import (
    "fmt"
    "strings"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    args := []string{"foo", "bar", "baz"}

    for _, a := range args {
        wg.Add(1)
        go func(str string) {
            fmt.Println(strings.ToUpper(str))
            wg.Done()
        }(a)
    }

    wg.Wait() // waiting for all goroutines to finish
}

Unbuffered channel

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func init() {
    rand.Seed(time.Now().UnixNano())
}

func from(conCh chan<- int) {
    conCh <- rand.Intn(100) // blocking until ready to read
}

func to(conCh <-chan int) {
    fmt.Printf("Received %d\n", <-conCh) // blocking until a value is received
}

func main() {
    conCh := make(chan int)
    go from(conCh)
    go to(conCh)
    time.Sleep(100 * time.Millisecond)
}

Buffered channel

package main

import (
    "log"
    "math/rand"
    "time"
)

func main() {
    limCh := make(chan struct{}, 3)

    for i := 0; i < 10; i++ {
        limCh <- struct{}{}

        go func(i int) {
            defer func() { <-limCh }()
            log.Println("BEGIN", i)
            time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
            log.Println("END", i)
        }(i)
    }

    for i := 0; i < cap(limCh); i++ {
        limCh <- struct{}{}
    }
}

Reading from multiple channels - select

package main

import "log"

func main() {
    cntCh := make(chan int)
    sumCh := make(chan int)

    go func() {
        var s int
        for i := 0; i < 1000; i++ {
            cntCh <- i
            s += i
        }
        sumCh <- s
    }()

loop:
    for {
        select {
        case c := <-cntCh:
            log.Println(c)
        case s := <-sumCh:
            log.Println("SUM", s)
            break loop
        }
    }
}

Consume all values from a channel - range

package main

import (
    "log"
    "time"
)

func main() {
    ch := make(chan int)

    go func() {
        defer func() {
            close(ch)
        }()

        for i := 0; i < 10; i++ {
            ch <- i
            time.Sleep(time.Second)
        }
    }()

    for v := range ch {
        log.Println(v)
    }
}

Signaling

package main

import (
    "log"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    startCh := make(chan struct{})

    for i := 0; i < 1000; i++ {
        wg.Add(1)

        go func(i int) {
            <-startCh // blocking until the channel is closed
            log.Println(i)
            wg.Done()
        }(i)
    }

    close(startCh) // closing the channel in order to start goroutines

    wg.Wait()
}

NOTE:

チャネルからの読み込みは値とチャネルの状態を返す。

value, isOpen := <- channel

チャネルが閉じられている場合、値はゼロ値、状態はfalseとなり読み込みはブロックされない。

Safe counter

package main

import (
    "log"
    "sync"
)

func counter(ch chan<- int64) {
    go func() {
        var c int64 = 1
        for {
            ch <- c // HL
            c++
        }
    }()
}

func main() {
    var wg sync.WaitGroup

    cntCh := make(chan int64)

    counter(cntCh)

    start := make(chan struct{})

    for i := 0; i < 1000; i++ {
        wg.Add(1)

        go func(i int) {
            <-start
            log.Printf("#%03d: % 4d\n", i, <-cntCh)
            wg.Done()
        }(i)
    }

    close(start)

    wg.Wait()
}

NOTE:

単純なカウンタであれば sync.atomic パッケージでも実現可能。

Timer

package main

import (
    "container/ring"
    "fmt"
    "log"
    "time"
)

var aa []string = []string{
    "ヽ(゚∀゚)ノ",
    "( ゚∀)ノ",
    "(  ゚)ノ",
    "ヽ(  )ノ",
    "ヽ(゚  )",
    "ヽ(∀゚ )ノ",
}

func main() {
    ch := make(chan string)

    go func() {
        r := ring.New(len(aa))

        for _, s := range aa {
            r.Value = s
            r = r.Next()
        }

        for {
            r = r.Next()
            ch <- fmt.Sprintf("%v", r.Value)
            time.Sleep(100 * time.Millisecond)
        }
    }()

    timeoutCh := time.After(5 * time.Second)

loop:
    for {
        select {
        case v := <-ch:
            fmt.Print("\x0c") // New Page
            fmt.Println(v)
        case <-timeoutCh:
            log.Println("timed out")
            break loop
        }
    }
}

NOTE:

  • time.After
  • time.Tick
  • time.Sleep

Fan-out

処理を分散させる

package main

import (
    "log"
    "sync"
)

func main() {
    outCh := make(chan byte)

    go func() {
        defer func() {
            close(outCh)
        }()

        for i := 0; i < 26; i++ {
            outCh <- byte(65 + i) // A..Z
        }
    }()

    var wg sync.WaitGroup

    startCh := make(chan struct{})

    for i := 0; i < 3; i++ {
        wg.Add(1)

        go func(i int) {
            <-startCh
            for b := range outCh {
                log.Printf("#%d %s", i, string(b))
            }
            wg.Done()
        }(i)
    }

    close(startCh)

    wg.Wait()
}

Fan-in

チャネルを多重化する

package main

import (
    "bytes"
    "log"
    "sync"
)

func fanIn(ch ...<-chan byte) <-chan byte {

    var wg sync.WaitGroup

    outCh := make(chan byte)

    for _, c := range ch {
        wg.Add(1)

        go func(c <-chan byte) {
            for b := range c {
                outCh <- b
            }
            wg.Done()
        }(c)
    }

    go func() {
        wg.Wait()
        close(outCh)
    }()
    return outCh
}

func main() {
    ch1 := make(chan byte)
    ch2 := make(chan byte)

    go func() {
        defer func() {
            close(ch1)
            close(ch2)
        }()
        for i := 0; i < 26; i++ {
            ch1 <- byte(65 + i) // A..Z
            ch2 <- byte(97 + i) // a..z
        }
    }()

    var b bytes.Buffer

    outCh := fanIn(ch1, ch2)

    for v := range outCh {
        b.WriteByte(v)
    }

    log.Println(b.String())
}

Coroutine

コルーチン - Wikipedia

package main

import "log"

func hello(yield chan<- string) {
    go func() {
        yield <- "Hello"
        yield <- "Gopher"
        yield <- "Hello NDS"
    }()
}

func main() {
    co := make(chan string)

    hello(co)

    log.Println(<-co, "World")
    log.Println("Hello", <-co)
    log.Println(<-co)
}

Generator

ジェネレータ (プログラミング) - Wikipedia

package main

import (
    "log"
    "math/rand"
    "time"
)

func init() {
    rand.Seed(time.Now().UnixNano())
}

func generator(n int) <-chan int {
    yield := make(chan int)

    go func() {
        for i := 0; i < n; i++ {
            yield <- rand.Int()
        }
        close(yield)
    }()

    return yield
}

func main() {
    for v := range generator(100) {
        log.Println(v)
    }
}

Future

future - Wikipedia

package main

import (
    "fmt"
    "log"
    "strconv"
    "sync"
)

type futureUint64 struct {
    value uint64
    ch    chan uint64
    once  sync.Once
}

func (f *futureUint64) String() string {
    f.once.Do(func() { f.value = <-f.ch })
    return strconv.FormatUint(f.value, 10)
}

func Fibonacci(n uint64) fmt.Stringer {
    f := new(futureUint64)
    f.ch = make(chan uint64)
    go func() {
        f.ch <- fib(n)
    }()
    return f
}

func fib(n uint64) uint64 {
    switch n {
    case 0:
        return 0
    case 1, 2:
        return 1
    default:
        return fib(n-1) + fib(n-2)
    }
}

func main() {
    f := Fibonacci(20)
    log.Println("The 20th Fibonacci number is ", f)
}

Actor model

アクターモデル - Wikipedia

package main

import "log"

type actor chan<- interface{}

type PutReq struct {
    key, val string
}

type GetReq struct {
    key string
    ack chan<- string
}

func NewMapActor() actor {
    a := make(chan interface{})

    go func() {
        m := make(map[string]string)

        for {
            switch r := (<-a).(type) {
            case PutReq: // HL
                m[r.key] = r.val
            case GetReq: // HL
                r.ack <- m[r.key]
            }
        }
    }()

    return actor(a)
}

func main() {
    a := NewMapActor()

    a <- PutReq{key: "hoge", val: "HOGEHOGE"}

    ack := make(chan string)
    a <- GetReq{key: "hoge", ack: ack}
    log.Println(<-ack)
}

Appendix

並列度をあげる

デフォルトは1。変更は runtime.GOMAXPROCS(int) で行う。

runtime.GOMAXPROCS(2)

CPUコア数は runtime.NumCPU() で取得する。

runtime.GOMAXPROCS(runtime.NumCPU())

現在のゴルーチン数を取得

package main

import (
    "log"
    "runtime"
)

func main() {
    log.Printf("running %d goroutines", runtime.NumGoroutine())
}
  • 現在のメモリ情報を取得
package main

import (
    "encoding/json"
    "fmt"
    "runtime"
)

func main() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    b, _ := json.MarshalIndent(m, "", "  ")
    fmt.Println(string(b))
}