以前、きたるCPU10000コア時代に向けて近年で一番流行ったconcurrencyのおもちゃであるchannelをC++に実装した
( C++20 で goroutine/Channel みたいなことをやるライブラリ作った )
その際、golangのchannelを実装を読んだので実装について解説する
Channelとは?
機能的にはthreadでblocking_queueとか言われてるものと大体同じである
以下のような特徴を持つ
- (thread+mutexベースではなく)goroutine上で動く
- 有限のキューを用いる
- キューの要素数は0を許容する
- close機能がある
goroutineとは?
どうしてもgoroutineという言葉の定義が見つけられなかったが、goroutineという言葉はおおよそ下の2つの意味で使われていると思う
- なんかいい感じに実行・中断・再開してくれるgolangの軽量スレッドのこと
- golangのスケジューラのうちasyncのTask相当の、「実行できるもの」
なお、goroutineはソースコード上はg
と省略される
参考
-
https://golang.org/doc/faq#goroutines
- goroutineについて
-
https://morsmachine.dk/go-scheduler
- golangのスケジューラについて
-
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/runtime2.go#L404
- goroutineの構造体について
読む
早速channelの実装を見ていく
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go
channelの構造体定義
読み進めると分かるが、大体下のような構造になっている
struct chan{
std::deque<value_type> value_queue; // 本当はRingBufferだけどまぁ大体同じでしょ
std::list<RecvSudog> recv_queue;
std::list<SendSudog> send_queue;
bool closed = false;
std::mutex mutex;
};
channelは上のように主に3つのキューで出来ている
value_queue
は値を保存しておくqueueである
send/recv_queue
は送受信まちのgoroutineを保存しておくためのキューである
chanの生成
ここではchannelの生成を行っている
キューの要素or要素の型のサイズが0であるときvalue_queueを小さくする最適化をやってるが、それ以外面白いとこはないので読み飛ばす
送信処理の実装
送信処理の挙動は以下である
- channelがnilチャネルだったりclose済みの場合、何もしなかったり死んだりする
- recv_queueに受信待ちのgoroutineが存在する場合、そのgoroutineに要素を送信する
- value_queueに空きがある場合、value_queueに要素をpushする
- 自身のgoroutineをsend_queueにenqueueし、goroutineを中断する
これを頭に入れてからソースを読むと読みやすくなる。
入れたと思うのでさっそく読んでいく
上の方:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L158-L200
mutex(OSのlock)を用いず、atomic命令だけで実装できる処理や最適化を行っている。
recv_queueに要素がある場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L293
recv_queueからgoroutineをひとつ取り出し、値をgoroutineに渡し、channelのlockを解除し、取り出したgoroutineを再開可能(≒thread_poolにpost)にする
ここで注目すべきはlock解除後にgoroutineを再開可能を再開可能にしている点である。これは
「goroutineのスタックサイズの変更処理時にchannelのlock取ろうとしてdeadlockになる」という問題に起因している。
詳しくはこの辺のissueを見てほしい https://github.com/golang/go/issues/12967
value_queueに空きがある場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L214
解説するほど面白いとこはないので各自実装読んでください
送信待ちqueueにenqueueする場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L258
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
ここのgopark()
(https://github.com/golang/go/blob/fd6ba1c8a23d8a3fffb6c475b21f78510152ef5c/src/runtime/proc.go#L319) による中断とlockの解放について説明する
goparkを呼び出すと
- 現在のgoroutineを中断する
- goroutineの外(machine:goroutineとは?の項を参照)でlockの開放処理を行う
という流れになる。
なぜ「oroutineの外(machine)でlockの開放処理を行う」というようなややこしい処理をしているかというと、
先にchannelのlockを開放してした場合、他の処理がchannelを使用することによりgoroutineが中断される前にgoroutineが再開させられてしまう可能性があるためである
これを防ぐにはgoroutineが同時実行されないようにgoroutineにmutexをつければよいのだが、その場合追加mutexコストが必要になる
それを防ぐために現在のような実装になってると思われる
受信処理・closeの実装
送信処理と大体おなじなので各自読んでください
selectの実装
selectは(なぜか)別ファイルに分かれている
selectの挙動は以下である
- (nil channelやdefaultを含めて)即時実行可能なものがあればその操作を実行する
- selectの対象になるすべてのChannelに行いたい操作を実行待ちqueue(send|recv_queue)にenqueueし、中断する
- いずれかのchannelが実行され、goroutineが再開される
- enqueueしたもののうち、実行しなかったものをキャンセルする
これを頭に入れてからソースを読むと読みやすくなる。入れたと思うのでさっそく読んでいく
pollorderの決定や各channelのlock獲得処理:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L121-L230
pollorderは「即時実行可能なものがあればその操作を実行する」さい、どの順序で探索するか?を決定するlistである
実装を見ると分かるが、その順序はランダムになっている
これは非同期処理のエミュレートである
lockの獲得についてはchannelの構造体のメモリアドレスの順にlockを獲得するようにしている
これは複数のchannnelのlockを獲得する処理が複数存在する場合、全員が同じ順序でlockを獲得しないとdeadlockになる恐れがあるためである
// pass 1 - look for something already waiting
のコメントの通り実行可能な操作を探して見つけ次第実行する
すべてのchannelへのenqueue:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L288-L327
// pass 2 - enqueue on all chans
すべてのchannelにenqueueして中断する
実行・再開:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L327
いずれかのchannelによって再開されるとpass3が実行される
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L819
実行時の処理として、dequeue時にg.selectDone
に対しCAS操作を行うことで複数のchannelがselectの処理を実行しないようにしている
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
pass3では pass2でenqueueしたもののうち、使用しなかったものをキューから削除する
おわりに
golangのchannelはよくよく見ていくと割とややこしい処理をしている