1. awakia

    Posted

    awakia
Changes in title
+GoのChannelを使いこなせるようになるための手引
Changes in tags
Changes in body
Source | HTML | Preview
@@ -0,0 +1,256 @@
+
+Go使いたくなる理由の一つに、マルチスレッドプログラミング的なものを高速な言語で安全に実装したいというのがある。
+Goにおいてそれを支えるのが、自前で実装した軽量スレッドといえるgoルーチンと、mutexなどのロックの代わりに使えるChannelという概念だ。
+
+実際に実装するときに、Goルーチンは難しくないが、Channelを使うのは割と知識と経験が必要なのでここでは、Channelについてすこし詳しく書いてみる
+
+## Message Passing
+
+まずは理論から。
+Goのチャネルなどのロックを使わない方法の並行処理はMessage Passingと呼ばれている。
+
+https://en.wikipedia.org/wiki/Message_passing
+
+ErlangのActor Modelなどもこの仲間。GoのチャネルとActor Modelは、実は、同等の概念で表現方法が違うだけらしい。
+
+### 共有メモリ vs メッセージパッシング
+
+以下のStack Overflowが分かりやすい。
+
+http://stackoverflow.com/questions/1853284/whats-the-difference-between-the-message-passing-and-shared-memory-concurrency
+
+
+|モデル|データの受け渡し(コミュニケーション)の方法|例えてみるなら...|
+|:-:|:-:|:-:|
+|Shared Memory|複数のプロセスがロックを取りながら共通のメモリにアクセスする。|全員が机を囲んでその上にある紙とペンで自由に情報をやり取り。ただし、ペンや紙の取り合いとかにならないように気をつけなければいけない。|
+|Message Passing|各プロセスはメッセージを送り合い、内容は書き変わらない。|各自自分の机に座り、自分のところにある紙を使い手紙のようにして情報をやり取り。|
+
+
+## Channelの基礎
+
+Channelの使い方について学ぶなら、`Go by Example`のチャネルのパートを読み進めるのが一番よい。実行しながら試せる。
+https://gobyexample.com/channels から https://gobyexample.com/stateful-goroutines まで。
+
+最初の例は以下。
+
+```go:channels.go
+package main
+
+import "fmt"
+
+func main() {
+ messages := make(chan string)
+ go func() { messages <- "ping" }()
+ msg := <-messages
+ fmt.Println(msg)
+}
+```
+
+```bash
+$ go run channels.go
+ping
+```
+
+自分で作ったgoルーチンとmainのgoルーチンでChannelを通して情報をロックを使わずに安全に渡せている。
+
+## 実用的なサンプル
+
+2012年のGoogle/IOで話されたRob Pikeのスライドの最後の方に書いているが今でも最も優れていると思うので紹介しておく。
+http://talks.golang.org/2012/concurrency.slide#42 から51ページまでの内容。
+
+### Google検索の例
+
+Q. Google検索って何してる?
+A. 与えられたクエリに対して、1ページ分の複数の検索結果(と広告)を返す
+Q. 検索結果はどうやって得る?
+A. クエリを、Web検索、画像検索、動画検索、マップ検索、ニュース検索などに投げ、結果を混ぜて返す
+
+どういうふうに実装する?
+
+### Google検索の模倣
+
+Web、画像、動画検索の結果を以下のように0ミリ-100ミリ秒のウェイトするだけのモックサーバーとして実装する
+
+```search.go
+var (
+ Web = fakeSearch("web")
+ Image = fakeSearch("image")
+ Video = fakeSearch("video")
+)
+
+type Search func(query string) Result
+
+func fakeSearch(kind string) Search {
+ return func(query string) Result {
+ time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
+ return Result(fmt.Sprintf("%s result for %q\n", kind, query))
+ }
+}
+```
+
+### 上のフレームワークをテストする呼び出し側の実装
+
+上の関数を呼び出す`main()`に当たる部分は`"golang"`を検索し、結果と検索にかかった時間を表示するようなプログラムとする
+
+```search.go
+func main() {
+ rand.Seed(time.Now().UnixNano())
+ start := time.Now()
+ results := Google("golang")
+ elapsed := time.Since(start)
+ fmt.Println(results)
+ fmt.Println(elapsed)
+}
+```
+
+### Google検索 1.0
+
+Web、画像、動画検索をそれぞれ順番に呼び出し結果をつなげて返す
+
+```search.go
+func Google(query string) (results []Result) {
+ results = append(results, Web(query))
+ results = append(results, Image(query))
+ results = append(results, Video(query))
+ return
+}
+```
+
+> 55ms
+
+### Google検索 2.0
+
+Web、画像、動画検索を並列で呼び出し結果をつなげて返す
+
+```search.go
+func Google(query string) (results []Result) {
+ c := make(chan Result)
+ go func() { c <- Web(query) } ()
+ go func() { c <- Image(query) } ()
+ go func() { c <- Video(query) } ()
+
+ for i := 0; i < 3; i++ {
+ result := <-c
+ results = append(results, result)
+ }
+ return
+}
+```
+
+> 28ms
+
+### Google検索 2.1
+
+もしどれかが遅くて80ミリ以上かかった場合、そのサーバーの検索結果を待たずに、残りの結果だけつなげて返す
+
+```search.go
+func Google(query string) (results []Result) {
+ c := make(chan Result)
+ go func() { c <- Web(query) } ()
+ go func() { c <- Image(query) } ()
+ go func() { c <- Video(query) } ()
+
+ timeout := time.After(80 * time.Millisecond)
+ for i := 0; i < 3; i++ {
+ select {
+ case result := <-c:
+ results = append(results, result)
+ case <-timeout:
+ fmt.Println("timed out")
+ return
+ }
+ }
+ return
+}
+```
+
+### タイムアウトを防ぐ
+
+Q. 処理に時間がかかった時に、結果が捨てられるのを防ぐためには?
+A. サーバーを冗長化する。リクエストを複数のレプリカに投げ、一番早く返ってきた結果を使う
+
+```search.go
+
+func First(query string, replicas ...Search) Result {
+ c := make(chan Result)
+ searchReplica := func(i int) { c <- replicas[i](query) }
+ for i := range replicas {
+ go searchReplica(i)
+ }
+ return <-c
+}
+```
+
+このFirstを使って`main()`を以下のように書き換え
+
+```search.go
+func main() {
+ rand.Seed(time.Now().UnixNano())
+ start := time.Now()
+ result := First("golang",
+ fakeSearch("replica 1"),
+ fakeSearch("replica 2"))
+ elapsed := time.Since(start)
+ fmt.Println(result)
+ fmt.Println(elapsed)
+}
+```
+
+
+### Google Search 3.0
+
+```search.go
+func Google(query string) (results []Result) {
+ c := make(chan Result)
+ go func() { c <- First(query, Web1, Web2) } ()
+ go func() { c <- First(query, Image1, Image2) } ()
+ go func() { c <- First(query, Video1, Video2) } ()
+ timeout := time.After(80 * time.Millisecond)
+ for i := 0; i < 3; i++ {
+ select {
+ case result := <-c:
+ results = append(results, result)
+ case <-timeout:
+ fmt.Println("timed out")
+ return
+ }
+ }
+ return
+}
+```
+
+### ここまでやっても...
+
+No locks.
+No condition variables.
+No callbacks.
+
+が保たれている。
+
+## Channelの実装について
+
+多少チャネルの実装についても紹介しておく。
+実装は以下に存在する。コメント除けば500行程度のコード。
+https://github.com/golang/go/blob/master/src/runtime/chan.go
+
+
+基本は以下の`hchan`という構造体。
+
+```
+type hchan struct {
+ qcount uint // total data in the queue
+ dataqsiz uint // size of the circular queue
+ buf unsafe.Pointer // points to an array of dataqsiz elements
+ elemsize uint16
+ closed uint32
+ elemtype *_type // element type
+ sendx uint // send index
+ recvx uint // receive index
+ recvq waitq // list of recv waiters
+ sendq waitq // list of send waiters
+ lock mutex
+}
+```
+
+重要なのは`recvq`,`sendq`という`<-c`,`c<-`に対応する2つのQueueでそこにデータを入れていく。
+双方送れる状態になった場合、ロックを持ったコピーである`memmove`を使ってデータを送信先から送信元にメモリコピーするという単純な実装。