ここでは、daskを使って重いCSVファイルをPythonで処理した話を備忘録的に記録します。
Pandasで重いCSVファイルを処理するときに、たまに起こること
Pandasを使って重いCSVファイルを処理する際に、以下のようなエラーが発生することがあります。
私は最近、1億8000万行のECサイトのログデータに対して、各ユーザの各商品に対する閲覧回数を集約する計算を行ったところ以下のエラーが発生しました。
The Kernel crashed while executing code in the current cell or a previous cell.
Please review the code in the cell(s) to identify a possible cause of the failure.
Click here for more info.
View Jupyter log for further details.
原因としてはPCのメモリ不足などが考えられるのですが、メモリが少なくても大きなデータを扱いたいときがあります。
daskで大きなファイルを分割して処理する
daskとは
daskは並列分散コンピューティングのためのPythonライブラリです。
pandasと似たAPIを持つので、個人的にはかなり使いやすいです。
具体的なコード例
ここでは、daskを用いて、データを分割し処理するという方法を紹介します。
私自身あまり大きなデータを処理した経験がなかったのですが、今回紹介する方法で上の様なエラーを出さずに処理を完了させることができました。
以下に具体的なPythonのコードと説明を記載します。
# daskのインポート
import dask.dataframe as ddf
# データフレームの読み込み
df = ddf.read_csv('data.csv')
# パーティション数の設定と再パーティションの実施
npartitions = 10
df = df.repartition(npartitions=npartitions)
# データをパーティションごとに処理
for partition in df.to_delayed():
# daskデータフレームをpandasデータフレームに変換
df_current = partition.compute()
-------------------
--- 行いたい処理 ---
-------------------
# メモリの開放
del df_current
上記のコードでは、読み込んだデータに対して再パーティションを行っています。
パーティションとは、データテーブルを内部的に分割することです。
メモリ不足を引き起こさない様に自分でパーティション数を決め、再パーティションを行っています(daskでは読み込み時に自動的にパーティションされています)。
そして、for文でパーティションごとに分割されたテーブルを処理しています。
また、for文の終わりでは処理を行ったデータをメモリから解放しています。
これにより、メモリ不足を起こさずに処理を行うことができます。
おわりに
今回は、daskによるデータの分割と処理の方法を自分用のメモを兼ねて紹介しました。
もっと良い方法がありそうなので、引き続き調査を行いたいと思います。
この記事が参考になった方がいらっしゃれば幸いです。