5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

第二のドワンゴAdvent Calendar 2020

Day 13

golangのchannelの実装を読んでみた

Last updated at Posted at 2020-12-28

以前、きたるCPU10000コア時代に向けて近年で一番流行ったconcurrencyのおもちゃであるchannelをC++に実装した
( C++20 で goroutine/Channel みたいなことをやるライブラリ作った )
その際、golangのchannelを実装を読んだので実装について解説する

Channelとは?

機能的にはthreadでblocking_queueとか言われてるものと大体同じである

以下のような特徴を持つ

  • (thread+mutexベースではなく)goroutine上で動く
  • 有限のキューを用いる
  • キューの要素数は0を許容する
  • close機能がある

goroutineとは?

どうしてもgoroutineという言葉の定義が見つけられなかったが、goroutineという言葉はおおよそ下の2つの意味で使われていると思う

  1. なんかいい感じに実行・中断・再開してくれるgolangの軽量スレッドのこと
  2. golangのスケジューラのうちasyncのTask相当の、「実行できるもの」

なお、goroutineはソースコード上はg と省略される

参考

読む

早速channelの実装を見ていく
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go

channelの構造体定義

読み進めると分かるが、大体下のような構造になっている

struct chan{
  std::deque<value_type> value_queue; // 本当はRingBufferだけどまぁ大体同じでしょ
  std::list<RecvSudog> recv_queue;
  std::list<SendSudog> send_queue;
  bool closed = false;
  std::mutex mutex;
};

channelは上のように主に3つのキューで出来ている
value_queueは値を保存しておくqueueである
send/recv_queueは送受信まちのgoroutineを保存しておくためのキューである

chanの生成

ここではchannelの生成を行っている
キューの要素or要素の型のサイズが0であるときvalue_queueを小さくする最適化をやってるが、それ以外面白いとこはないので読み飛ばす

送信処理の実装

送信処理の挙動は以下である

  1. channelがnilチャネルだったりclose済みの場合、何もしなかったり死んだりする
  2. recv_queueに受信待ちのgoroutineが存在する場合、そのgoroutineに要素を送信する
  3. value_queueに空きがある場合、value_queueに要素をpushする
  4. 自身のgoroutineをsend_queueにenqueueし、goroutineを中断する

これを頭に入れてからソースを読むと読みやすくなる。
入れたと思うのでさっそく読んでいく

上の方:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L158-L200
mutex(OSのlock)を用いず、atomic命令だけで実装できる処理や最適化を行っている。

recv_queueに要素がある場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L293
recv_queueからgoroutineをひとつ取り出し、値をgoroutineに渡し、channelのlockを解除し、取り出したgoroutineを再開可能(≒thread_poolにpost)にする

ここで注目すべきはlock解除後にgoroutineを再開可能を再開可能にしている点である。これは
「goroutineのスタックサイズの変更処理時にchannelのlock取ろうとしてdeadlockになる」という問題に起因している。
詳しくはこの辺のissueを見てほしい https://github.com/golang/go/issues/12967

value_queueに空きがある場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L214
解説するほど面白いとこはないので各自実装読んでください

送信待ちqueueにenqueueする場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L258

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

ここのgopark()(https://github.com/golang/go/blob/fd6ba1c8a23d8a3fffb6c475b21f78510152ef5c/src/runtime/proc.go#L319) による中断とlockの解放について説明する

goparkを呼び出すと

  1. 現在のgoroutineを中断する
  2. goroutineの外(machine:goroutineとは?の項を参照)でlockの開放処理を行う

という流れになる。

なぜ「oroutineの外(machine)でlockの開放処理を行う」というようなややこしい処理をしているかというと、
先にchannelのlockを開放してした場合、他の処理がchannelを使用することによりgoroutineが中断される前にgoroutineが再開させられてしまう可能性があるためである

これを防ぐにはgoroutineが同時実行されないようにgoroutineにmutexをつければよいのだが、その場合追加mutexコストが必要になる
それを防ぐために現在のような実装になってると思われる

受信処理・closeの実装

送信処理と大体おなじなので各自読んでください

selectの実装

selectは(なぜか)別ファイルに分かれている

selectの挙動は以下である

  1. (nil channelやdefaultを含めて)即時実行可能なものがあればその操作を実行する
  2. selectの対象になるすべてのChannelに行いたい操作を実行待ちqueue(send|recv_queue)にenqueueし、中断する
  3. いずれかのchannelが実行され、goroutineが再開される
  4. enqueueしたもののうち、実行しなかったものをキャンセルする

これを頭に入れてからソースを読むと読みやすくなる。入れたと思うのでさっそく読んでいく

pollorderの決定や各channelのlock獲得処理:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L121-L230

pollorderは「即時実行可能なものがあればその操作を実行する」さい、どの順序で探索するか?を決定するlistである
実装を見ると分かるが、その順序はランダムになっている
これは非同期処理のエミュレートである

lockの獲得についてはchannelの構造体のメモリアドレスの順にlockを獲得するようにしている
これは複数のchannnelのlockを獲得する処理が複数存在する場合、全員が同じ順序でlockを獲得しないとdeadlockになる恐れがあるためである

実行可能なものの探索:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L243

// pass 1 - look for something already waiting

のコメントの通り実行可能な操作を探して見つけ次第実行する

すべてのchannelへのenqueue:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L288-L327

// pass 2 - enqueue on all chans

すべてのchannelにenqueueして中断する

実行・再開:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L327
いずれかのchannelによって再開されるとpass3が実行される
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L819
実行時の処理として、dequeue時にg.selectDoneに対しCAS操作を行うことで複数のchannelがselectの処理を実行しないようにしている

再開後のあと処理:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L336

// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.

pass3では pass2でenqueueしたもののうち、使用しなかったものをキューから削除する

おわりに

golangのchannelはよくよく見ていくと割とややこしい処理をしている

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?