LoginSignup
53
29

More than 3 years have passed since last update.

chan chan は意外と美味しい

Last updated at Posted at 2019-12-14

すっかり寒くなってきてチャンチャン焼きが美味しい今日この頃ですね(^_^)

food_chanchanyaki.png

ところで、Go言語でchannelをchannelで受け渡し出来ること、ご存知でしょうか。
自分の周囲では使っている人少なそうですが、意外と便利なので使用例をいくつか紹介したいと思います。

使用例 1: Request/Response

channelは通常片方向の受け渡しですが、channelを二重にすることでレスポンスを受け取ることができます。

例えば処理結果のerrorを受け取りたい場合は chan chan error を使用します。

reqc := make(chan chan error)

リクエストを送る側は chan error をmakeして、chan chanに送信します。
このchannelに結果が返ってきます(結果が返るまでブロックされます)。


ch := make(chan error)
reqc <- ch // リクエスト送信!!

err := <-ch  // 終了結果を待機

リクエストを受ける側は受け取ったchannelに処理結果(ここではerror)を返します。

// 例えばfor-selectループ
for {
    select {
    case ch := <-reqc:
        err := doSomething()
        ch <- err // errorを返す
    }
}

サンプルコード

package main

import (
    "fmt"
    "log"
    "time"
)

type Daemon struct {
    reqc chan chan error
}

func (d *Daemon) StartLoop() {
    go func() {
        for {
            select {
            case ch := <-d.reqc:
                err := doSomething()

                ch <- err // errorを返す
            }
        }
    }()
}

func (d *Daemon) DoSomething() error {
    ch := make(chan error)
    d.reqc <- ch                 // リクエスト送信!!
    if err := <-ch; err != nil { // 終了結果を待機
        return err
    }
    return nil
}

func main() {

    d := &Daemon{
        reqc: make(chan chan error),
    }

    d.StartLoop()

    if err := d.DoSomething(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("done")
}

参考

Advanced Go Concurrency 3 つのパターン 1
Understanding Chan Chan's in Go
https://github.com/micro/go-micro/blob/dd7677e6cca1f48e1a41bd7ce3f5edf3f4a8d9dc/api/server/http/http.go#L30

使用例 2: 処理の待機

Request/Responseと類似ですが、単純に処理完了を待機するのに使えます。
githubのOSSを検索した限りだとこの用例が比較的多かったです。

結果を返す必要はないのでchan chanは空構造体型で用意します。

reqc := make(chan chan struct{})

リクエストを送る側は chan struct{} をmakeして、chan chanに送信します。
処理完了はcloseによって通知されるのでchannel受信でそれを待ちます(完了までブロックされます)。


ch := make(chan error)
reqc <- ch // リクエスト送信!!

<-ch  // 終了結果を待機(closeされたら再開)

リクエストを受ける側は処理が完了したら受け取ったchannelをcloseします。

for {
    select {
    case ch := <-reqc:
        doSomething()
        close(ch) // 処理完了を通知する為のclose
    }
}

サンプルコード

package main

import (
    "fmt"
    "time"
)

type Daemon struct {
    reqc chan chan struct{}
}

func (d *Daemon) StartLoop() {
    go func() {
        for {
            select {
            case ch := <-d.reqc:
                doSomething()

                close(ch) // 処理完了を通知する為のclose
            }
        }
    }()
}

func (d *Daemon) DoSomething() {
    ch := make(chan struct{})
    d.reqc <- ch
    <-ch // 終了を待機(closeされたら再開)
}

func main() {

    d := &Daemon{
        reqc: make(chan chan struct{}),
    }

    d.StartLoop()

    d.DoSomething()

    fmt.Println("done")
}

参考

https://github.com/peco/peco/blob/dc15605aee1581634602cef22af6212e24a8fbe6/screen.go#L24
https://github.com/go-kit/kit/blob/e2d71a06a40aa95cb82ccd72e854893612c02db7/sd/eureka/instancer.go#L20

使用例 3: Subscriber登録

ある意味素直な用途ですが、Publisher/Subscriberで、Subscriber(channel)を登録するのに使用します。

サンプルコード


package main

import (
    "fmt"
    "time"
)

type PubSub struct {
    subscribe   chan chan string
    unsubscribe chan chan string
    publish     chan string
}

func (ps *PubSub) Subscribe(sub chan string) {
    ps.subscribe <- sub
}

func (ps *PubSub) Unsubscribe(sub chan string) {
    ps.unsubscribe <- sub
}

func (ps *PubSub) Publish(msg string) {
    ps.publish <- msg
}

func (ps *PubSub) Start() {
    go func() {
        subscribers := make(map[chan string]struct{})

        for {
            select {
            case ch := <-ps.subscribe:
                subscribers[ch] = struct{}{}
            case ch := <-ps.unsubscribe:
                delete(subscribers, ch)
            case msg := <-ps.publish:
                for sub := range subscribers {
                    select {
                    case sub <- msg:
                    default:
                    }
                }
            }
        }
    }()
}

参考

https://github.com/google/gocw/blob/c04bd445135da75ae4ab557a7e98f34fe7e78083/util/broker.go#L22
https://github.com/ethereum/go-ethereum/blob/736b45a87606e6cdfd5aecf38d259517b10e7f7e/eth/downloader/api.go#L33

使用例 4: 順序の保証

実は個人的には一番よく使う用例です。

goroutineで処理を高速化しつつ、順序を保証したいときにchan chanを利用します。

例えば大きなファイルを読み込みながら、1行ごとになんらかのフェッチ(DBアクセス, RPC, etc)をして加工するパイプライン処理を考えます。

↓はchan chan適用前のコードです。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// ファイルから1行ずつ読み込む
func readSomething() <-chan string {
    outCh := make(chan string)

    go func() {
        defer close(outCh)

        for i := 0; i < 1000; i++ {
            // ファイルを読みこむ代わりにSleep
            time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
            outCh <- fmt.Sprintf("line:%d", i)
        }
    }()

    return outCh
}

// フェッチ&加工
func fetchSomething(inCh <-chan string) <-chan string {
    outCh := make(chan string)

    go func() {
        defer close(outCh)

        for line := range inCh {
            // フェッチする代わりにSleep
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            outCh <- fmt.Sprintf("%s ... fetched!", line)
        }
    }()

    return outCh
}

func main() {
    start := time.Now()

    for line := range fetchSomething(readSomething()) {
        fmt.Println(line)
    }

    fmt.Println("done", time.Now().Sub(start))
}

実行結果

line:0 ... fetched!
line:1 ... fetched!
line:2 ... fetched!
line:3 ... fetched!
(中略)
line:996 ... fetched!
line:997 ... fetched!
line:998 ... fetched!
line:999 ... fetched!
done 49.956134795s

goroutine使ってますがほぼ直列処理なのですごく時間がかかっています。
そこでフェッチ処理をgoroutineで並行処理して高速化をはかります。

↓chan chanはまだ出てきません。

// フェッチ&加工
func fetchSomething(inCh <-chan string) <-chan string {
    outCh := make(chan string)

    c := context.Background()

    go func() {
        defer close(outCh)

        var wg sync.WaitGroup

        sem := semaphore.NewWeighted(10)

        for line := range inCh {
            wg.Add(1)
            sem.Acquire(c, 1)

            go func(line string) {
                defer wg.Done()
                defer sem.Release(1)

                // フェッチする代わりにSleep
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                outCh <- fmt.Sprintf("%s ... fetched!", line)
            }(line)
        }

        wg.Wait()
    }()

    return outCh
}

無条件に全部goroutineでぶん回せば劇速になりますが、入力ファイルが巨大になるとその分メモリなどリソースを消費するためセマフォ(golang.org/x/sync/semaphore) を使って同時実行数を10に制御しています。

実行結果

line:3 ... fetched!
line:8 ... fetched!
line:7 ... fetched!
line:2 ... fetched!
line:6 ... fetched!
line:0 ... fetched!
(中略)
line:991 ... fetched!
line:989 ... fetched!
line:995 ... fetched!
line:993 ... fetched!
line:996 ... fetched!
done 5.059474157s

かなり速くなりました!

しかしよく見ると結果が変わっています。
行は全て出力されていそうですが、順序が入れ替わってしまっています。
これはフェッチの為の複数goroutineが順不同で結果を出力channelに書き込んでいるからです。

出力が順不同でよければこれでOKです。
しかし、ときには出力順を入力と揃えたい、というケースもあるでしょう。

そんなとき chan chan の出番です!


// フェッチ&加工
func fetchSomething(inCh <-chan string) <-chan string {
    outChCh := make(chan chan string, 10)

    go func() {
        defer close(outChCh)

        for line := range inCh {
            outCh := make(chan string)
            outChCh <- outCh

            go func(line string) {
                // フェッチする代わりにSleep
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                outCh <- fmt.Sprintf("%s ... fetched!", line)
            }(line)
        }
    }()

    // chan chanをそのまま後続処理に渡してもよいが、ここではchanに変換しておく
    outCh := make(chan string)
    go func() {
        defer close(outCh)

        for ch := range outChCh {
            outCh <- <-ch
        }
    }()

    return outCh
}

出力用バッファ付きchan chanを作成し、上流channelからデータを受け取ったらまずフェッチ処理出力用channelを作成してそれを出力用chan chanへ送信していまいます。
そしてgoroutineでフェッチ処理を行い、結果をフェッチ処理出力用channelへ送信します。

コードのちょっと面白い点として、前バージョンで使用していたセマフォがなくなっています。
chan chanのバッファサイズがセマフォの役割も担っています。
WaitGroupも不要になったのでコードが気持ちスッキリしました。(^^)v

実行結果

line:0 ... fetched!
line:1 ... fetched!
line:2 ... fetched!
line:3 ... fetched!
line:4 ... fetched!
line:5 ... fetched!
(中略)
line:995 ... fetched!
line:996 ... fetched!
line:997 ... fetched!
line:998 ... fetched!
line:999 ... fetched!
done 7.275727182s

整いました!\(^o^)/

順不同バージョンよりも若干スループットが落ちてますがこれは仕組み上仕方ないところです。
chan chanのバッファサイズでスループットとメモリリソースのトレードオフを調整できます。

参考

(2019.12.16追記: コメントで教えていただきました :bow: )
ASCII.jp:GoでたたくTCPソケット(後編)|Goならわかるシステムプログラミング
HTTPのパイプライニングでレスポンスの順序保証に使用

(2019.12.18追記: コメントで教えていただきました :bow: )
https://github.com/sago35/ochan
chan + 順序制御 = ochan (2019/07/13)

出力の順序保証を行うライブラリ

おしまいちゃんちゃん

OSSから利用例をたくさん探して紹介する予定でしたが存外見つからず、少しか紹介できませんでした(´・ω・`)
他にも面白い使用例ご存知でしたらコメントで教えていただけると嬉しいです。


  1. 昔参加していた勉強会の記事がひっかかった。懐かしい・・・(^ω^) 

53
29
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
53
29