はじめに
この記事は,Kaggle Advent Calendar 2022第6日目の記事になります。
本記事では、 32GB超のCSVデータの基本統計量を、小規模マシンでも省メモリかつ高速に計算するテクニック について解説します。
Kaggleコンペに限らず、
- マシンスペックが低いため、大きなデータセットを満足に処理できず困っている
- 毎回行うファイル読み込みが遅いので、もっと高速化したい ⚡
といった悩みや課題を抱えている方の参考になれば幸いです。
モチベーション
データ分析業務やKaggle等のコンペティションで初めてのデータセットを扱う場合、いきなり機械学習アルゴリズムを行うことはまず無く、最初にデータ観察を行うのが一般的です。
テーブルデータであれば、各カラムの基本統計量(最小値、最大値、平均、分散、四分位数)などを計算・可視化し、データクレンジングの要否や特徴量設計の方針などを検討します。
(GoogleCloudのVertex AI概要より引用。左上の「Data Readiness」がデータ観察に該当)
しかし、 データセットの規模がマシンスペック(特にメモリサイズ)を大きく上回る場合、データセットを読み込むだけでメモリエラーが発生し、次工程である特徴量設計や学習・チューニングに中々進めない ことが往々にしてあります。
クラウド環境が利用可能であればハイメモリタイプのインスタンスを使用することで解決しますが、例えば
- データセットに個人情報が含まれているため、社内環境でしか分析できない(社外にデータを持ち出せない)
- 参加したコンペがCode Competitionで、指定された環境でしか分析できない
といった場合、 利用可能な分析環境で何とかするしかない のが実情です。
クラウド環境が利用できる場合であっても、コンピューティングリソースは基本的に従量課金制のため、 同等の処理結果・処理時間をより小さいインスタンスタイプで実行できれば、費用の節約 にもつながります。
そこで、 低スペックマシンで大規模なデータセットの基本統計量を算出する方法 にニーズがあると考え、その方法の一つを記事化したものになります。
検証環境
データセット
今年2022年の秋に開催された、Kaggle AMEXコンペのテストデータ(test_data.csv
)を使用します。
CSVファイルによるテーブルコンペで、 学習データは約16GB(553万行)、テストデータは約32GB(1,136万行) もあります。2021年に開催されたRiiidコンペはデータセットが1億行超もあることで有名でしたが、データサイズで比較すれば今回の方が6倍以上も大きいサイズになります。
$ du -ah *.csv
32G test_data.csv
16G train_data.csv
マシンスペック
GoogleCloudのC2D-highcpu-2インスタンスを使用します。
マシンスペックは 2vCPU/4GBメモリであり、メモリサイズはKaggleのCPUカーネル(4vCPU/16GBメモリ)の1/4、データサイズと比べると約1/8しかありません。
(https://cloud.google.com/compute/docs/compute-optimized-machines#c2d-high-cpu より抜粋)
使用パッケージ
本検証では、Pandas、Dask、PyArrowを使用します。
全てpipでインストール可能です。
$ pip install pandas dask pyarrow
pythonと各パッケージのバージョンは以下の通りとなります。
$ python -V
Python 3.7.12
$ pip freeze | grep -E "pandas|dask|pyarrow"
dask==2022.2.0
pandas==1.3.5
pyarrow==10.0.0
実行時間とメモリ使用量の計測には、LinuxのGNU版timeパッケージを使用しました。
$ sudo apt install time
Pandasで実行(失敗)
まずはPandasで実行してみましょう。
pandasにはdescribe()という各カラムの基本統計量を算出できる便利なメソッドがありますので、これを使用します。
import pandas as pd
if __name__ == '__main__':
data = pd.read_csv('train_data.csv')
stat = data.describe()
print(stat)
Pandasは全てのデータを一度に読み込んでメモリに乗せる仕様のため、このコードはメモリエラーで失敗してしまいます。
Pandasの公式ドキュメントでは、大規模なデータセットを扱う場合、
- 必要な列のみを読み込む
- 効率的なデータ型を使用する
- データをチャンクで分割して処理する
ことが推奨されています。
しかし、 データを読み込む時点では、どれが必要な列なのか、どんなデータ型が適切なのかはまだ分かりません。チャンクで分割処理する方法も、 四分位数などの統計量は列データ全体を必要とする ため、これも単純には適用できません。
Daskで実行(成功!)
続いて、Daskを使用します。
DaskはPython用の並列・分散処理パッケージであり、大規模なデータに対してもスケーリング可能な機能を多数提供しています。Pandasの公式ドキュメントでも、エコシステムの1つとして紹介されています。
Daskでは以下のように実装します。
describe()
の後にcompute()
メソッドで遅延処理を実行している点を除けば、APIはPandasとほぼ同じです。
import dask.dataframe as dd
if __name__ == '__main__':
data = dd.read_csv('test_data.csv')
stat = data.describe().compute()
print(stat)
実行結果は以下となります。実行時間は約390秒、メモリ使用量は約1.3GBでした。
データサイズのわずか1/8程度しかメモリ搭載していないマシンでも、メモリエラーを起こさずに計算できました。素晴らしい!!!
P_2 D_39 B_1 B_2 R_1 S_3 D_41 ... D_139 D_140 D_141 D_142 D_143 D_144 D_145
count 5.485466e+06 5.531451e+06 5.531451e+06 5.529435e+06 5.531451e+06 4.510907e+06 5.529435e+06 ... 5.429903e+06 5.490819e+06 5.429903e+06 944408.000000 5.429903e+06 5.490724e+06 5.429903e+06
mean 6.563340e-01 1.531172e-01 1.240100e-01 6.214887e-01 7.880270e-02 2.258455e-01 5.978469e-02 ... 1.789305e-01 2.664348e-02 1.645212e-01 0.390799 1.788022e-01 5.238952e-02 6.233496e-02
std 2.446494e-01 2.700709e-01 2.119869e-01 4.014877e-01 2.263971e-01 1.933475e-01 2.025443e-01 ... 3.790614e-01 1.455480e-01 3.482771e-01 0.236182 3.789498e-01 1.825135e-01 1.934937e-01
min -4.589548e-01 5.026190e-09 -7.588799e+00 9.192280e-09 1.534223e-09 -6.271320e-01 5.566545e-10 ... 3.767347e-10 3.725073e-09 1.650100e-10 -0.014539 5.549692e-09 2.500991e-09 1.226024e-09
25% 5.077288e-01 4.728840e-03 9.636755e-03 1.377500e-01 2.997485e-03 1.317796e-01 2.989860e-03 ... 3.169253e-03 2.648101e-03 3.170379e-03 0.247961 3.184138e-03 2.866789e-03 3.166187e-03
50% 7.148664e-01 9.370886e-03 3.701816e-02 8.151693e-01 5.950186e-03 1.676472e-01 5.934894e-03 ... 6.260630e-03 5.249856e-03 6.255896e-03 0.437398 6.260704e-03 5.654534e-03 6.291452e-03
75% 8.811929e-01 2.449055e-01 1.511139e-01 1.003019e+00 8.839325e-03 2.803336e-01 8.842138e-03 ... 9.344230e-03 7.773080e-03 9.360699e-03 0.602584 9.352387e-03 8.455284e-03 9.344943e-03
max 1.010000e+00 5.389619e+00 1.324060e+00 1.010000e+00 3.256284e+00 5.482888e+00 8.988807e+00 ... 1.010000e+00 1.010000e+00 1.339910e+00 2.229368 1.010000e+00 1.343331e+00 4.827630e+00
[8 rows x 186 columns]
なぜDaskでは省メモリで計算できたのでしょうか?
Daskのread_csv()にはblocksize
という引数があり、入力データをblocksize
毎にパーティション分割して読み込みます(指定が無い場合、blocksize
はCPUコア数やメモリサイズから自動計算されます)。
各統計量はパーティション毎に並列で計算され、最後にデータ全体の統計量として再計算されるため、ピークメモリを抑えることに成功しています。
高速化する
先程のプログラムは成功しましたが、実行に6分半近く要しています。
これを更に高速化していきたいと思います。
Parquet形式に事前変換する
フラットファイルであるCSV
形式は読み込みが遅いため、カラムナフォーマットであるParquet
形式に事前変換します。
変換処理が1回だけ余計に発生しますが、2回目以降はカラムナフォーマットの方が読み込みが高速なため(後述)、 コンペのようにファイル読み込みを何度も行うユースケースでは事前変換がおススメ です。
変換処理にもDaskを使用します。
import dask.dataframe as dd
if __name__ == '__main__':
data = dd.read_csv('test_data.csv')
data.to_parquet('test_parquet')
処理時間は約420秒、メモリ使用量は約1GBでした。
出力先のtest_parquet
ディレクトリには、元データが530個のParquet
ファイルに分割保存され、総サイズは約16GB(元データの半分)となりました。
分割数は、read_csv()で設定されたblocksize
に依存します。
デフォルトの最大値はblocksize=64MB
であり、より大きな値を設定すれば分割数を減らすことができます(その分、パーティションあたりのデータサイズが増えるため、メモリ使用量は増加します)。
Parquet形式から読み込む
以降は、事前変換したParquet
ファイルから読み込みます。
Daskのread_parquet()にディレクトリを指定するだけで、出力したParquet
ファイル全量を一括で読み込むことが可能です。
import dask.dataframe as dd
if __name__ == '__main__':
data = dd.read_parquet('test_parquet')
stat = data.describe().compute()
print(stat)
処理時間は約210秒、メモリ使用量は約1.3GBでした。
元のCSVファイルから読み込むのと比較して、同等のメモリ使用量を維持しつつ、処理時間を4割以上削減できました!
(おまけ)describeメソッドの使い方
私は基本統計量の結果を、適切なデータ型の確認によく利用しています。
bit数の少ないデータ型に変換することで、 メモリ上のデータサイズが劇的に減るだけでなく、特徴量設計などの計算時間短縮 も期待できます。
一例として、以下のような変換があります。
- 整数で最大値が255未満 ⇒
np.int8
型 -
0
と1
しかない ⇒bool
型 - ユニークな数が少ない(カーディナリティが低い) ⇒
CategoricalDtype
型 - 欠損値(
NaN
)以外は整数っぽい ⇒ 欠値補間 + 整数型
データに極端な外れ値が含まれている場合、最小値や最大値だけでは適切なデータ型は特定できないため、パーセンタイルを使用します。
describe()のパーセンタイルはデフォルトで[0.25, 0.75]
ですが、quantiles
引数で変更可能です。
percentiles=[0.01, 0.99]
にすると、上下1%ずつを外れ値として除外できるため、残り98%のデータから比較的妥当なデータ範囲を絞り込めるようになります。
import dask.dataframe as dd
if __name__ == '__main__':
data = dd.read_parquet('test_parquet')
stat = data.describe(percentiles=[0.01, 0.99]).compute()
print(stat)
P_2 D_39 B_1 B_2 R_1 S_3 D_41 ... D_139 D_140 D_141 D_142 D_143 D_144 D_145
count 1.130388e+07 1.136376e+07 1.136376e+07 1.136084e+07 1.136376e+07 9.622797e+06 1.136084e+07 ... 1.126606e+07 1.134001e+07 1.126606e+07 1.966795e+06 1.126606e+07 1.133992e+07 1.126606e+07
mean 6.576889e-01 1.562754e-01 1.279497e-01 6.167632e-01 7.489555e-02 2.274024e-01 5.915285e-02 ... 1.796090e-01 2.769862e-02 1.655753e-01 4.068860e-01 1.794860e-01 5.198750e-02 6.377648e-02
std 2.468916e-01 2.657579e-01 2.126895e-01 4.059172e-01 2.160619e-01 1.988265e-01 2.006950e-01 ... 3.796441e-01 1.489702e-01 3.498050e-01 2.484312e-01 3.795397e-01 1.821541e-01 1.973693e-01
min -4.658552e-01 6.204270e-09 -7.057417e+00 2.125048e-10 6.911055e-10 -6.855620e-01 4.726168e-10 ... 1.119644e-09 1.485373e-09 8.903923e-11 -1.471698e-02 8.253177e-11 1.051330e-09 1.150208e-09
1% 3.394018e-02 2.264711e-04 6.388284e-04 4.502037e-03 1.364020e-04 1.524112e-02 1.400369e-04 ... 1.558728e-04 1.245405e-04 1.465424e-04 2.643156e-02 1.466159e-04 1.349791e-04 1.542352e-04
50% 7.233169e-01 9.784972e-03 3.896192e-02 8.153240e-01 5.881068e-03 1.675709e-01 5.889806e-03 ... 6.291292e-03 5.217998e-03 6.297293e-03 4.524809e-01 6.313159e-03 5.708498e-03 6.338714e-03
99% 1.007214e+00 1.142791e+00 1.089292e+00 1.009767e+00 1.009128e+00 1.176328e+00 1.242532e+00 ... 1.009574e+00 1.007079e+00 1.030071e+00 1.286142e+00 1.009572e+00 1.147772e+00 1.369364e+00
max 1.010000e+00 9.330448e+00 1.324060e+00 1.010000e+00 3.258507e+00 4.341215e+00 1.211719e+01 ... 1.010000e+00 1.010000e+00 1.320065e+00 2.280940e+00 1.010000e+00 1.343333e+00 5.462223e+00
[8 rows x 186 columns]
まとめ
本記事では、32GB超のCSVデータの基本統計量を小規模マシンでも省メモリかつ高速に計算するテクニック について解説しました。
今回はDaskを使用しましたが、PythonであればPySparkやVaexなど他の分散パッケージでも類似の効果が期待できます。
本記事が、私を含め、大きなデータセットの扱いに困っている方々の参考になれば幸いです。