Help us understand the problem. What is going on with this article?

ゴルーチンと並行性パターン

More than 5 years have passed since last update.

第37回勉強会(2014/08/09) - 長岡 IT開発者 勉強会(NDS)

@hayajo

Goにおける並行性基礎

ゴルーチン

並行性のための基本機能。スレッドのようなもので、とても軽量。

goステートメントで生成される。

  • 関数
go logging("Hello, %s\n", name)
  • メソッド
go logger.Printf("Hello, %s\n", name)
  • クロージャ
go func() { // HL1
  log.Printf("Hello, %s\n", name)
}() // HL1

ゴルーチン: Example

package main

import (
    "log"
    "os"
    "time"
)

// logging writes a formatted message through the standard logger.
//
// format and args follow the fmt.Printf conventions. The parameter is named
// format rather than fmt so that it does not shadow the standard fmt package.
func logging(format string, args ...interface{}) {
    log.Printf(format, args...)
}

// main prints the same greeting from three goroutines, one for each form the
// go statement accepts: a plain function, a method value, and a closure.
func main() {
    const greeting = "Hello, %s\n"

    name := "Gopher"
    stdoutLogger := log.New(os.Stdout, "", log.Ldate|log.Ltime)

    // Plain function.
    go logging(greeting, name)

    // Method on a value.
    go stdoutLogger.Printf(greeting, name)

    // Closure capturing name.
    go func() {
        log.Printf(greeting, name)
    }()

    // Nothing synchronises with the goroutines here; main just lingers for a
    // moment so that they get a chance to run before the program exits.
    time.Sleep(100 * time.Millisecond)
}

同期

syncパッケージで提供される基本的な同期機能。

  • sync.Mutex (ミューテックス)
  • sync.Once (pthread_once(3))
  • sync.WaitGroup (カウンタセマフォ)
  • sync.Pool (フリーリスト)

同期: Example - sync.WaitGroup

package main

import (
    "fmt"
    "sync"
)

// main starts one goroutine that prints a greeting and uses a sync.WaitGroup
// to block until that goroutine has finished.
func main() {
    var done sync.WaitGroup
    done.Add(1)

    go func(who string) {
        // Deferred so the counter is released even if printing panics.
        defer done.Done()
        fmt.Printf("Hello, %s\n", who)
    }("Gopher")

    done.Wait()
}

チャネル

ゴルーチン間の通信を提供。双方向パイプに近い。メッセージ型を指定して利用する。

doneCh := make(chan struct{})

送信はバッファが満杯のとき、受信はバッファが空のときにブロックされる。(デフォルトはバッファなしで、送信側と受信側の両方が揃うまでブロックされる)

go func() {
  fmt.Printf("Hello, %s\n", name)
  doneCh <- struct{}{} // blocking until ready to read
}()

<-doneCh // blocking until a value is received

引数や戻り値にも使用できる。

チャネル: Example

package main

import "fmt"

// main starts one goroutine that prints a greeting and then signals
// completion over an unbuffered channel, which main waits on.
func main() {
    finished := make(chan struct{})

    go func(who string) {
        fmt.Printf("Hello, %s\n", who)
        // Unbuffered: this send blocks until main is receiving.
        finished <- struct{}{}
    }("Gopher")

    // Blocks until the goroutine has sent its signal.
    <-finished
}

並行性パターン

Background jobs

package main

import (
    "fmt"
    "strings"
    "sync"
)

// main upper-cases each word in its own goroutine and waits for all of them
// to finish. The output order is not deterministic.
func main() {
    words := []string{"foo", "bar", "baz"}

    var pending sync.WaitGroup
    pending.Add(len(words))

    for i := range words {
        go func(w string) {
            defer pending.Done()
            fmt.Println(strings.ToUpper(w))
        }(words[i])
    }

    // Blocks until every goroutine has called Done.
    pending.Wait()
}

Unbuffered channel

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// init seeds the shared math/rand source with the current time in
// nanoseconds, so each run of the program draws a different sequence.
func init() {
    rand.Seed(time.Now().UnixNano())
}

// from sends one pseudo-random integer in [0, 100) on conCh. The send blocks
// until the channel can accept the value.
func from(conCh chan<- int) {
    n := rand.Intn(100)
    conCh <- n
}

// to receives one integer from conCh and prints it to standard output. The
// receive blocks until a value is available.
func to(conCh <-chan int) {
    received := <-conCh
    fmt.Printf("Received %d\n", received)
}

// main connects a sender and a receiver goroutine through an unbuffered
// channel, so the value is handed over only when both sides are ready.
func main() {
    link := make(chan int)

    go from(link)
    go to(link)

    // Nothing synchronises with the goroutines; main simply waits briefly so
    // that the exchange can happen before the program exits.
    const grace = 100 * time.Millisecond
    time.Sleep(grace)
}

Buffered channel

package main

import (
    "log"
    "math/rand"
    "time"
)

// main runs ten jobs while allowing at most three to execute at once, using a
// buffered channel as a counting semaphore.
func main() {
    const maxParallel = 3

    slots := make(chan struct{}, maxParallel)

    for job := 0; job < 10; job++ {
        // Take a slot; blocks while maxParallel jobs are already running.
        slots <- struct{}{}

        go func(id int) {
            // Give the slot back when the job is over.
            defer func() { <-slots }()

            log.Println("BEGIN", id)
            pause := time.Duration(rand.Intn(5)) * time.Second
            time.Sleep(pause)
            log.Println("END", id)
        }(job)
    }

    // Taking every slot can only succeed once all jobs have returned theirs,
    // so this loop waits for the remaining jobs to finish.
    for free := cap(slots); free > 0; free-- {
        slots <- struct{}{}
    }
}

Reading from multiple channels - select

package main

import "log"

// main reads from two channels with select: it logs every number the producer
// emits and stops once the producer reports the running total.
func main() {
    counts := make(chan int)
    total := make(chan int)

    go func() {
        sum := 0
        for n := 0; n < 1000; n++ {
            counts <- n
            sum += n
        }
        total <- sum
    }()

    for {
        select {
        case n := <-counts:
            log.Println(n)
        case sum := <-total:
            log.Println("SUM", sum)
            // Nothing follows the loop, so returning ends the program just
            // as breaking out of it would.
            return
        }
    }
}

Consume all values from a channel - range

package main

import (
    "log"
    "time"
)

// main logs every value produced on a channel, using range to keep receiving
// until the producer closes it.
func main() {
    numbers := make(chan int)

    go func() {
        // Closing the channel is what ends the range loop below.
        defer close(numbers)

        for n := 0; n < 10; n++ {
            numbers <- n
            time.Sleep(time.Second)
        }
    }()

    for n := range numbers {
        log.Println(n)
    }
}

Signaling

package main

import (
    "log"
    "sync"
)

// main parks 1000 goroutines on a shared channel and releases all of them at
// once by closing it, then waits for them to finish logging.
func main() {
    const workers = 1000

    gate := make(chan struct{})

    var finished sync.WaitGroup
    finished.Add(workers)

    for id := 0; id < workers; id++ {
        go func(id int) {
            defer finished.Done()

            // A receive on a closed channel returns immediately, so this
            // blocks only until the gate is closed.
            <-gate
            log.Println(id)
        }(id)
    }

    // Closing the gate wakes every waiting goroutine.
    close(gate)

    finished.Wait()
}

NOTE:

チャネルからの読み込みは値とチャネルの状態を返す。

value, isOpen := <- channel

チャネルが閉じられている場合、値はゼロ値、状態はfalseとなり読み込みはブロックされない。

Safe counter

package main

import (
    "log"
    "sync"
)

// counter starts a goroutine that sends 1, 2, 3, ... on ch, one value per
// receive. Because a single goroutine owns the count, any number of receivers
// can draw from ch and each gets a distinct value. The goroutine never
// terminates.
func counter(ch chan<- int64) {
    go func() {
        for next := int64(1); ; next++ {
            ch <- next
        }
    }()
}

// main releases 1000 goroutines at once and has each of them draw one value
// from the shared counter channel and log it alongside its own index.
func main() {
    const workers = 1000

    seq := make(chan int64)
    counter(seq)

    gate := make(chan struct{})

    var finished sync.WaitGroup
    finished.Add(workers)

    for id := 0; id < workers; id++ {
        go func(id int) {
            defer finished.Done()

            // Wait for the gate to be closed.
            <-gate
            log.Printf("#%03d: % 4d\n", id, <-seq)
        }(id)
    }

    // Release all goroutines simultaneously.
    close(gate)

    finished.Wait()
}

NOTE:

単純なカウンタであれば sync/atomic パッケージでも実現可能。

Timer

package main

import (
    "container/ring"
    "fmt"
    "log"
    "time"
)

// aa holds the text frames that main cycles through, in display order.
var aa []string = []string{
    "ヽ(゚∀゚)ノ",
    "( ゚∀)ノ",
    "(  ゚)ノ",
    "ヽ(  )ノ",
    "ヽ(゚  )",
    "ヽ(∀゚ )ノ",
}

// main plays the frames in aa as a looping animation on standard output,
// one frame every 100ms, and stops after five seconds.
func main() {
    frames := make(chan string)

    go func() {
        // Load every frame into a ring so playback can loop indefinitely.
        r := ring.New(len(aa))
        for i := range aa {
            r.Value = aa[i]
            r = r.Next()
        }

        // Filling the ring walked one full lap, so r is back at the first
        // frame. Each step advances before sending, so playback begins with
        // the second frame.
        for {
            r = r.Next()
            frames <- fmt.Sprint(r.Value)
            time.Sleep(100 * time.Millisecond)
        }
    }()

    deadline := time.After(5 * time.Second)

    for {
        select {
        case frame := <-frames:
            fmt.Print("\x0c") // form feed, emitted before every frame
            fmt.Println(frame)
        case <-deadline:
            log.Println("timed out")
            // Nothing follows the loop, so returning ends the program.
            return
        }
    }
}

NOTE:

  • time.After
  • time.Tick
  • time.Sleep

Fan-out

処理を分散させる

package main

import (
    "log"
    "sync"
)

// main distributes the letters A..Z from a single producer across three
// worker goroutines that all read from the same channel.
func main() {
    letters := make(chan byte)

    go func() {
        // Closing the channel ends every worker's range loop.
        defer close(letters)

        for c := byte('A'); c <= 'Z'; c++ {
            letters <- c
        }
    }()

    gate := make(chan struct{})

    var workers sync.WaitGroup

    for id := 0; id < 3; id++ {
        workers.Add(1)

        go func(id int) {
            defer workers.Done()

            // Hold until all workers have been started.
            <-gate
            for b := range letters {
                log.Printf("#%d %s", id, string(b))
            }
        }(id)
    }

    close(gate)

    workers.Wait()
}

Fan-in

チャネルを多重化する

package main

import (
    "bytes"
    "log"
    "sync"
)

// fanIn multiplexes every input channel onto one output channel.
//
// One goroutine per input forwards its values to the returned channel. The
// returned channel is closed once all inputs have been closed and drained.
// Values from a single input keep their order; values from different inputs
// interleave arbitrarily.
func fanIn(ch ...<-chan byte) <-chan byte {
    merged := make(chan byte)

    var forwarders sync.WaitGroup
    forwarders.Add(len(ch))

    forward := func(src <-chan byte) {
        defer forwarders.Done()
        for b := range src {
            merged <- b
        }
    }

    for i := range ch {
        go forward(ch[i])
    }

    // Close the output only after every forwarder is done, so that no
    // forwarder can send on a closed channel.
    go func() {
        forwarders.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    ch1 := make(chan byte)
    ch2 := make(chan byte)

    go func() {
        defer func() {
            close(ch1)
            close(ch2)
        }()
        for i := 0; i < 26; i++ {
            ch1 <- byte(65 + i) // A..Z
            ch2 <- byte(97 + i) // a..z
        }
    }()

    var b bytes.Buffer

    outCh := fanIn(ch1, ch2)

    for v := range outCh {
        b.WriteByte(v)
    }

    log.Println(b.String())
}

Coroutine

コルーチン - Wikipedia

package main

import "log"

// hello starts a goroutine that yields three strings on the given channel, in
// order. Each send blocks until the caller receives, so the goroutine advances
// one step per receive, like a coroutine.
func hello(yield chan<- string) {
    steps := [...]string{"Hello", "Gopher", "Hello NDS"}

    go func() {
        for _, s := range steps {
            yield <- s
        }
    }()
}

// main resumes the hello coroutine three times and logs each yielded value.
func main() {
    co := make(chan string)

    hello(co)

    first := <-co
    log.Println(first, "World")

    second := <-co
    log.Println("Hello", second)

    third := <-co
    log.Println(third)
}

Generator

ジェネレータ (プログラミング) - Wikipedia

package main

import (
    "log"
    "math/rand"
    "time"
)

// init seeds the shared math/rand source with the current time in
// nanoseconds, so each run of the program draws a different sequence.
func init() {
    rand.Seed(time.Now().UnixNano())
}

// generator returns a channel that yields n pseudo-random integers and is then
// closed, so callers can simply range over it. For n <= 0 the channel is
// closed without yielding anything.
func generator(n int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)

        for remaining := n; remaining > 0; remaining-- {
            out <- rand.Int()
        }
    }()

    return out
}

// main logs 100 values pulled from the generator.
func main() {
    values := generator(100)

    // The loop ends when the generator closes its channel.
    for v := range values {
        log.Println(v)
    }
}

Future

future - Wikipedia

package main

import (
    "fmt"
    "log"
    "strconv"
    "sync"
)

// futureUint64 is a placeholder for a uint64 that is computed in a background
// goroutine and delivered over ch. The result is received at most once and
// then cached in value, so String may be called any number of times.
type futureUint64 struct {
    value uint64      // cached result, set the first time String is called
    ch    chan uint64 // carries the result from the worker goroutine
    once  sync.Once   // ensures ch is received from exactly once
}

// String blocks until the result is available and returns it in base 10.
func (f *futureUint64) String() string {
    f.once.Do(func() { f.value = <-f.ch })
    return strconv.FormatUint(f.value, 10)
}

// Fibonacci starts computing the n-th Fibonacci number in the background and
// returns immediately. Calling String on the returned value blocks until the
// computation has finished.
func Fibonacci(n uint64) fmt.Stringer {
    // Capacity 1 lets the worker deposit its result and exit even if the
    // caller never asks for it; with an unbuffered channel the goroutine
    // would stay blocked on the send forever.
    f := &futureUint64{ch: make(chan uint64, 1)}

    go func() {
        f.ch <- fib(n)
    }()

    return f
}

// fib returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
// It runs in O(n) time; results that do not fit in a uint64 wrap around
// modulo 2^64.
func fib(n uint64) uint64 {
    var a, b uint64 = 0, 1
    for ; n > 0; n-- {
        a, b = b, a+b
    }
    return a
}

// main starts the computation of the 20th Fibonacci number and logs it. The
// log call blocks inside the future's String method until the value is ready.
func main() {
    f := Fibonacci(20)

    // Println already puts a space between operands, so the message carries
    // no trailing space of its own.
    log.Println("The 20th Fibonacci number is", f)
}

Actor model

アクターモデル - Wikipedia

package main

import "log"

// actor is the send-only mailbox of a goroutine that owns some private state.
// Clients interact with the state only by sending request values to it.
type actor chan<- interface{}

// PutReq asks the map actor to store val under key.
type PutReq struct {
    key, val string
}

// GetReq asks the map actor for the value stored under key. The answer (the
// empty string if the key is absent) is sent on ack, which the actor blocks on
// until the requester receives it.
type GetReq struct {
    key string
    ack chan<- string
}

// NewMapActor starts a goroutine that owns a map[string]string and serves
// PutReq and GetReq messages sent to the returned mailbox, one at a time.
// Messages of any other type are ignored. Closing the mailbox stops the
// goroutine.
func NewMapActor() actor {
    mailbox := make(chan interface{})

    go func() {
        m := make(map[string]string)

        // Ranging over the mailbox ends the loop when it is closed. A bare
        // receive in an endless loop would instead keep getting nil from
        // the closed channel and spin forever.
        for msg := range mailbox {
            switch r := msg.(type) {
            case PutReq:
                m[r.key] = r.val
            case GetReq:
                r.ack <- m[r.key]
            }
        }
    }()

    return actor(mailbox)
}

// main stores one entry in a map actor, reads it back, and logs the value.
func main() {
    store := NewMapActor()

    store <- PutReq{key: "hoge", val: "HOGEHOGE"}

    // The actor delivers the looked-up value on this reply channel.
    reply := make(chan string)
    store <- GetReq{key: "hoge", ack: reply}

    got := <-reply
    log.Println(got)
}

Appendix

並列度をあげる

Go 1.4以前のデフォルトは1(Go 1.5以降のデフォルトは利用可能なCPUコア数)。変更は runtime.GOMAXPROCS(int) で行う。

runtime.GOMAXPROCS(2)

CPUコア数は runtime.NumCPU() で取得する。

runtime.GOMAXPROCS(runtime.NumCPU())

現在のゴルーチン数を取得

package main

import (
    "log"
    "runtime"
)

// main logs how many goroutines currently exist in the process.
func main() {
    n := runtime.NumGoroutine()
    log.Printf("running %d goroutines", n)
}
現在のメモリ情報を取得
package main

import (
    "encoding/json"
    "fmt"
    "runtime"
)

// main reads the runtime's memory statistics and prints them to standard
// output as indented JSON.
func main() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)

    b, err := json.MarshalIndent(m, "", "  ")
    if err != nil {
        // Report the failure instead of silently printing an empty line.
        panic("encoding runtime.MemStats as JSON: " + err.Error())
    }

    fmt.Println(string(b))
}
hayajo
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away