概要
Chainerは,ミニバッチの提供と学習の進捗情報管理を行う責務を持つIteratorクラス用のインターフェースを提供している.このインターフェースは,Datasetクラスを通じて取得した複数の学習データを用いてミニバッチを生成するクラスの実装に使用される.Chainer Version 6.5時点においては複数のIteratorクラス実装が提供されている.
本稿では,並列に学習データをロードし高速にミニバッチを編成できるMultiprocessIteratorについて解説する.なお,本稿で解説するChainerのバージョンは6.5.0を前提とする.
はじめに
MultiprocessIteratorは,Datasetクラスを通じて学習データを取得する処理を並列かつ非同期に実行する.この機構により,ミニバッチ生成速度のスループット向上が期待できる.
例えば,シーケンシャルに学習データを取得するSerialIteratorの__next__()
メソッドでは,以下のように学習データを順次ロードしてミニバッチを編成している.
batch = [self.dataset[index] for index in indices]
Datasetクラスによる学習データ1単位のロードに必要なコスト(以下ロードコスト
)が無視できるほど小さければ,シーケンシャルにロードしてもread I/Oによるパフォーマンス上の問題は生じない.例えばMNISTやCIFAR10/100を用いる場合は合計のファイルサイズが小さく,また単一のバイナリとして提供されているためパフォーマンス上の問題は生じないケースがほとんどである.
一方で,ImageNetのような個別の画像をそのまま学習データとしてロードする場合は,以下の理由によりロードコストが高くなる傾向にある.
- 訓練データはそれぞれが独立した画像ファイルであるため,学習データを1個ロードする度に
open
,read
,close
のシステムコールが毎回実行される - 個別のファイルをrandom readするのでシーケンシャルリードよりもストレージへのread I/O負荷が上昇する
- 画像ファイルからndarray objectに変換する計算コストは,一度PILなどの別クラスへの変換を経由するためpickleファイルをロードしてデシリアライズするよりも一般に高い
- 画像の枚数がtrainデータで枚数が128万枚,合計サイズが約136GiBとMNISTやCIFARと比べて多いため合計ロードコストが高い
ロードコストが高くなると比例してミニバッチ生成の所要時間も長期化する.その結果,学習処理の各iterationの冒頭にCPU上の処理によるオーバーヘッドが発生し,GPUをCPUのオーバーヘッドの分だけidle状態にする.その結果,訓練処理においてGPUの性能を活かしきれない.
この問題に対して,MultiprocessIteratorは以下のアプローチにより解決を図る.
- マルチプロセスで並列に訓練データをロードする
- 訓練データ処理とミニバッチ生成をオーバーラップさせる
このアプローチにより,ミニバッチ生成のスループットが向上し,さらに訓練処理とのオーバーラップにより1 iterationの所要時間は$max(Time_{forward} + Time_{backward},~Time_{minibatch~loading})$で決定される.どちらの時間で律速されるかは訓練データのファイルサイズ,モデルの計算量,使用するハードウェアの性能など様々な要因に左右されるため,MultiprocessIteratorの導入で訓練処理全体が高速化できるとは限らない.しかし,少なくとも$Time_{minibatch~loading}$の方が遅い場合は処理時間の改善が期待できる.
SerialIteratorとMultiprocessIteratorのパフォーマンス比較
過去に筆者が進捗報告で作ったスライドがあるのでこれを見てほしい.MultiprocessIteratorを使うことで,訓練処理中のGPUのカーネル発行間隔が明らかに狭くなり,学習時間が削減される効果が確認できる.
MultiprocessIteratorの使い方
以下にMultiprocessIteratorのオブジェクトを生成し,ミニバッチを取り出すコードのサンプルを示す.
from chainer.datasets.image_dataset import LabeledImageDataset
from chainer.iterators.multiprocess_iterator import MultiprocessIterator
dataset = chainer.LabeledImageDataset(pairs="/data/imagenet/train")
iterator = MultiprocessIterator(
dataset=dataset,
batch_size=32,
n_processes=2,
n_prefetch=10
)
mini_batch = iterator.__next__()
他のIterator実装と同様にdatasetやバッチサイズを指定してオブジェクトを生成する点は同じである.一方で,固有のパラメータとしてn_processes
とn_prefetch
を持つ.これらはDatasetからデータをロードするプロセス数と,メモリ上にキューイングするミニバッチの個数の上限値をそれぞれ指定する.
n_processes
は未指定の場合は利用可能なCPUコアをすべて使用しようとする.この挙動はmultiprocessing.Poolの仕様による.そのためChainerMNを用いた並列訓練を行う際,ローカルのMPIプロセスを複数使用する場合は,その計算ノードの合計CPUコア数以上のプロセスを起動してパフォーマンスが低下しないよう,適切なサイズのn_processes
を指定すべきである.
n_prefetch
は未指定の場合はデフォルトで1
が使用される.この場合,1個ミニバッチがキューに存在しているとそれ以上ミニバッチを生成しない(後述).基本的に1で問題ないと思われるが,何らかの理由で訓練データを多めにキューイングしておきたい場合は任意の数を設定する.
全コンストラクタの引数についてはリファレンスを参照されたい.
MultiprocessIteratorの処理内容
以下に筆者が作成したMultiprocessIteratorのシーケンス図を示す.Iteratorとしてのインターフェースは至ってシンプルであるが,その内部はスレッドとプロセスを組み合わせた複雑な構造になっていることが見て取れる.
MultiprocessIteratorは内部に_Communicator
と_PrefetchLoop
というクラスを保持し,実際のミニバッチ生成処理はこれらのクラスが担う.以下,それぞれのクラスについて簡単に解説する.
_Communicator
スレッドセーフQueueとして実装されており,_PrefetchLoopによって生成されたミニバッチの格納し, MultiprocessIterator.next()の内部でミニバッチを渡す役割を担う.格納は_Communicator.put(),取得は_Communicator#get()で実行される.前述したように,n_prefetchによって指定された上限数以上にミニバッチをput()によって格納しないよう,上限値を超えそうな場合はthreading.Conditionでwaitするように実装されている.また,ミニバッチが1つも存在しないにget()を呼んだも別のthreading.Conditionオブジェクトでwaitする.
_PrefetchLoop
MultiprocessIteratorのメイン処理である並列プロセスによる並列訓練データロードを実行する.起動時に別スレッドを立ち上げ,内部のループ処理でミニバッチ生成と_Communicatorオブジェクトへの格納を繰り返す.
高速化のために工夫されていると考えられる点として,Datasetクラスから取得されたデータを自前でシリアライズ,デシリアライズしている点が挙げられる.筆者が独自に実験したところ,numpy.ndarrayオブジェクトをmap_asyncで集約すると,暗黙的に内部で実行されるシリアライズ・デシリアライズにかなりのCPU時間を要することが分かった.正確な時間は覚えていないのだが,MultiprocessIterator自前のシリアライズ・デシリアライズを入れる場合と比べて一桁違う程度のCPU時間の差が生じた.このオーバーヘッドがmultiprocessing.Poolの実装により生じるのかは定かではないが,このオーバーヘッドを回避するために自前でpack,unpackを行っているものと考えられる.
また,見どころとしてmultiprocessing.Pool.map_async()
で複数プロセスにより並列に実行される関数_fetch_run
がグローバル関数として実装されている点が挙げられる.これは,試しにメンバ関数に変更してみれば確認できるのだが,_PrefetchLoopのオブジェクトにpickle化できないオブジェクトが含まれており_PrefetchLoopのメンバ関数として実装した関数をmap__async()の引数に渡すことができない.この問題を回避するためにグローバル関数として定義してmap_async()に指定しており,さらに_fetch_run()の内部で使用するDatasetオブジェクト等の変数もグローバル変数として定義されている.
関連研究
MultiprocessIteratorが採用しているアプローチと同様のアプローチは他の機械学習フレームワークでも採用されている.例えばTensorFlowにおいてもマルチスレッドによる並列ロード及びミニバッチのバッファリングを行うパイプラインを構築する機能が提供されており,その効果が実証されている [1].さらにread I/Oも高速化するために訓練データをメモリやローカルストレージにプリロードしながら並列ロードとバッファリングを行う手法の研究も存在する [2][3].その他,Pytorchのtorch.utils.data.DataLoaderクラスでもnum_workers
というパラメータでプロセス数を指定することで並列に訓練データのロードが可能となる.MXNetのmxnet.gluon.data.DataLoaderクラスも同様の指定が可能なパラメータを持つ.
全く異なるアプローチとして,同じ訓練データに異なるAugumentを適用した出力を繰り返し使用することで,生の訓練データをストレージからロードする合計回数を減らすことで訓練処理全体の高速化を図る研究も存在する [4].
おわりに
本稿ではChainerのMultiprocessIteratorの特徴とその内部構造について概略を説明した.Chainerを使用していて訓練データのロードがボトルネックになって困っていればMultiprocessIteratorの採用は1つの選択肢となり得ると考えられる.仮にCPUコア数が少ないサーバ上で実行する場合であっても,ミニバッチ生成処理のオーバーラップ化は最小で1プロセスだけ追加で生成できれば実現可能なので,試してみる価値はあると思われる.
最後に,自分の大学院での研究 [3など]でもMultiprocessIteratorのアプローチからは多くのことを参考にさせていただき,提案手法の実装でも活用させていただいた.この場を借りて御礼申し上げたい.
参考文献
-
[1] S. W. D. Chien et al., "Characterizing Deep-Learning I/O Workloads in TensorFlow", 2018 IEEE/ACM 3rd International Workshop on Parallel Data Storage & Data Intensive Scalable Computing Systems (PDSW-DISCS), Dallas, TX, USA, 2018, pp. 54-63. doi: 10.1109/PDSW-DISCS.2018.00011 online
-
[2] Y. Zhu et al., "Entropy-Aware I/O Pipelining for Large-Scale Deep Learning on HPC Systems", 2018 IEEE 26th International Symposium on Modeling, Analysis, and Simulation of Computer and Telecommunication Systems (MASCOTS), Milwaukee, WI, 2018, pp. 145-156. doi: 10.1109/MASCOTS.2018.00023 online
-
[3] Kazuhiro Serizawa and Osamu Tatebe. 2019. "Accelerating Machine Learning I/O by Overlapping Data Staging and Mini-batch Generations". In Proceedings of the 6th IEEE/ACM International Conference on Big Data Computing, Applications and Technologies (BDCAT '19). ACM, New York, NY, USA, 31-34. DOI: 10.1145/3365109.3368768 online
-
[4] Choi, Dami, et al. "Faster neural network training with data echoing." arXiv preprint arXiv:1907.05550 (2019). online