LoginSignup
9

More than 3 years have passed since last update.

posted at

Goルーチンに入門してみた

酒井潤さんがUdemyで講師をしている以下の講座のGoルーチンに関するレクチャがとてもわかりやすかったので、学習したことをまとめています。

[講座名]
現役シリコンバレーエンジニアが教えるGo入門 + 応用でビットコインのシストレFintechアプリの開発
https://www.udemy.com/go-fintech/

私もまだ全部を勉強したわけではなかったですが、文法から実際のアプリ開発まで非常によくまとまっていてわかりやすいです。

Goに関する書籍やスクールが日本で広く普及されていない中でのコスパは最高だと思ってます。

Goルーチン(というか非同期処理全般)は奥が深いので、必要に応じて使っていこうかと思う次第です。
もともとGoを学ぼうと思った動機が、簡単なコマンドラインツール(シェル)的にRubyでできるような少し凝ったことを型を使って出来たらいいぐらいの感じで使いたかったので、私はそんなに使わないかもしれません。

Goルーチンで実行する関数に戻り値がない場合

基本的な実行

ダサいですが、time.sleep関数を使っています。

package main

import (
    "fmt"
    "time"
)

// goルーチンで実行する関数
func asyncFunc(s string) {
    for i := 0; i < 5; i ++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

// メイン関数
func main() {
    // goルーチンの関数の実行
    for i := 0 ; i < 3; i ++ {
        str := fmt.Sprintf("Go routine (no: %v)", i)
        go asyncFunc(str)
    }

    // time.sleepしないとgoルーチンの実行完了よりも先にメイン関数の処理が完了してしまう。
    time.Sleep(1 * time.Second)
}

実行結果は以下のとおりです。

Go routine (no: 2)
Go routine (no: 1)
Go routine (no: 0)
Go routine (no: 1)
Go routine (no: 0)
Go routine (no: 2)
Go routine (no: 0)
Go routine (no: 1)
Go routine (no: 2)
Go routine (no: 1)
Go routine (no: 2)
Go routine (no: 0)
Go routine (no: 1)
Go routine (no: 2)
Go routine (no: 0)

sync.WaitGroupを使って実行完了を待つ

time.sleep関数で待つやり方はどのくらいsleepすればいいかわからないので、sync.WaitGroupでGoルーチンで実行する関数の完了を待つ方法。

package main

import (
    "fmt"
    "sync"
    "time"
)

// goルーチンで実行する関数
func asyncFunc(s string, wg*sync.WaitGroup) {
    defer wg.Done() // WaitGroupを最後に完了しないといけない。
    for i := 0; i < 5; i ++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

// メイン関数
func main() {
    // goルーチンで非同期に実行される処理を待つために
    // WaitGroupを使う。
    var wg sync.WaitGroup

    // goルーチンの関数の実行
    for i := 0 ; i < 3; i ++ {
        wg.Add(1) // goルーチンを実行する関数分だけAddする。
        str := fmt.Sprintf("Go routine (no: %v)", i)
        go asyncFunc(str, &wg)
    }

    // goルーチンで実行される関数が終了するまで待つ。
    wg.Wait()
}

実行結果は以下のとおりです。

Go routine (no: 2)
Go routine (no: 0)
Go routine (no: 1)
Go routine (no: 1)
Go routine (no: 2)
Go routine (no: 0)
Go routine (no: 0)
Go routine (no: 2)
Go routine (no: 1)
Go routine (no: 0)
Go routine (no: 1)
Go routine (no: 2)
Go routine (no: 2)
Go routine (no: 1)
Go routine (no: 0)

Goルーチンで実行する関数に戻り値がある場合

チャネル

チャネルを介して結果を受け取るためにチャネルを使います。

goルーチンの関数にチャネルを渡し、処理を非同期で実行し、結果を受け取る際には、ブロッキングして結果を受け取っています。

package main

import "fmt"

// goルーチンの関数
func asyncFunc(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum
}

func main() {
    // スライスの生成
    s := []int{1, 2, 3, 4, 5}

    // チャネルの生成
    // チャネルを介してgoルーチンの関数とやりとりします。
    c := make(chan int)

    // goルーチンの関数実行。
    // 引数にチャネルを渡している。
    go asyncFunc(s, c)
    go asyncFunc(s, c)

    // 結果を受け取る
    x := <-c // ここで処理がブロッキングされる
    fmt.Println(x)

    y := <-c // ここで処理がブロッキングされる
    fmt.Println(y)
}

バッファーチャネル

チャネルにバッファサイズを定義することもできる。
goルーチンの関数側でチャネルをクローズする必要があります。

package main

import "fmt"

func asyncFunc(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
        c <- sum // チャネルに演算結果を渡している
    }
    close(c) // closeで閉じること!
}

func main() {
    s := []int{1, 2, 3, 4, 5}

    c := make(chan int, len(s)) // チャネルのバッファサイズ: 5
    go asyncFunc(s, c)

    // チャネルのバッファサイズ分だけループ処理
    for i := range c {
        fmt.Println(i)
    }
}

結果は以下のとおり。

1
3
6
10
15

上記のコードでチャネルのクローズ処理close(c)を忘れると、以下のようなデッドロックエラーが発生する。

1
3
6
10
15
fatal error: all goroutines are asleep - deadlock!

Goルーチンを使った代表的なメッセージングパターン

Producer/Consumerパターン

前述の知識を流用した応用的なパターンです。

sync.WaitGroupとチャネルを使ってProducer/Consumerパターンを実装。チャネルがメッセージキューのような役割を果たしています。
ひとつのチャネル(キュー相当)をProducerとConsumerで共有していることがポイントです。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Producer
func producer(ch chan int, i int) {
    ch <- i * 2
}

// Consumer
func consumer(no int, ch chan int, wg *sync.WaitGroup) {
    for i := range ch {
        func() {
            defer wg.Done()
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("process no: %v result: %v\n", no, i*1000)
        }()
    }
}

func main() {
    var wg sync.WaitGroup

    ch := make(chan int)

    // Producerでメッセージを生成する
    for i := 0; i < 100; i ++ {
        wg.Add(1)
        go producer(ch, i)
    }

    // Consumerがメッセージをポーリング
    // consumerの数を3つに設定
    for i := 0; i < 3; i ++ {
        go consumer(i, ch, &wg)
    }

    wg.Wait() // 10回Addされた分だけ待つ。
    close(ch) // 閉じないとConsumerが待ち続ける

    fmt.Println("Done")
}

process 2000
process 0
process 8000
process 4000
process 6000
process 10000
process 12000
process 14000
process 16000
process 18000
Done

fan-out/fan-inパターン

パイプライン(fan-out/fan-in)を使った並列処理も以下のように実装できます。

package main

import "fmt"

// stage1 fan-in
// firstというチャネルにメッセージを詰める
func producer(first chan int) {
    defer close(first) // firstチャネルにメッセージを詰め終わったらチャネルをクローズ

    for i := 0; i < 10; i ++ {
        first <- i
    }
}

// stage2 fan-out, fan-in
// firstチャネルからメッセージを取り出し、
// 演算後にsecondチャネルにメッセージを詰める。
// <-はなくてもいい。
func multi2(first <-chan int, second chan<- int) {
    defer close(second) // secondチャネルにメッセージを詰め終わったらチャネルをクローズ
    for i := range first {
        second <- i * 2
    }
}

// stage3 fan-out, fan-in
// secondチャネルからメッセージを取り出し、
// 演算後にthirdチャネルにメッセージを詰める。
func multi4(second <-chan int, third chan<- int) {
    defer close(third)
    for i := range second {
        third <- i * 2
    }
}

func main() {
    // チャネルの生成
    first := make(chan int)
    second := make(chan int)
    third := make(chan int)

    go producer(first)
    go multi2(first, second)
    go multi4(second, third)

    // パイプラインの結果をthirdチャネルから取り出して表示する
    for result := range third {
        fmt.Println(result)
    }
}

結果は以下のとおりです。

0
4
8
12
16
20
24
28
32
36

for-selectを使ったチャネルからのメッセージ受信

for-selectを使って同時にブロッキングせずにチャネルからメッセージを受信することができます。
処理の内容・結果の異なるgoルーチンの関数の結果を同時に処理するためのパターンかと思います。

package main

import (
    "fmt"
    "time"
)

func goroutine1(ch chan string)  {
    for {
        ch <- "packet from 1"
        time.Sleep(2 * time.Second)
    }
}

func goroutine2(ch chan int)  {
    for {
        ch <- 100
        time.Sleep(1 * time.Second)
    }
}

func main() {
    c1 := make(chan string)
    c2 := make(chan int)
    go goroutine1(c1)
    go goroutine2(c2)

    // selectを使って同時にブロッキングせずに受信する。
    for {
        select {
        case msg1 := <- c1:
            fmt.Println(msg1)
        case msg2 := <- c2:
            fmt.Println(msg2)
        }
    }
}
100
packet from 1
100
packet from 1
100
100
packet from 1
100
100
packet from 1
100
100
packet from 1
100
100
packet from 1
100
100
packet from 1
100
100
packet from 1
100
100
packet from 1
100
...

SyncのMutex

複数スレッドからの同時更新、同時参照時の不整合、デッドロックを回避するために、SyncのMutexを利用できます。

package main

import (
    "fmt"
    "sync"
    "time"
)

// mutexを持つ構造体を定義する。
type Counter struct {
    v   map[string]int
    mux sync.Mutex // mutex
}

// インクリメント
func (c *Counter) Inc(key string) {
    c.mux.Lock()         // ロックする。
    defer c.mux.Unlock() // ロックを解除する。
    c.v[key]++
}

// 値をゲットする。
func (c *Counter) Value(key string) int {
    c.mux.Lock()
    defer c.mux.Unlock()
    return c.v[key]
}

func main() {
    /* 以下のgoルーチンは同時に書き込むとエラーが発生します。
    c := make(map[string]int)

    go func() {
        for i := 0; i< 10; i++ {}
        c["key"] += 1
    }()

    go func() {
        for i := 0; i< 10; i++ {}
        c["key"] += 1
    }()

    time.Sleep(1 * time.Second)
    fmt.Println(c, c["key"])
    */


    // 以下はMutexを使って不整合が発生しないようにした書き方。
    c := Counter{v: make(map[string]int)}
    go func() {
        for i := 0; i< 10; i++ {
            c.Inc("key")
        }
    }()

    go func() {
        for i := 0; i< 10; i++ {
            c.Inc("key")
        }
    }()

    time.Sleep(1 * time.Second)
    fmt.Println(c, c.Value("key"))
}
{map[key:20] {0 0}} 20

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
9