LoginSignup
15
9

More than 3 years have passed since last update.

Go runtimeの内部実装を覗きながら、チャンネルのことを知る

Posted at

Intro

Goは平行処理の利便性と簡単さで選ばれることが多い言語です。

その基礎にある主なコンポーネントはGoRoutinesとチャンネルの二つで、一緒い使われることが非常に多いです。Goのチュートリアルをやったことあるなら大体触ったことある身近なものでしょうが、チャンネルとは実際どういったデータ構造なのか?内部のロジックや処理順番はどうなっているのか?と意外と分かりづらいところも多いので、今回はチャンネルのソースコードを覗きながらその構造について少し説明して行きます。

まずチャンネルのソースコードが見れる場所です。
コンピュータにGoがインストールされている場合は、Goがインストールされているroot dirからchan.goというファイルを探します。(GoのパッケージやGOPATHが指している場所ではなくruntimeの場所なので、例えばMacのHomebrewによってGoがインストールしてある場合は/usr/local/Cellar/go/[version]/libexec/src/runtime/chan.goみたいな場所にあります。

また、golangの公式Githubからも見れます:
https://github.com/golang/go/blob/master/src/runtime/chan.go

ここで参照するソースコードは主にchan.goと、その他関連のsrc/runtime/[].goのページからの抜粋です。

まず、Goのチャンネルは何をするものなのか?

goroutine同士でデータやメッセージの引き渡し、双方向のread/writeの処理を可能にするデータ構造です。データや処理のパイプラインという表現でよく説明されます。

特徴

  • 読み込み順番はFIFO
  • goroutine-safe (goroutine同士でメモリー扱いの安全性を担保する)
  • goroutineのスケジューリング(block/unblock処理)をコントロールできる

一つのデータ型にしては責務が多いですね。

作成例

// nilチャンネルを宣言。nilチャンネルのままだと書き込み、読み込み不可能。
var myChan chan int 

// チャンネルを作成&初期化。書き込み・読み込み可能。
myChan := make(chan int) 

*チャンネルの作成時に指定したチャンネルのデータ型(上記はint)以外のデータ型を扱えないので要注意。

channelは主に3種類ある

  • synchronous (バッファなし, mutexあり)
  • asynchronous (バッファあり)
  • asynchronous, 0-size (chan struct{}) (バッファなし)

バッファ無しチャンネルは容量がないため、receiver, senderの両方が必要。
バッファを指定すると、バッファがいっぱいになるまで書き込みができ、値を読み込むことでバッファの容量が開くのでasync処理が可能。
大きさゼロ、バッファ無しのchan struct{} = semaphoreです。

チャンネルの中身

まず作成してみる:

awesomeChan := make(chan string) 
fmt.Printf("awesomeChan = %v\n", awesomeChan)

channelのvalueを出力してみると、メモリーアドレスが帰ってきます。
なぜかと言うと、チャンネルの実態は、hchanというデータ構造へのポインターです。

makechanの実装を見てみましょう。

( go/src/runtime/chan.go )

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

解読していきます。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
...

作成時にチャンネルが扱うデータ型chantypeオブジェクトへのポインター、そしてバッファの大きさを指定します。

chantypeの実装はsrc/runtime/type.goの中にあります。

// compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
...

データ型の大きさやoverflowなどのチェックです。compilerが良しなにやってくれるので省略します。

var c *hchan
...

return c

ここはチャンネルの本性、hchanオブジェクトです。

makechanの戻り値を見ていると、チャンネルと言うものはhchanへのポインターだと分かります。

make(chan ...) でチャンネルを作成すると、heap上hchanというデータ構造用にメモリーがallocateされ、このhchanへのポインターを返します。

hchanの値の設定を見ていると、polymorphicなバッファを持っているものだとすぐ分かります:

( go/src/runtime/chan.go )

switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

hchanは何かと言いますと、ロックの持っているqueueを拡張したイメージです。

中身を見ていきます。

( go/src/runtime/chan.go )

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

...
    lock mutex
}

hchanは環状queuebuf、 バッファ内の書き込み位置のindexsendxと読み込み位置のindexrecvx、ロックを取得するためのmutexlock、チャンネルにread/write待ちのgoroutine情報のリストrecvqsendq、チャンネルがclosedかどうかのフラグなどのフィールドを持っています。

(struct{}チャンネルで使っているHchan(hchanの仲間)は、環状バッファもsend/recvもなく、チャンネルに書き込まれている値の数を記録している唯一のカウンターだけで状態を管理しています。full/emptyのコールもこのカウンターの値だけを参照します)。

それぞれのフィールドの意味を掴めるために、ここでgoroutinesが(バッファ有り)チャンネルに読み込み・書き込みする時の大まかな処理フローを説明します。

goroutinesによるチャンネルへの読み込み・書き込みの処理例

// 1つ目のGo routine
func main(){
...
  awesomeChan := make(chan int, 11);  // bufferサイズ11のチャンネルを作成
  for _, awesomeNumber := range awesomeNumber {
    awesomeChan <- task
  }


...
}

// 2つ目のGo routine
func processNumbers(channelToGetFrom <-chan string) {
  for {
    awesomeNumber := <- channelToGetFrom
    doStuffWithNumber(awesomeNumber)
  }
}

処理の概要
Case 1) synchronousで1対1の読み込み・書き込みの時の処理

はじめに一つ目のgoroutinemain関数が我がチャンネルに何かを送る。
1. mainがawesomeChanの中のhchanのロックを取得
2. hchanのバッファに値を追加してくれる(実際にバッファーのメモリー中にのコピーを行う)
3. 追加後、ロックを解除する

今度、受け取り側のgoroutineprocessNumbers関数がmainから送られてきた値を受け取る。
1. processNumbersがawesomeChanの中のhchanのロックを取得
2. hchanのバッファから値を取り出す(awesomeNumber変数の指しているメモリー位置の中に値をコピーする)
3. ロックを解除する

*チャンネルから読み込み、チャンネルへの書き込み時ともにメモリーのコピー(上記の2.)が行われるので、hchanのmutexによって保護されているバッファ以外、二つのgoroutineが共有するメモリーがない。これはgoroutine同士のメモリー安全性を担保できる一つのメカニズムます。

Case 2) bufferの容量がいっぱいになった時の処理

受け取り側の関数の処理時間が長かったり、チャンネルのバッファへの値の書き込み速度に追いつかなかったりする状況があったとします。

  1. バッファの容量がいっぱいになりバッファに書き込みできなかった
  2. main関数の実行が止まって待つ(blocking処理)
  3. hchanバッファの容量に開きが出たらmain関数の処理を再開

このblocking処理はどう行われるかと言うと、

→ ここでsudogとruntime schedulerくんの出番です。

  • sudog型のstructをallocateする→goroutineの待ち状況・再開条件をsudog型を使ってチャンネルの中に書き込み→実行をブロックする旨をスケジューラーくんに伝えます。

  • 次にチャンネルから読み込むgoroutineが現れたら、処理をresumeする。

sudogを見てみる:

( go/src/runtime/runtime2.go )

type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

sudogは待っているgoroutineへのポインタg、そして待たれているelem(書き込み・読み取り待ち値)へのポインターelemを持っています。

sudogを作成しチャンネルのsendq(送り待ちのqueue)のリストに追加することで、チャンネルに対してrecvを行うGoRoutineが現れた時に、gの処理を再開できます。

recv関数を見てみる:

( go/src/runtime/chan.go )

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

sendx, recvxでチャンネルにデータの挿入・取り出し位置を管理する。バッファーのあるhchanのqueueの中に値を入れていくと、sendxとrecvxが一緒にincrementされ、値を取り出すと受け取りのindexがincrementされる。

チャンネルのバッファがいっぱいで更にsendqにも値が入っている場合、sendqにあるsudogsgを外し、待っている値をバッファに追加する:

// copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz

その後、待っていたgoroutineが実行可能になったことをgoreadyコールでスケジューラーに知らせます:

    sg.elem = nil   // sudogで処理待ちの値を外す
    gp := sg.g   // sudogで処理待ちgoroutine
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)  // 処理待ちgoroutineを実行可能にする

スケジューラーがgoroutineをrunqueueに追加することで、今度は処理が再開されます。

*Goのスレッドについて

GoRoutineはGoランタイムによって作成されているユーザー空間の軽量スレッドで、GoのランタイムスケジューラーによってOSスレッドの上にマルチプレックスされる。GoRoutineを走らせるために、OSのスレッドがスケジューリングcontext(実行可能なgoroutineのrunqueue)を持ち、それぞれがそこから随時実行するGoRoutineを選ぶ。

そのため、上記のgoready, goparkコールで、スケジューラーにスケジューリングコンテキストのrunqueueにそれぞれのgoroutineを追加することができます。goparkシグナルでgoroutineが実行されていたOSのスレッドも解放され、スケジューラーのrunqueueから他のgoroutineを実行できます。

*これによってgoroutinesはblockされることがありながらも、作成も実行ものリソースがかかるOSのスレッドは中断されずにすむ

Case 3) bufferの中に値がないのに受け取りを待っている関数がある
  1. processNumbersがawesomeChanから読もうとするができない
  2. processNumbersがsudogを作り、resume状態を登録し、そのsudogをチャンネルのrecvqの中に入れる
  3. processNumbersがスケジューラーに(gopark自分)シグナルを送り、チャンネルにsendがあるまでブロックされる
  4. mainが値をバッファー内に書き込む→スケジューラーがprocessNumberをresume可能と判断するか、受け取り変数のメモリー位置に直接値を書き込む(mainがprocessNumberのスタックに直接書き込む)

読み込み待ちの場合は自分の情報をチャンネルのrecvq(読み取り待ちのqueue)に追加する。

これでsudogを利用し、blockされている読み込み待ちのgoroutineを解放するsend関数を見てみる:

( go/src/runtime/chan.go )

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

まず、send関数はhchan, sudogへのポインターを引数として受け取る。

if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }

sudogにsend待ちの値がある時(hchanのバッファがいっぱいの時)、sendDirectでstackに送られます。

( go/src/runtime/chan.go )

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on our stack, dst is a slot on another stack.

    // Once we read sg.elem out of sg, it will no longer
    // be updated if the destination's stack gets copied (shrunk).
    // So make sure that no preemption points can happen between read & use.
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    // No need for cgo write barrier checks because dst is always
    // Go memory.
    memmove(dst, src, t.size)
}

次は、

gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    ...
    goready(gp, skip+1)

stackロックが解除され、スケジューラーに対してsendを待っていたgp関数をgoreadyシグナルでスケジューリング可能とさせます。

チャンネルのgoroutine・メモリー安全性をなぜ担保できているか、実装から見える

  • チャンネルはただのポインターであるため、Fxn同士でそのまま渡せる(チャンネルへのぽいんたーを渡す必要ない)
  • そのままを渡しても、二つのFxnでも同じバッファーを参照します(deepcopyなど行われない)
  • goroutineは全員自分自身のスタックを持っている
  • チャンネルでの処理以外、goroutineのスタック同士で読み込み・書き込みをすることはない (↑受け取り待ちの変数がある時に、ロックを取得する必要なく、受け取り関数のメモリーに直接コピーをすることが可能なので、メモリーコピーとロック処理を一個減らすためのものだそうです。賢い。)
  • 一つのg(goroutineへのポインター)は複数の待ち行列(sudog)に登録できる

チャンネルが行える他のオペレーション

  • close (channelを閉めるとmutexをロック→closedフラグを設定し→待っている全てのgoroutineを解放させるの一連の作業が行われる)
  • select (関連チャンネルをシャッフル→メッセージ受け取り・読み込み可能なものがあるか確認→全てをブロックする。runtime/select.goの中で実装されている)

こちらを含めて実装の他の多くのことは省略しましたので、興味ある方は是非runtimeのソースコードを眺めてみてください。

runtime/chan.goの中身

chan.goの中で定義されている主な内容。
他にも関連している関数はruntime/runtime2.go, runtime/type.goなどにも登場されます。

const:
- maxAlign
- hchansize
- debugChan

struct:
- hchan
- waitq

func:
- reflect_makechan
- makechan64
- makechan
- chanbuf
- full
- chansend1
- chansend
- send
- sendDirect
- recvDirect
- closechan
- empty
- chanrecv, chanrecv1, chanrecv2
- recv
- chanparkcommit
- selectnbsend
- selectnbrecv, 2
- reflect_chansend, reflect_chanrecv, reflect_chanlenなど
- enqueue, dequeue
- raceaddr
- racesync
15
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
9