Help us understand the problem. What is going on with this article?

Goの並行処理パターンを使って速攻で分かった気になる

この記事はチームラボエンジニアリングアドベントカレンダー10日目の記事です。
昨日の記事は、bonobono555さんによるCompletableFutureの公式ドキュメントを読んでも分からないので Java逆引きレシピを片手に理解するでした。

はじめに

趣味でGoを触るようになって半年ぐらいは経ったと思います!
こんなにハマったのは並行処理がめちゃくちゃ面白かったからです。

関数型でも基本的に上から処理されますよね?
(手続き型だと、基本上から、関数型は高階関数多用してると上からでは無くなりますが)
それがgoroutineを使うことによって簡単に並行に処理をさせる事が出来るんです!
これを、他の言語で同じ事やろうとしたらすごく難しいんです!(やった事ないから知らんけど)

とりあえずムチャクチャ楽しいんでこの記事がその入り口にでもなってくれたら嬉しいです!
この記事では詳しい説明より要点をまとめた並行処理パターンを紹介します!

並行処理とは? 並列処理と混ざってない?

突然ですが、並行と並列の違い分かりますか??

一言で言うと、
並行処理は1人がその場の判断で沢山の仕事をこなす事
並列処理は複数人が仕事を同時にこなす事
です!

並行処理はあくまで処理を同時に行ってはいないんです。
この様に処理を切り替えて行います。(人の目には同時に見える)
並行処理.png

それに比べ、並列処理は処理を同時に行っています。
並列処理.png

これが並行と並列のザックリとした違いです。

goroutineとは?

goroutine (ゴルーチン) は Go ランタイムによって管理される軽量スレッドです。スレッドのようでスレッドでなく、coroutine のようで coroutine ではありません。しかし、スレッドで coroutine を実行するかのごとく利用することが可能で、かつそれを簡単な構文で実現できます。

個人的にはこの説明が一番しっくり来ました!

詳しい説明はここでは行いませんが、goroutineの大元はメルヴィン・コンウェイさんのcoroutineの考え方を参考にしているみたいです!

syncパッケージ

まあ大枠はここまでにして(自分が説明するより良記事がある)早速実際に動かしてみましょう!

syncパッケージはGoの標準パッケージに用意されています。
相互排他ロック(チャネルのブロックの様な動き)をしてくれたり、複数のゴルーチンを扱う上でブロックを提供してくれたりします。
goroutineってほっとくと簡単に終わってしまったり(厳密には消えないけど)想定と違うタイミングで変数を上書きする事もあります。

これは説明するより見る方が早いです!

これ↓を実行するとどうなると思いますか??

func main() {
  go func() {
      fmt.Println("Hello")
  }()
}

"Hello"が出力されないんです!
ここでのgoroutineはメインゴルーチン(main関数)によって生成されているので、メインゴルーチンが終わると終わってしまいます。

こんな風に並行処理を始めると今まで見ないタイプの想定外に出くわすことが多いです!

syncパッケージを実際に使って解決してみましょう!

WaitGroup型

WaitGroupを使うとWait()を呼び出すと内部のカウンターが0になるまでブロックしてくれます!

func main() {
  var wg sync.WaitGroup

  wg.Add(1) // 内部のカウンターを1インクリメント
  go func() {
      defer wg.Done() // 内部のカウンターを1デクリメント
      fmt.Println("Hello")
  }()

  wg.Wait() // 内部のカウンターが0になるまで待機
}

先ほどのコードをこのように書き直すと出力される様になりましたね!

他にも頻出するのは
- Mutex & RWMutext型
- Cond型
- Once型
- Pool型
などなどが挙げられると思いますが、ここら辺の記事はたくさんあるのでそちらを見て見てください。

channel

並行処理を使う上で通信によってメモリ共有を提供してくれる代物です。
チャネルの説明としてとても分かりやすかったのが

水が流れる川のように、チャネルは情報の流れの水路として機能します。値はチャネルに沿って 渡され、そこから下流に読み込まれます。

本当にこの通りでストリームの考え方と近いかも知れません。

また、Goの設計思想でも下記の様に述べられています。

Do not communicate by sharing memory; instead, share memory by communicating.

メモリを共有することで通信しようとしないこと。代わりに通信することでメモリを共有すること。

チャネルのクセとか並行処理で気をつける事とかあれこれ

この後からはすぐに使える並行処理パターンを紹介していきます!

その前に最低限のチャネルの特性や並行処理で気をつける事などを抑えておきましょう。

ブロック

チャネルはブロックして、待つ事が出来ます。

(読み込みの場合)
↓で書き込みの処理(3行目)が無いと5行目でデッドロックを引き起こしてしまいます!

stringStream := make(chan string)
go func() {
  stringStream <- "Hello, World" // チャネルに流す
}()
fmt.Println(<-stringStream) // stringStreamに何か(string)が流れるまでブロックされる

チャネルのブロックを解消する方法は他にも

close(stringStream)

の様にチャネルを閉じてあげる事で <-stringStreamの部分が検知する事が出来ます。

また、配列と同じ様にチャネルもrangeで回す事が出来ます。

for str := range stringStream {
  fmt.Println(str)
}

rangeのfor文も先程と同じ様に、closeを検知してループが終了します。

また、バッファ付きチャネルは宣言時に決めたn個が入るまでブロックされます。

intStream := make(chan int, 5) // バッファが5
for integer := range intStream { // intStreamに5つ流れるまでブロックされる(ループが始まらない)
  fmt.Println(integer)
}

select文

これから紹介する並行処理パターンではselect文も欠かせません!

select {
case <-channel1:
  fmt.Println("channel1 set up")
case <-channel2:
  fmt.Println("channel2 set up")
}

select文はどのチャネルが準備出来たかを評価します。
どのチャネルも準備出来ていない場合はselect文全体がブロックします。
これはイメージ湧きづらいと思うので、並行処理パターン紹介の時に出て来た時にみて見てください!

チャネルには読み込み専用と書き込み専用とその両方行えるものがあります。
どのチャネルにどの権限を与えるかもしっかりと考えた方が良いです。
しかし、便利な事に以下の様にして権限の割り振りが出来ます。

ch := make(chan string) // 書き込み、読み込み両方できる
hoge() := func(stringStream <-chan string) { // 引数に取ったチャネルが読み込み権限のみ
  fmt.Println(<-stringStream) // 書き込みは出来ない 
}(ch) // 普通に渡せる!

使ったら片付ける!

goroutineってほっとくと簡単に終わってしまったり(厳密には消えないけど)想定と違うタイミングで変数を上書きする事もあります。

先程上で厳密には消えないと述べましたが、これには理由があります。
ゴルーチンはランタイムによってガベージコレクションされないので片付け無ければなりません。
いくら軽量だからと言って放置する事は思わぬメモリリーク(ゴルーチンリーク)を引き起こす事になりかねません。

あるgoroutineがgoroutineの生成の責務を担うのであれば、そのgoroutineを停止出来るようにする責務も担わなくてはなりません。(名言)

goroutine生成元(親)から子への終了のシグナルには慣習としてdoneという名前の読み込みチャネルにします。
インターフェイス型はgolangのコミュニティでも敬遠されるイメージがありますが、doneチャネルに関しては問題無いかと思ってます。

doSomething := func(done <-chan interface{}, stringStream <-chan string) {
  for {
    select {
    case s := <-stringStream: // 今回は関係無いです
      fmt.Println(s)
    case <-done: // close(done)を検知し、doSomethingのgoroutineを終了する
      fmt.Println("doSomething 終了")
      return
    }
  }
}

done := make(chan interface{})

go func() { // 5秒後にdoneチャネルを閉じる
  time.Sleep(5 * time.Second) 
  close(done)
}()

doSomething(done, nil) // 5秒後に終了する

こんな風に親から子を終了させます。
任意のタイミングで複数のgoroutineを(ほぼ)同時に終了させることも簡単に出来ますね!

並行処理パターン

お待たせしました!
ちょっと長くなりましたが、明日から使える並行処理パターンを紹介していきます。
並行処理パターンではステージを並べたパイプラインを意識します。
ステージは受け取るものと返すものが同じ型でなければなりません。
パイプラインを使う事で各ステージでの懸念事項を切り分けることが出来ます。

いきなりステージとか言われてもなんやよう分からんですよね。。
そこで最初に意識して欲しいのはジェネレーターです。
パイプラインの始めにはチャネルへの変換を必要とするデータの塊が必ずあるはずです。
その責務を担うのが、ジェネレーターです。

// intを受け取り、intのチャネルを返す
generator := func(done <-chan interface{}, integers ...int) <-chan int {
  intStream := make(chan int, len(integers)) // generator goroutineの中で返却用のチャネルを作る
  go func() { // 別goroutineでintStreamへ値を流していく
    defer close(intStream)
    for _, i := range integers {
      select {
      case <-done:
        fmt.Println("generator goroutine終了")
        return
      case intStream <- i:
      }
    }
  }()
  return intStream
}

これだけでも並行処理の恩恵に感動しませんか!?
今まで「配列作って、値入れて、そこからfor文で処理する」としていた物が「配列(チャネルだけど)に値を流しながら、順番にfor文で処理する」出来るんです!
単純に考えて早くなりそうですよね!

このチャネルを使ってパイプライン処理を書いてみましょう!

// ただintStreamから流れてきた物に引数の値(multiplier)を乗算しているだけです笑
// 今までfor文で回していたような処理ですね!
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
  multipliedStream := make(chan int)
  go func() {
    defer close(multipliedStream)
    for i := range intStream {
      select {
      case <-done:
        return
      case multipliedStream <- i * multiplier;
      }
    }
  }()
  return multipliedStream
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4, 5)
pipeline := multiply(done, intStream, 2)

for v := range pipeline {
  fmt.Println(v)
}

これが並行処理の基本の一連の流れです!
雛形とまでは言えませんが、中の処理を変えるだけで、様々な処理が出来ると思います。
- ステージ
- パイプライン
- ジェネレーター
- doneチャネル
この要点を抑えるだけで綺麗な並行処理を行う事が出来ます!
これがパターンです!!!笑
ステージをもっと増やして組み合わせても問題ありません。

素数を5000まで出力すると、並行処理とブロック処理で2倍以上早かったです!

generator := func(done <-chan interface{}, num int) <-chan int {
  intStream := make(chan int)
  go func() {
    defer close(intStream)
    for i := 0; i < num; i++ {
      select {
      case <-done:
        return
      case intStream <- i:
      }
    }
  }()

  return intStream
}

// 素数判定(あってるかな?)
isPrimeNum := func(num int) bool {
  if num == 1 || num == 2 {
    for i := 2; i < num; i++ {
      if num%i == 0 {
        return false
      }
    }
  }

  return true
}

primeFinder := func(done <-chan interface{}, valueStream <-chan int) <-chan int {
  intStream := make(chan int)
  go func() {
    defer close(intStream)
    for v := range valueStream {
      if isPrimeNum(v) {
        intStream <- v
      }
    }
  }()

  return intStream
}

done := make(chan interface{})
defer close(done)

// ステージを複数使う時は重ねて記述しましょう!
for v := range primeFinder(done, generator(done, 5000)) {
  fmt.Println(v)
}

自身もまだまだなので間違いがあるかも知れませんが、温かく見守って頂ければと思います。
最後までありがとうございました!

明日の記事はyanananaさんによるジェネリクスを見ても逃げないです。

参考

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした