GoのChannelを使いこなせるようになるための手引

  • 181
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

Go使いたくなる理由の一つに、マルチスレッドプログラミング的なものを高速な言語で安全に実装したいというのがある。Goにおいてそれを支えるのが、自前で実装した軽量スレッドといえるgoルーチンと、mutexなどのロックの代わりに使えるChannelという概念だ。

実際に実装するときに、Goルーチンは難しくないが、Channelを使うのは割と知識と経験が必要なのでここでは、Channelについてすこし詳しく書いてみる。

Message Passing

まずは理論から。
Goのチャネルなどのロックを使わない方法の並行処理はMessage Passingと呼ばれている。
以下の英語版Wikipediaにあるように数学的な理論にもなっているしっかりした枠組み。

https://en.wikipedia.org/wiki/Message_passing

ErlangのActor Modelなどもこの仲間。GoのチャネルとActor Modelは、実は、同等の概念で表現方法が違うだけらしい。
(ref: http://talks.golang.org/2012/concurrency.slide#10)

共有メモリ vs メッセージパッシング

並行処理に関してメッセージパッシングの対照的な概念が共有メモリ。違いの説明は以下のStack Overflowが分かりやすい。

http://stackoverflow.com/questions/1853284/whats-the-difference-between-the-message-passing-and-shared-memory-concurrency

モデル データの受け渡し(コミュニケーション)の方法 例えてみるなら...
Shared Memory 複数のプロセスがロックを取りながら共通のメモリにアクセスする。 全員が机を囲んでその上にある紙とペンで自由に情報をやり取り。ただし、ペンや紙の取り合いとかにならないように気をつけなければいけない。
Message Passing 各プロセスはメッセージを送り合い、内容は書き変わらない。 各自自分の机に座り、自分のところにある紙を使い手紙のようにして情報をやり取り。

Channelの基礎

Channelの使い方について学ぶなら、Go by Exampleのチャネルのパートを読み進めるのが一番よい。実行しながら試せる。
https://gobyexample.com/channels から https://gobyexample.com/stateful-goroutines まで。

最初の例は以下。

channels.go
package main

import "fmt"

func main() {
    messages := make(chan string)
    go func() { messages <- "ping" }()
    msg := <-messages
    fmt.Println(msg)
}
$ go run channels.go
ping

自分で作ったgoルーチンとmainのgoルーチンでChannelを通して情報をロックを使わずに安全に渡せている。

実用的なサンプル

2012年のGoogle/IOで話されたRob Pikeのスライドの最後の方に書いているが今でも最も優れていると思うので紹介しておく。
http://talks.golang.org/2012/concurrency.slide#42 から51ページまでの内容。

いろんなマイクロサービスに同時にリクエストして結果を返す例をGoogle検索に例えてはなしている。特に2.0->3.0が秀逸。遅い結果はすて、その分レプリカに並行リクエストする。

Google検索の例

Q. Google検索って何してる?
A. 与えられたクエリに対して、1ページ分の複数の検索結果(と広告)を返す
Q. 検索結果はどうやって得る?
A. クエリを、Web検索、画像検索、動画検索、マップ検索、ニュース検索などに投げ、結果を混ぜて返す

どういうふうに実装する?

Google検索の模倣

Web、画像、動画検索の結果を以下のように0ミリ-100ミリ秒のウェイトするだけのモックサーバーとして実装する

search.go
var (
    Web = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
        return func(query string) Result {
              time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
              return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
}

上のフレームワークをテストする呼び出し側の実装

上の関数を呼び出すmain()に当たる部分は"golang"を検索し、結果と検索にかかった時間を表示するようなプログラムとする

search.go
func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}

Google検索 1.0

Web、画像、動画検索をそれぞれ順番に呼び出し結果をつなげて返す

search.go
func Google(query string) (results []Result) {
    results = append(results, Web(query))
    results = append(results, Image(query))
    results = append(results, Video(query))
    return
}

55ms

Google検索 2.0

Web、画像、動画検索を並列で呼び出し結果をつなげて返す

search.go
func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    for i := 0; i < 3; i++ {
        result := <-c
        results = append(results, result)
    }
    return
}

28ms

Google検索 2.1

もしどれかが遅くて80ミリ以上かかった場合、そのサーバーの検索結果を待たずに、残りの結果だけつなげて返す

search.go
func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) } ()
    go func() { c <- Image(query) } ()
    go func() { c <- Video(query) } ()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
}

タイムアウトを防ぐ

Q. 処理に時間がかかった時に、結果が捨てられるのを防ぐためには?
A. サーバーを冗長化する。リクエストを複数のレプリカに投げ、一番早く返ってきた結果を使う

search.go

func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

このFirstを使ってmain()を以下のように書き換え

search.go
func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    result := First("golang",
        fakeSearch("replica 1"),
        fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}

Google Search 3.0

search.go
func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- First(query, Web1, Web2) } ()
    go func() { c <- First(query, Image1, Image2) } ()
    go func() { c <- First(query, Video1, Video2) } ()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
}

ここまでやっても...

No locks.
No condition variables.
No callbacks.

が保たれている。

Channelの実装について

多少チャネルの実装についても紹介しておく。

実装は以下に存在する。コメント除けば500行程度のコード。
https://github.com/golang/go/blob/master/src/runtime/chan.go

基本は以下のhchanという構造体。

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock     mutex
}

重要なのはrecvq,sendqという<-c,c<-に対応する2つのQueueで、そこに送受信したいメモリ情報が入っていく。
双方送れる状態になった場合、ロックを持ったコピー機能のmemmoveを使って、データを送信先から送信元にメモリコピーするという単純な実装。

アドベントカレンダー

この記事はGoその3レーンの初日の記事です。

もあるので是非参照してみてください。Go人気ですね!