私が以前Haskellで作成した(割と自己満足な)kazura-queueという比較的高速なキューのライブラリがあります。
このライブラリを作るときに考えたアレコレを書いてみたいと思います。
前提
ここでいうキューって何?
データ構造としてのキュー(Sequenceみたいな)や分散処理用のキュー?(RabbitMQみたいな)ではなく、並行処理のためにスレッド間の部品として使うようなキュー(Chan(やTQueueやTChan)みたいな)を意図しています。
ここではChanが持っているキューとしての性質をざっと書き出してみましょう。
- キューに入るデータの型は任意
-
Chan a
という型で表現できる範囲で好きなデータを入れることができる。
-
- FIFO
- キューなのだから当たり前といえば当たり前かもしれませんが一応。
- スレッドセーフ
- 読み出しを「待つ」ことが可能
- キューが空の時に読み出しを行おうとすると、そのスレッドはブロックされる。その状態で新たなデータが書き込まれるとブロックされたスレッドが起きる。
- (Chanは提供していませんが)読み出しを待たずに失敗して返るAPIもある方が望ましい。
- 非同期例外への耐性
- 書き込み/読み出し/読み出し待ち中のスレッドに対して非同期例外が飛んでも問題が起きない。1
- 不要な参照を持たない
- キューから読み出したデータへの不要な参照は持たず、(他の参照がなければ)すぐにGC可能になる。
- 待ち順の公平性
- 複数のスレッドが読み出しを待っているときにデータを書き込むと、「待ち」に入った順番にデータを読み出す。
Chanはキューとして使うこともできますが、単純なキューではなく「チャネル」として利用する機能も持っています。ここでいう「チャネル」は必ずしも一般的な言葉ではない気がしますが「書き込まれたデータを複数のキューに書き込まれたかのように読み出す機能を持っているもの」としておきます。(言い換えるとdupChanが使えるものが「チャネル」)
しかし、本記事では単純なキューとしてだけ考えることにします。上に列挙した性質も、
単純なキューとして一連の値を読み出すケースを想定して性質を書き出しています。
これらの性質がいつでも全て必要になるわけではありません。(実際TQueueやTChanにはない性質もあります)
しかし、これらの性質は多くの場合あった方が望ましいです。また、使う側としても、使おうとしているキューがどの性質を持っているのかは考えておいた方がよいでしょう。
高速って何?
ひとくちに高速といっても、速度には複数の尺度があり得ます。例えば以下です。
- 単位時間当たりに処理できるアイテムが多い。
- 書き込みから読み出しまでの時間が短い。
一つの尺度に絞って最高速を目指すということもできますが、高速さを求めるときに欲しいものは、往々にして「どの尺度でもそこそこ速い」ものであったりします。しかし、一方で尺度を定めなければ速度を計測することも難しくなります。
ここでは最終的にどのようなベンチマークで測定するか、という形で以下のように表現しておきます。
ここでは欲しい性能を以下のようにしておきます。
- 以下の状況で一定(大量)のデータを処理するのにかかる時間が短いこと
- 書き込みの頻度が高く、キューの中身は空ではないことが多い状況
- キューはひとつではなく、複数のキューが同時に稼働しているような状況
- ただし、他の状況でもできるだけ速くしたい
- 比較対象はChanやTQueueやTChan
また、ここでもうひとつしておかなければいけない話があります。
それは、 多くの場合、ChanやTQueueやTChanは十分に高速なので、より高速なキューを使用する必要はほとんどない ということです。
そもそもキューが律速因子になることはそんなにありません。必要以上の高速さを求めてバグに悩まされるよりも、手軽に確実なものを利用する方がよいと思います。
バージョンや環境など
本記事の内容は主に以下の環境で確認しています。
- stackage: lts-7.13 (ghc-8.0.1)
- OS: Windows10
ChanとMVarについて
高速なキューの話をする前に、まずは比較対象としてChan及びその重要な構成要素であるMVarについて簡単に説明します。
詳しく知りたい方は『Haskellによる並列・並行プログラミング』などを読むとよいでしょう。
キューの例としてはTQueueやTChanもあげていますが、この二つはstmを使用しているため、今回扱うライブラリとは勝手が違います。このため解説からは省略します。
MVar
MVarはロックと「待つ」機能のついたデータの入れ物です。データが入っている状態と空の状態があり、putMVarでデータを書き込み、takeMVarでデータを取り出して空にします。
空の状態でtakeMVar、もしくはデータがある状態でputMVarすると、それができる状態になるまでスレッドがブロックして待ち状態になります。
待ちには公平性があり複数のスレッドがブロックした場合、ブロックした順番で起きます。
その他、データがある状態を待って空にはせずにデータを読み出すのみのreadMVarといった操作も可能です。
MVarに対する操作は、(例えばIORefのような)単純なメモリの読み書きと比較すると、重い処理になります。
GHCにおいてスレッド間の待ち合わせを行う方法はいくつか存在しますが、基本的にはMVarかstm2を使います。stmは非常に便利ですが、他の部品も含めてstmを多用するのでなければ、MVarを用いることが多いようです。
Chan
まずはChanの型 を見てみましょう。
UNPACKやBangやderivingを取り除くと以下のようになっています。
data Chan a = Chan (MVar (Stream a)) (MVar (Stream a))
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
キューに書き込まれたデータを保持するのはStream
(MVar)の中のChItem
です。ChItem
は続くStream
への参照を持っており、一方向リストになっています。
Chan
はStream
を格納したMVarを二つ保持しており、二つのMVarは一連のStream
の別の場所(もしくは同じ場所)を参照しています。一つ目のMVarは読み出し用で、Stream
の次に読み出す位置への参照を持っています。二つ目のMVarは書き込み用で、Stream
の次に書き込む位置(Stream
の先頭)への参照を持っています。
書き込み処理(writeChan) では、読み出し用MVarから書き込む位置のStream
を取り出し、Stream
のMVarにChItem
を書き込みます。
読み出し処理(readChan) では、書き込み用MVarから読み出す位置のStream
を取り出し、Stream
のMVarからChItem
を読み出します。
処理をよく理解すると、書き込み用、読み出し用、及びStream
内の各MVarはそれぞれ別のスレッドセーフティのために使われていることがわかります。
- 複数のスレッドによる同時の読み出し(readChan)を制御するために、読み出し用
Stream
は読み出し用のMVarに格納されています。 - 複数のスレッドによる同時の書き込み(writeChan)を制御するために、書き込み用
Stream
は書き込み用のMVarに格納されています。 - 複数のスレッドによる同時の書き込み(writeChan)と読み出し(readChan)を制御するために、
Stream
の各ChItem
はMVarに格納されています。
書き込みと読み出しの操作は、それぞれ2回MVarに対する操作が必要になっており、これがChanの処理に必要な時間の大部分を占めていると考えられます。
unagi-chan
次にChanなどより高速なキューを実装したunagi-chan(うなぎちゃん)というなんだか可愛げな名前のライブラリについて紹介します。
unagi-chanの提供するキューは、hackageのページのグラフを見ればわかる通り速度に関してはかなり優れています。ただし、後述する通りいくつかの難を抱えています。
unagi-chanはデータの種類や用途に応じて最適化された複数の種類のキューを提供しているのですが、以降では汎用的に使用できるControl.Concurrent.Chan.Unagi(以降はChan.Unagiと記載)について見ていきます。
Chan.Unagiが早い理由
Chan.Unagiは多数の高速化のためのアイデアを実装しています。
しかし、その中で最も効果を発揮しているのは以下の点です。
- CAS(Compare-and-Swap)ではなくFetch-and-Addを使用し、できるだけ「処理のやり直し」が発生しないようにしている
CASとFetch-and-Add
リンク先のWikipediaを見てもらった方が早いかもしれませんが、CASとFetch-and-Addについて軽く紹介します。
- CAS(Compare-and-Swap)は、対象のメモリの値が指定された値と同じ場合にのみ書き換える操作をアトミックに行うCPU命令です。値が異なる場合には書き換えず、処理は失敗します。
- Fetch-and-Addは、対象のメモリの値に指定された値を足すという操作をアトミックに行うCPU命令です。
まず前提として、複数のスレッド間での並行処理を問題が起きないように行うには、何らかの特別な操作を行うCPU命令が必要になります。
通常はCASの方が便利かつ普及しているなので、こちらを使います。
CASは、複数スレッドの処理が競合した場合には失敗しますが、その場合、新しい値に基づいて処理をやりなおす必要があります。
これに対して、Fetch-and-Addは失敗せず、必ず値を更新することができるので、やりなおしのコストが発生しません。
このため、処理内容によってはFetch-and-Addを使うことで性能が向上することがあります。
ただし、CPU命令なので、Fetch-and-AddをサポートしていないCPUでは使えません。x86系CPUではFetch-and-Addをサポートしているため、利用することができます。
GHCにおけるCASとFetch-and-Addの利用
GHCのランタイム実装では並行処理のための基本的な部分でCASを利用しています。MVarの操作内でもCASが利用されています。
しかし、atomic-primopsというライブラリを使用することで、Fetch-and-AddをHaskellのコードから利用することができるようになります。(x86以外のCPUでは、Fetch-and-Addが使えないため、CASを利用してFetch-and-Addと同様の処理を実装したものが自動的に使われるようになっています)
追記:GHC7.8以降はatomic-primopsを使わなくてもGHC.PrimでFetch-and-Addを利用できる(fetchAddIntArray)ようです。
Chan.Unagiの仕組み
では、Chan.UnagiがFetch-and-Addをどのように使用するのか見ていきましょう。
Chan.Unagiでは書き込まれたデータを保持するための領域としてMutableArrayを用意しています。(ソースコード)
MutableArrayの長さは1024で、領域が足りなくなる前に次のMutableArrayを確保していきます。
MutableArrayとは別に書き込み用のカウンタと読み出し用のカウンタを保持しており、このカウンタの値が、一連のMutableArray上の座標を表しています。
書き込み及び読み出しの操作ではそれぞれのカウンタをFetch-and-Addでインクリメントしつつカウンタの値を取得し、カウンタの値の示す場所にデータを書き込み/読み出しに行きます。
- 書き込み操作同士の競合
- 書き込みを行う各スレッドは異なるカウンタ値を取得するため、書き込みスレッド同士がMutableArrayへのデータの書き込みで競合することはありません。
- 後続のMutableArrayの確保するのを待つ処理で競合することはあり得ますが、書き込みの頻度と比較すると少ないため、あまり問題にはなりません。
- 読み出し操作同士の競合
- Chan.Unagiをチャネルではなくキューとしてのみ見た場合、読み出し操作についても書き込み操作と同様、MutableArrayの異なる場所を見に行くため、競合することはありません。
- キューではなくチャネルとして見た場合、複数のスレッドが同時にMutableArrayの同じ場所の値を読み出そうとすることはありえます。
- 書き込み操作と読み出し操作の競合
- 書き込みスレッドと読み出しスレッドの処理が競合することはあります。詳しくは後述します。
MutableArrayに格納するデータは以下のように定義されています。ソースコード
data Cell a = Empty | Written a | Blocking !(MVar a)
それぞれ、以下の状態を表しています。
-
Empty
: まだ値が書き込まれていない状態 -
Written
: (読み出しスレッドが読み出そうとするより先に)値が書き込まれた状態 -
Blocking
: (書き込みスレッドが書き込むより先に)読み出しスレッドが値を読み出そうとした状態
書き込み及び読み出しは以下のように行われます。
- 書き込み時の動作
- 読み出し時の動作
書き込みスレッドだけでなく、読み出しスレッドもMutableArrayに値を書き込むことになります。同時に書き込みを行ってしまわないようにするため、MutableArrayへの書き込みはCASで行います。
実のところCellではなく、直接MVarがMutableArrayの中身であっても機能的には問題はないと思われます。しかし、MVarの操作は単純なMutableArrayへの書き込み/読み出し操作と比較して遅いため、できる限りMVarを使用しないようにしているのではないかと思われます。
Chan.Unagiの欠点
上記のような仕組みによって高速に動作するChan.Unagiですが、以下の欠点があります。
- 読み出しを行うスレッドに対する非同期例外の扱いが面倒
- データがすぐにGCされない
それぞれについて詳しく見ていきます。
読み出しを行うスレッドに対する非同期例外の扱いが面倒
書き込み・読み出しの各動作と非同期例外が絡んだシーンについて考えていきます。
mask
まず前提知識としてmaskについて軽く説明します。より詳細に知りたい場合はControl.Exceptionのドキュメントもしくは『Haskellによる並列・並行プログラミング』を読んでください。
- maskされた処理を行っている最中は、非同期例外が投げられても、即座に処理を中止することはせず、maskが解除された時点で例外が処理される。
- ただし、maskされた処理の最中であっても、当該スレッドがブロック状態(例えばMVarの値を待っている状態など)であれば、例外が処理される。
- ブロック状態でも例外を処理したくない場合はuninterruptibleMaskを使用する。
書き込み(writeChan)
writeChanはChan.Unagiのへの書き込みを行う関数です。
writeChanの中身の処理はmaskされており、ブロックもしないため特に問題は発生しません。ソースコード(実際に使われているのはmaskではなくmask_になっています)
読み出し(readChan)
readChanはChan.Unagiの読み出しを行う関数です。
readChanのドキュメントには「readChanでは非同期例外が投げられるとデータがロストする」と書かれています。
ソースコード で確認すると、なんとmaskされていません。
しかし、ロストする原因はmaskをさぼっているからではありません。maskしたとしてもロストは発生します。
例えば以下のようなケースを考えてみましょう。
- 空のChan.Unagiを用意する
- 読み出し用のスレッドにて、maskした状態でreadChanを呼び出す。読み出し用のスレッドは1つ目のデータの書き込みを待ってブロック状態になる
- 読み出し用のスレッドに非同期例外を投げる。読み出し用スレッドはブロック状態のため、maskされた状態でも非同期例外が処理される
- 書き込み用のスレッドにてwriteChanで書き込みを行う。1つ目のデータが書き込まれる
- 読み出し用スレッドにて、再びreadChanを呼び出す。手順2で読み出し用カウンタは既に回った後であるため、このreadChanは2つ目のデータの書き込みを待ってブロック状態になる
読み出し用カウンタは進む一方なので、手順4にて書き込んだ1つ目のデータは二度と読み出されることはありません。このデータはロストしたように観測されてしまいます。
Chan.UnagiはカウンタをFetch-and-Addで回しつつ値を取得することで、読み出しを試みた処理ごとに異なる座標の値を読み出そうとすることで競合をなくして高速化する仕組みでした。つまり、同じ座標を読み出しに行こうとする操作はたった一つしか存在しないのです。このため、読み出し座標が決定された以降の処理で非同期例外を受けると、その座標の値を読み出すことができなくなってしまいます。
readChanがmaskを使用しないのは、仮にmaskしたとしてもロストは発生し、maskせずともそれ以外の不整合が発生しないため、無駄なコストの発生するmaskを使用していないのだと考えられます。
readChanOnException
データがロストしては困るという場合に用意されているのが、readChanOnExceptionです。
非同期例外が投げられた場合は、その読み出し処理が読み出すはずだった値を読み出すアクション(IO a)を受け取るハンドラを渡すことで非同期例外時にもデータがロストしないようにすることができるというものです。
readChanOnExceptionを使うことで大抵の場合は対応可能なように思われます。しかし、そもそもこのようなデータのロストが発生しないChanやTQueueやTChan4などと比較すると、より多くのことを考えて使用する必要があります。
データがすぐにGCされない
Chanの性質として「不要な参照を持たず、(他の参照がなければ)すぐにGC可能」というものをあげました。しかしChan.Unagiでは、そうではありません。他の参照がなくなってもGCできないケースがあります。
Chan.UnagiではMutableArrayにデータを書き込みます。書き込まれたデータは読み出された後もMutableArrayに残り続けます。このため、MutableArray自体が誰にも参照されなくなってGCされるまで、書き込まれたデータはGCできない状態になります。
MutableArrayへの参照がなくなるのは、読み出し側がそのMutableArrayの全てのデータを読み終えて次のMutableArrayまで到達したときです。
Chan.Unagiで使用しているMutableArrayの長さは1024なので、最悪の場合1000個以上のデータが書き込まれて読み出されるまで、書き込まれたデータはGCされなくなってしまいます。
定常的に大量のデータが書き込まれ、読み出されるような使い方であれば、これが問題になるようなことはないかもしれません。しかし、データの書き込み頻度が一定でない場合などには、Chan.Unagiの利用によって、不要にメモリを圧迫してしまう可能性があるのです。
ところで、なぜChan.UnagiではMutableArrayが書き込まれたデータを保持し続けているのでしょうか。単純なキューとして使うのであれば、読み出し処理の最後でMutableArray内のデータを削除してしまえばよいはずです。しかし、Chan.Unagiはチャネルであるため、同じデータを読み出そうとするスレッドが複数存在しえます。全ての読み出しスレッドがそのデータを読み終えたならば削除しても問題ないはずですが、Chan.UnagiではMutableArrayの各座標のデータを誰が読み終えたかという管理はしていません。もしそのような管理を行うとその管理コストのために、Chan.Unagiのメリットである高速さを失ってしまうことになります。
なお、Chanでは、MutableArrayは使用せず、1つのデータにつき1つのMVarを使用しているためこの問題は発生しません。(Chan.UnagiのMutableArrayの長さが1である状態とほぼ同じと考えるとわかりやすいと思います)
まとめ
前編では、キューの要件や、比較的高速なキューであるChan.Unagiの仕組み、およびその欠点について解説しました。
後編では、Chan.Unagiの欠点を克服したキューを作る方法について解説する予定です。
-
stmについてはここでは説明しません。『Haskellによる並列・並行プログラミング』を読んでください。 ↩
-
Chan.Unagiをチャネルとして利用した場合、他のスレッドにより読み出し操作で
Blocking
に書き換えられることによりこのケースが発生します。単純にキューとしてのみ利用した場合にはこのケースは発生しえません。 ↩ -
ChanやTQueueやTChanであっても、ちょうど読み出した直後のタイミングで非同期例外が投げられた場合に値がロストしないようにするにはmaskなどを使う必要があります。 ↩