LoginSignup
17
9

More than 3 years have passed since last update.

CSV・Parquet・HDF5のフォーマットにおけるVaex・Dask・Pandasのパフォーマンスの比較をやってみた(単体ファイル対象)。

Last updated at Posted at 2021-01-11

本記事はCSV、Parquet、HDF5などのデータフォーマットにおける、Vaex、Dask、Pandasなどのパフォーマンス比較用の記事となります。

お仕事に役立ったらいいなぁくらいの軽い気持ちでの緩く荒めの対応なので、細かいところのツッコミはご容赦いただけますと幸いです。

弊社の前提条件と検証の目的

前提条件として弊社はプロジェクト側もAWSで統一・データ基盤もAWSで統一・・・といったようにマルチクラウドにはせずにAWSを中心とする形で組まれています(ログ関係だけGCPでBigQuery・・・みたいなことはしていません)。

そうするとAWSでKinesis FirehoseなりAthenaなりを使う形でデータ置き場がS3が中心になってきます。Athenaなど以外にもS3をマウントしてPandasやDaskなどでも色々利用しています(且つ、弊社だとAthenaなどよりもPandasなどでの利用の方がメインになっています)。

S3にデータを置く場合、フォーマットが色々自由に選択できます(BigQueryでもCloud Storageとかに対してクエリを投げたりで近いことはできるかもしれませんが・・・)。そうするとCSVやらParquetやらのいずれかのフォーマット(もしくは複数のフォーマットを並列で配置して使い分けする形)を選択する必要があります。

また、ログ基盤などだとデータ量が膨大になるため、圧縮などをした場合のディスクサイズの比較や読み書きの速度などをある程度把握しておきたいとも今まで感じていました。

フォーマットごとのディスクサイズの差はS3におけるディスクコストだけでなく、Athenaなどではスキャンコストが圧縮された後のファイルサイズで計算されるので結構馬鹿にできません。

各フォーマットごとの検証前の所感と何故比較でCSV・Parquet・HDF5を選んだのか

世の中にはログに使える様々なファイルフォーマットが存在しますが、検証前では弊社の用途を加味して以下のような所感を持っている状態で進めています。

  • CSV : 大抵の環境(言語・ライブラリ・サービスetc)で扱えます。速度は他のフォーマットと比べると結構遅いので大きなデータだと少しきついと感じる時があります。
  • JSON : CSV同様多くの環境でサポートされており、且つデータで階層構造を持ったり各値にキーの文字列が割り振られるのでCSVなどで列がずれたりが起きづらいと感じています。ただし弊社のログだと行列のデータが大半となるため、CSVと比べるとファイルサイズが大きくなってあまり選択するメリットが少ないとも思っています。ただし同じキー名などが大半になるので、圧縮をかけるとぐぐっとサイズが小さくなるので圧縮するならそこまで極端にファイルサイズはCSVと比べて大きくならない印象でもあります。
  • Parquet : 速度はHDF5などと比べるとぼちぼち遅い?印象は持っています。ただしCSVなどと比べると結構速い印象です。カラムナフォーマットの恩恵で、圧縮後のファイルサイズがぐぐっと小さくなる可能性を秘めている印象を持っています(今回扱うフォーマットの中では一番小さくなるかなと)。AWSでも多くのサービスでサポートされていますし、Pandas・Dask・Vaexなどの各ライブラリでもサポートされています。ただし1つのファイルが小さいと逆効果(CSVなどを圧縮した方がファイルサイズが小さくなりうる)とも感じています。Firehoseなどで数分単位などでログが保存された際に、ファイルが小さいケース(テーブル)も結構あるでしょうから、データレイクでのそういったデータは日次分で統合などをしないとあまりメリットが無いケースもありそうです。各ライブラリでの読み書きは大分シンプルで楽に感じています。
  • HDF5 : 各Pythonライブラリでは扱えるものの、AWSの各サービス(Athenaなど)は対応していません。Vaexでの利用ではよく使われているイメージです(Vaexでの高速な処理を引き出すサンプルの多くがHDF5だったり等)。Pickleなどに肉薄する速度が出るイメージ(Pickleよりもわずかに遅いくらいのイメージ)でいます。
  • Pickleやnpyなど : Pickle(Pythonオブジェクトの直列化されたバイナリ)やnpy(NumPy形式のバイナリ)などに関しては、Pythonで扱う上では非常に高速に扱えますが、PythonやNumPyのバージョン変更などで使えなくなったりするので、長期のログの保存としては扱いが現実的ではない(一時的な分析などであれば大分有益)ので本記事では触れません。また、NumPy形式のmemmapなどを使うケースを除いて、一度に全てをメモリに乗せる必要がある(VaexやDaskでは基本的には直接扱えない)点やAWS環境でもサポートされていない点もマイナスに感じています。

TL;DR

結構コードなどが長くなったので、先に結論から記載しておきます。

  • 思っていたほどParquetがCSVと比べてディスクサイズ面で優位になりませんでした。今回試した限りでは結構行数が多くないとディスクコスト削減の面ではプラスにならない印象です。
    • 数千行程度であればCSVの方が優秀だったりするため、データレイクなどへの送信で数分単位で小さいファイルを送るケースなどは注意が必要です。
    • 特にKinesis Firehoseなどを使った場合、Parquet変換で余計にAWSのコストがかかったりするので、ディスクコストも高く、変換コストもかかる・・・みたいなことになりかねません。
  • データレイクのデータを日次で1つのParquetファイルにまとめる・・・みたいなケースや、ETLで抽出したデータなどであれはParquetは輝きそうな気がします。
  • 速度に関しては数百行といったレベルであればCSVの方が好ましいですが、それ以降はParquetが大分速い速度を出してくれます。
  • ごく小さいデータの場合VaexよりもPandasの方が早めですが、行数が万行の単位、場合によっては数千行程度でもVaexの方が速くなってきます。
  • 十万行~といった規模になってくると、大分ParquetよりもHDF5の方がVaexで速くなってきます。それまではParquet側の方が結構優位だったりするようです。
  • Pandasのデフォルト設定でもある、SnappyのParquetの圧縮方式は、読み込みなどでの影響が大分小さいようです。ディスクサイズ削減とパフォーマンスのバランスの良い圧縮と言えます。ファイルサイズが大きくなってくると、gzip圧縮との差がかなり顕著になってきます。
  • 今回試した範囲ではDaskの結果が良くありませんでした。検証の行数が少なく且つ単体ファイルという点が大きく影響しているかもしれません。Vaexに関しては単体ファイルでも非常に良いパフォーマンスで動作してくれるようです。
  • 思っていたよりもParquet + Vaexのパフォーマンスが良く、取り回しの楽さやファイルサイズなどの優位性のバランスからCSVの代わりに結構積極的に検討してもいいかもしれないという印象を受けました。
  • Vaex + CSVは相当遅いので、あまり使うメリットが少ないと思われます。

検証する内容

  • ファイルのフォーマットは未圧縮CSV・gzip圧縮のCSV・未圧縮Parquet・Snappy圧縮のParquet・gzip圧縮のParquet・未圧縮のHDF5・zlib圧縮のHDF5(Vaexを除く)で対応します。
  • ライブラリはPandas、Dask、Vaexを使います。
  • ディスクサイズの比較とプロット・読み込みと計算の比較とプロットを行います。
  • 行数はデータレイクなどで数分単位で細かくデータが送られるケースも想定し、少ない行数も含めて検証してあります。単一のファイルで500行・2000行・5000行・1万行・5万行・10万行・50万行・100万行で試しています。

※Dask・Vaex・HDF5などに関しては以前別途記事を書いているので必要に応じてそちらをご参照ください。

今回やらないこと

  • 複数ファイルで大量の行数のデータに対する制御は今回スキップします。将来機会があれば別の記事で行います(時間的な面と記事の長さを加味し)。
  • 書き込み速度に関しては私の用途では優先度が低い(読み込みに比べると回数が少ない)ため、ほとんど触れません。

使う環境

以下の設定のDocker環境によるPython3.8のJupyterのノートブックを利用していきます。このイメージのOSはUbuntuは20.04になります。

FROM jupyter/datascience-notebook:python-3.8.6
RUN pip install vaex==2.6.1
RUN pip install jupyter-contrib-nbextensions==0.5.1
RUN jupyter contrib nbextension install --user
RUN jupyter nbextension enable hinterland/hinterland \
    && jupyter nbextension enable toc2/main

PandasやDaskなどは上記のDockerfileで指定しなくても入るため、そちらのバージョンを使っていきます。具体的には各ライブラリは以下のバージョンが入ることになります。

  • vaex==2.6.1
  • pandas==1.1.4
  • dask==2.30.0

ホストのOSはWindows10、メモリ32GB、CPUはi7-8700 3.20GHz(6コア12スレッド)のマシンを使っています。

検証環境における留意点

S3をマウントしてPythonライブラリで扱ったりもしている都合、本当はS3を準備して検証するべきですが、今回は手間を加味してローカルで検証していきます。


以下でコードを書いていっていますが、大分長いので実際の比較の節までスキップしたい・・・という方はこちらの節をクリックして飛んでいただけますと幸いです。

500行5列程度の小さいファイルでの比較

まずは500行5列程度の小さいファイルでの比較をやってみます。
データレイクなどでFirehoseやFluentdで短いスパンで(あまり行数の多くない)特定のデータセットを保存したケースを想定します。

データの準備

データの準備用の処理を書いていきます。
5つの各カラムは以下のように設定しました。

  • column_a : int -> 0~4999999の範囲でランダムな値が設定されます。
  • column_b : int -> 0~10000の範囲でランダムな値が設定されます。
  • column_c : int -> 0~100の範囲でランダムな値が設定されます。
  • column_d : int -> 100, 300, 500, 1000, 3000, 10000のいずれかの値が設定されます。スマホアプリで言うところのアプリ内課金的ないくつかの固定値が設定されるものを想定します。
  • e : str -> 20文字の小文字と大文字のアルファベットの文字列が設定されます。
from string import ascii_letters
import random

import numpy as np
import pandas as pd


def make_random_ascii_str(char_num):
    """
    指定された文字数のランダムな小文字と大文字の文字列を生成する。

    Parameters
    ----------
    char_num : int
        生成する文字数。

    Returns
    -------
    random_str : str
        生成された文字列。
    """
    return ''.join(random.choices(population=ascii_letters, k=char_num))


def make_basic_pandas_df(row_num, char_num):
    """
    基本的な構造の検証用のPandasのデータフレームを生成する。

    Parameters
    ----------
    row_num : int
        生成するデータフレームの行数。
    char_num : str
        文字列のカラムに設定する値の文字数。

    Returns
    -------
    pandas_df : pd.DataFrame
        生成されたデータフレーム。以下の5つのカラムに各値が設定される。
        - column_a : int -> 0~4999999の範囲でランダムな値が設定される。
        - column_b : int -> 0~10000の範囲でランダムな値が設定される。
        - column_c : int -> 0~100の範囲でランダムな値が設定される。
        - column_d : int -> 100, 300, 500, 1000, 3000, 10000のいずれかの値が設定される。
        - column_e : str -> char_num で指定された文字数の小文字と大文字のアルファベットの
            文字列される。
    """
    pandas_df = pd.DataFrame(
        index=np.arange(0, row_num), columns=['column_a'])
    row_num = len(pandas_df)
    pandas_df['column_a'] = np.random.randint(0, 5000000, size=row_num)
    pandas_df['column_b'] = np.random.randint(0, 10000, size=row_num)
    pandas_df['column_c'] = np.random.randint(0, 100, size=row_num)
    pandas_df['column_d'] = np.random.choice(a=[100, 300, 500, 1000, 3000, 10000], size=row_num)
    pandas_df['column_e'] = [make_random_ascii_str(char_num=20) for _ in range(row_num)]
    return pandas_df

以下のような500行のデータフレームが生成されます。

pandas_df = make_basic_pandas_df(row_num=500, char_num=20)
print(pandas_df.head())
   column_a  column_b  column_c  column_d              column_e
0    217141      3701        90       300  qgnzZkIMxOIhiIdvWZNF
1   3123093      9938        60      3000  oMcRXqyeYLuWAqLknVmG
2   3547451      4437        55       300  wJMPyrJGaLciRPQiSiuC
3   1141404      2126        92     10000  pvQUQekUPVddWOyccdfD
4    641110      3734        52      1000  fesOBYqfgofLVNwTLGEc

ファイルの保存処理の追加

ディスクサイズや読み込み速度比較のため、以下の各フォーマットで出力されるようにします。なお、HDF5に関しては各ライブラリでデータ階層の内容が異なったりするため各ライブラリごとに出力します。

また、保存に関してはインデックスは保存せず、エンコーディングのオプションがある書き出し関してはにUTF-8を指定していきます。

  • 未圧縮CSV
  • gzip圧縮されたCSV
  • 未圧縮Parquet
  • Snappy圧縮されたParquet
  • gzip圧縮されたParquet
  • 未圧縮HDF5
  • zlib圧縮されたHDF5

主に書き出し速度の比較時の話となりますが、CSVに関しては本記事で使うVaexのバージョンでは書き出しのインターフェイスが無いためPandasとDaskでのみ扱います。また、VaexではParquetの圧縮方式の指定が無い(Snappyのみとなる)ため、書き出しの処理はPandasで行い、そちらのデータをVaex参照する形で読み込みを行います。

HDF5については未圧縮のもののみVaexが対応しているため、PandasとDask側でのみ圧縮関係は対応します。また、圧縮率のレベル指定(0~9で、数値が高くなるほど高圧縮・高負荷)に関しては今回は4を指定しています。

from datetime import datetime

import vaex
import dask.dataframe as dd


class FP:
    """
    ディスクサイズや読み込み速度の比較用の各フォーマットの保存先の
    ファイルパスを定義したクラス。

    Attributes
    ----------
    CSV_NO_COMPRESSION : str
        未圧縮のCSVのパス。
    CSV_GZIP : str
        gzip圧縮されたCSVのパス。
    PARQUET_NO_COMPRESSION : str
        未圧縮のParquetのパス。
    PARQUET_SNAPPY : str
        Snappy圧縮されたParquetのパス。
    PARQUET_GZIP : str
        gzip圧縮されたParquetのパス。
    HDF5_NO_COMPRESSION_PANDAS : str
        未圧縮のHDF5のPandas用のファイルのパス。
    HDF5_NO_COMPRESSION_DASK : str
        未圧縮のHDF5のDask用のファイルパス。
    HDF5_NO_COMPRESSION_VAEX : str
        未圧縮のHDF5のVaex用のファイルパス。
    HDF5_ZLIB_PANDAS : str
        zlib圧縮のHDF5のPandas用のファイルパス。
    HDF5_ZLIB_DASK : str
        zlib圧縮のHDF5のDask用のファイルパス。
    """

    CSV_NO_COMPRESSION = 'csv_no_compression.csv'
    CSV_GZIP = 'csv_gzip.csv.gz'

    PARQUET_NO_COMPRESSION = 'parquet_no_compression.parquet'
    PARQUET_SNAPPY = 'parquet_snappy.parquet'
    PARQUET_GZIP = 'parquet_gzip.parquet'

    HDF5_NO_COMPRESSION_PANDAS = 'hdf5_no_compression_pandas.hdf5'
    HDF5_NO_COMPRESSION_DASK = 'hdf5_no_compression_dask.hdf5'
    HDF5_NO_COMPRESSION_VAEX = 'hdf5_no_compression_vaex.hdf5'
    HDF5_ZLIB_PANDAS = 'hdf5_zlib_pandas.hdf5'
    HDF5_ZLIB_DASK = 'hdf5_zlib_dask.hdf5'


def remove_each_format_files():
    """
    保存済みの各ファイルの削除を行う。
    """
    file_or_dir_names = os.listdir()
    data_extensions = ('.csv', '.gz', '.parquet', '.hdf5')
    for file_or_dir_name in file_or_dir_names:
        if not os.path.isfile(file_or_dir_name):
            continue
        extension = f'.{file_or_dir_name.split(".")[-1]}'
        if extension not in data_extensions:
            continue
        os.remove(file_or_dir_name)


def save_each_format(pandas_df):
    """
   各フォーマットにおけるファイルの保存を行う。

    Parameters
    ----------
    pandas_df : pd.DataFrame
        保存対象のデータフレーム。
    """
    remove_each_format_files()
    dask_df = dd.from_pandas(data=pandas_df, npartitions=1)
    vaex_df = vaex.from_pandas(df=pandas_df, copy_index=False)

    print(datetime.now(), '未圧縮のCSVの保存を開始...')
    pandas_df.to_csv(FP.CSV_NO_COMPRESSION, index=False, encoding='utf-8')

    print(datetime.now(), 'gzip圧縮されたCSVの保存を開始...')
    pandas_df.to_csv(FP.CSV_GZIP, index=False, encoding='utf-8', compression='gzip')

    print(datetime.now(), '未圧縮のParquetの保存を開始...')
    pandas_df.to_parquet(FP.PARQUET_NO_COMPRESSION, compression=None, index=False)

    print(datetime.now(), 'Snappy圧縮されたParquetの保存を開始...')
    pandas_df.to_parquet(FP.PARQUET_SNAPPY, compression='snappy', index=False)

    print(datetime.now(), 'gzip圧縮されたParquetの保存を開始...')
    pandas_df.to_parquet(FP.PARQUET_GZIP, compression='gzip', index=False)

    print(datetime.now(), '未圧縮のPandasのHDF5の保存を開始...')
    pandas_df.to_hdf(
        FP.HDF5_NO_COMPRESSION_PANDAS, key='data', mode='w', complib=None, index=False)

    print(datetime.now(), '未圧縮のDaskのHDF5の保存を開始...')
    dask_df.to_hdf(
        FP.HDF5_NO_COMPRESSION_DASK, key='data', mode='w', complib=None, index=False)

    print(datetime.now(), '未圧縮のVaexのHDF5の保存を開始...')
    vaex_df.export_hdf5(FP.HDF5_NO_COMPRESSION_VAEX)

    print(datetime.now(), 'zlib圧縮されたPandasのHDF5の保存を開始...')
    pandas_df.to_hdf(
        FP.HDF5_ZLIB_PANDAS, key='data', mode='w', complib='zlib',
        complevel=4, index=False)

    print(datetime.now(), 'zlib圧縮されたDaskのHDF5の保存を開始...')
    dask_df.to_hdf(
        FP.HDF5_ZLIB_DASK, key='data', mode='w', complib='zlib',
        complevel=4, index=False)

    print(datetime.now(), '保存処理が完了。')

圧縮方式についての補足

gzip, zlib, Snappyと色々な圧縮方式を扱っていきますが、Snappyなどに馴染みの薄い方もいらっしゃる(私もParquetなどに触り始めるまでに馴染みが薄かった)と思うため、少し補足しておきます。

PandasのHDF5のデフォルトの圧縮方式になっているzlibは、gzipやzipフォーマットなどを作るために使われるライブラリなので、圧縮方式でgzipzlibと指定されていたら基本的に圧縮の傾向は同じと見て問題ないと思います。

参考 : How are zlib, gzip and zip related? What do they have in common and how are they different?

Snappyに関してはgzipなどとは大分性質が異なり、以下のような特徴を持ちます。

  • gzipなどよりもファイルサイズは大きくなります(圧縮率が低くなります)。
  • gzipなどよりも読み書きの速度がぐぐっと速くなります。
  • 1つのファイルの読み書きでも内部のデータを分割することができます。つまりVaexなどで一度に全データメモリに持ったりせずに一定行数ごとにメモリにデータを持ちつつ書き込んだりして瞬間的な負荷を下げたり、ライブラリによって並列化などが圧縮ファイルでもしやすくなっています。

参考 :

ディスクサイズの比較用の処理の追加

保存された各フォーマットのディスクサイズを取得する処理を追加していきます。後々プロットなどもしようと思うため、Pandasのシリーズで各値を保持するようにしておきます。

import os


def get_disk_size_info_sr():
    """
    保存された各フォーマットのファイルのディスクサイズの情報を格納した
    シリーズを取得する。

    Returns
    -------
    sr : pd.Series
        ディスクサイズの情報を格納したシリーズ。以下のインデックスが
        設定される。
        - csv_no_compression : int -> 未圧縮CSVのファイルサイズ。
        - csv_gzip : int -> gzip圧縮されたCVのファイルサイズ。
        - parquet_no_compression : int -> 未圧縮のParquetのファイルサイズ。
        - parquet_snappy : int -> Snappy圧縮されたParquetのファイルサイズ。
        - parquet_gzip : int -> gzip圧縮されたParquetのファイルサイズ。
        - hdf5_no_compression : int -> 未圧縮のHDF5のファイルサイズ(Pandasのファイルを参照する)。
        - hdf5_zlib : int -> zlibで圧縮されたHDF5のファイルサイズ(Pandasのファイルを参照する)。
    """
    sr = pd.Series(
        data={
            'csv_no_compression': os.path.getsize(FP.CSV_NO_COMPRESSION),
            'csv_gzip': os.path.getsize(FP.CSV_GZIP),
            'parquet_no_compression': os.path.getsize(FP.PARQUET_NO_COMPRESSION),
            'parquet_snappy': os.path.getsize(FP.PARQUET_SNAPPY),
            'parquet_gzip': os.path.getsize(FP.PARQUET_GZIP),
            'hdf5_no_compression': os.path.getsize(FP.HDF5_NO_COMPRESSION_PANDAS),
            'hdf5_zlib': os.path.getsize(FP.HDF5_ZLIB_PANDAS),
        })
    return sr

ファイルサイズプロット用の処理を追加

前節で追加したファイルサイズのデータを格納したシリーズをプロットするための処理を追加していきます。

import matplotlib
from matplotlib.ticker import ScalarFormatter, FormatStrFormatter
import matplotlib.pyplot as plt
from enum import Enum

matplotlib.style.use('ggplot')


class PlotFileSizeUnit(Enum):
    """
    プロット用のファイルサイズの単位のEnumを扱うクラス。

    Attributes
    ----------
    BYTES : str
        B(バイト)表示用の定数。
    KILO_BYTES : str
        KB表示用の定数。
    MEGA_BYTES : str
        MB表示用の定数。
    """
    BYTES = 'Bytes'
    KILO_BYTES = 'KB'
    MEGA_BYTES = 'MB'


def plot_file_size_info(
        sr, exclude_hdf5=False, unit=PlotFileSizeUnit.BYTES,
        print_sr = True):
    """
    ファイルサイズの比較用のプロットを行う。

    Parameters
    ----------
    sr : pd.Series
        ファイルサイズの情報を格納したシリーズ。以下のインデックスが必要になる。
        - csv_no_compression : int
        - csv_gzip : int
        - parquet_no_compression : int
        - parquet_snappy : int
        - parquet_gzip : int
        - hdf5_no_compression : int
        - hdf5_zlib : int
    exclude_hdf5 : bool, default False
        HDF5のものをプロット対象外にするかどうか。
    unit : PlotFileSizeUnit
        プロットのファイルサイズの単位の指定。
    print_sr : bool, default True
        プロットのシリーズの内容を出力するかどうか。
    """
    sr = sr.copy()
    sr.sort_values(inplace=True, ascending=False)
    if exclude_hdf5:
        sr = sr[~sr.index.isin(['hdf5_no_compression', 'hdf5_zlib'])]
    if unit == PlotFileSizeUnit.KILO_BYTES:
        sr = sr / 1000
    elif unit == PlotFileSizeUnit.MEGA_BYTES:
        sr = sr / 1000 / 1000
    if print_sr:
        print(sr.sort_values(ascending=True))
    title = f'File size ({unit.value})'
    if exclude_hdf5:
        title += ' - hdf5 excluded'
    ax = sr.plot(kind='barh', figsize=(10, len(sr) // 2), title=title)
    ax.xaxis.set_major_formatter(FormatStrFormatter('%d'))
    plt.show()

読み込みと計算速度の計算用の処理を追加

読み込み速度の値を取得するための処理を書いていきます。なお、処理時間の計測にはtimeitを使いますが、マジックコマンドだと数が多く記述が煩雑になるのでマジックコマンドは使わずに関数内で計算して処理していきます。

なお、DaskやVaexに関してはフォーマットにもよりますが読み込みなどが遅延評価になったりします。そのため読み込みだけでは処理時間の取得ができませんので、ある程度のライトな計算処理を扱うようにし、読み込みと計算が終わるまでの時間で計算していきます。

処理としては以下のようなパターンを設けます。Vaexは文字列関係が特に速いという話なので、文字列操作も含めます。2つ目のパターンは計算を少し多めにしてあります。

  • [パターン1]. 読み込み -> column_a の値が100万以上の値にスライス -> column_b の値の合計値を算出します。
  • [パターン2]. 読み込み -> column_a の値が300万以下の値にスライス -> column_b の値が1000以上にスライス -> column_e の文字列の値の先頭がaから始まる行のみにスライス -> column_d の値でGROUP BY -> 各グループごとに column_b の最大値を算出します。
  • [パターン3]. 読み込み -> column_e でabという文字列を含む行のみにスライス -> スライス後の行数を算出します。

パターン1の各計算処理の追加

column_a の値が100万以上の値にスライス -> column_b の値の合計値を算出するパターンのPandas・Dask・Vaexの処理を書いていきます。データの読み込み周りは別途追加していきます。

def calculate_pattern_1_with_pandas_df(pandas_df):
    """
    1つ目のパターンの計算を、Pandasのデータフレームで行う。

    Parameters
    ----------
    pandas_df : pd.DataFrame
        対象のPandasのデータフレーム。

    Returns
    -------
    sum_val : int
        算出された合計値。
    """
    pandas_df = pandas_df[pandas_df['column_a'] >= 1_000_000]
    sum_val = pandas_df['column_b'].sum()
    return sum_val


def calculate_pattern_1_with_dask_df(dask_df):
    """
    1つ目のパターンの計算を、Daskのデータフレームで行う。

    Parameters
    ----------
    dask_df : dd.DataFrame
        対象のDaskのデータフレーム。

    Returns
    -------
    sum_val : int
        算出された合計値。
    """
    dask_df = dask_df[dask_df['column_a'] >= 1_000_000]
    sum_val = dask_df['column_b'].sum()
    sum_val = sum_val.compute()
    return sum_val


def calculate_pattern_1_with_vaex_df(vaex_df):
    """
    1つ目のパターンの計算を、Vaexのデータフレームで行う。

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
        対象のVaexのデータフレーム。

    Returns
    -------
    sum_val : int
        算出された合計値。
    """
    vaex_df = vaex_df[vaex_df['column_a'] >= 1_000_000]
    sum_val = int(vaex_df.column_b.sum())
    return sum_val

パターン2の各計算処理の追加

column_a の値が300万以下の値にスライス -> column_b の値が7000以下にスライス -> column_e の文字列の値の先頭がaから始まる行のみにスライス -> column_c の値でGROUP BY -> 各グループごとにcolumn_b の最大値を算出するパターンのPandas・Dask・Vaexの処理を書いていきます。

def calculate_pattern_2_with_pandas_df(pandas_df):
    """
    2つ目のパターンの計算を、Pandasのデータフレームで行う。

    Parameters
    ----------
    pandas_df : pd.DataFrame
        対象のPandasのデータフレーム。

    Returns
    -------
    max_sr : pd.Series
        算出された各合計値を格納したシリーズ。
    """
    pandas_df = pandas_df[pandas_df['column_a'] <= 3_000_000]
    pandas_df = pandas_df[pandas_df['column_b'] >= 1000]
    pandas_df = pandas_df[pandas_df['column_e'].str.startswith('a')]
    grouped = pandas_df.groupby(by='column_d')
    max_df = grouped.max()
    max_sr = max_df['column_b']
    return max_sr


def calculate_pattern_2_with_dask_df(dask_df):
    """
    2つ目のパターンの計算を、Daskのデータフレームで行う。

    Parameters
    ----------
    dask_df : dd.DataFrame
        対象のDaskのデータフレーム。

    Returns
    -------
    max_sr : pd.Series
        算出された各合計値を格納したシリーズ。
    """
    dask_df = dask_df[dask_df['column_a'] <= 3_000_000]
    dask_df = dask_df[dask_df['column_b'] >= 1000]
    dask_df = dask_df[dask_df['column_e'].str.startswith('a')]
    grouped = dask_df.groupby(by='column_d')
    max_df = grouped.max()
    max_sr = max_df['column_b']
    max_sr = max_sr.compute()
    return max_sr


def calculate_pattern_2_with_vaex_df(vaex_df):
    """
    2つ目のパターンの計算を、Vaexのデータフレームで行う。

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
        対象のVaexのデータフレーム。

    Returns
    -------
    max_sr : pd.Series
        算出された各合計値を格納したシリーズ。
    """
    vaex_df = vaex_df[vaex_df['column_a'] <= 3_000_000]
    vaex_df = vaex_df[vaex_df['column_b'] >= 1000]
    vaex_df = vaex_df[vaex_df['column_e'].str.startswith('a')]
    max_df = vaex_df.groupby(
        by='column_d',
        agg={
            'column_b': vaex.agg.max,
        })
    max_df = max_df.to_pandas_df(column_names=['column_d', 'column_b'])
    max_df.index = max_df['column_d']
    max_sr = max_df['column_b']
    return max_sr

パターン3の各計算処理の追加

column_e でabという文字列を含む行のみにスライス -> スライス後の行数を算出するパターンのPandas・Dask・Vaexの処理を書いていきます。

def calculate_pattern_3_with_pandas_df(pandas_df):
    """
    3つ目のパターンの計算を、Pandasのデータフレームで行う。

    Parameters
    ----------
    pandas_df : pd.DataFrame
        対象のPandasのデータフレーム。

    Returns
    -------
    row_count : int
        算出結果の行数。
    """
    pandas_df = pandas_df[pandas_df['column_e'].str.contains('ab')]
    row_count = len(pandas_df)
    return row_count


def calculate_pattern_3_with_dask_df(dask_df):
    """
    3つ目のパターンの計算を、Daskのデータフレームで行う。

    Parameters
    ----------
    dask_df : dd.DataFrame
        対象のDaskのデータフレーム。

    Returns
    -------
    row_count : int
        算出結果の行数。
    """
    dask_df = dask_df[dask_df['column_e'].str.contains('ab')]
    row_count = len(dask_df)
    return row_count


def calculate_pattern_3_with_vaex_df(vaex_df):
    """
    3つ目のパターンの計算を、Vaexのデータフレームで行う。

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
        対象のVaexのデータフレーム。

    Returns
    -------
    row_count : int
        算出結果の行数。
    """
    vaex_df = vaex_df[vaex_df['column_e'].str.contains('ab')]
    row_count = len(vaex_df)
    return row_count

各フォーマットごとの読み込み処理の追加

各フォーマットごとのデータの読み込み処理を書いていきます。

def read_csv_no_compression_pandas_df():
    """
    未圧縮のCSVデータからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_csv(FP.CSV_NO_COMPRESSION)
    return pandas_df


def read_csv_no_compression_dask_df():
    """
    未圧縮のCSVデータからDaskのデータフレームを読み込む。

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    dask_df = dd.read_csv(FP.CSV_NO_COMPRESSION)
    return dask_df


def read_csv_no_compression_vaex_df():
    """
    未圧縮のCSVデータからVaexのデータフレームを読み込む。

    Returns
    -------
    vaex_df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    vaex_df = vaex.from_csv(FP.CSV_NO_COMPRESSION, copy_index=False)
    return vaex_df


def read_csv_gzip_pandas_df():
    """
    gzip圧縮されたCSVデータからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_csv(FP.CSV_GZIP, compression='gzip')
    return pandas_df


def read_csv_gzip_dask_df():
    """
    gzip圧縮されたCSVデータからDaskのデータフレームを読み込む。

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    dask_df = dd.read_csv(
        FP.CSV_GZIP, compression='gzip', blocksize=None)
    return dask_df


def read_csv_gzip_vaex_df():
    """
    gzip圧縮されたCSVデータからVaexのデータフレームを読み込む。

    Returns
    -------
    vaex_df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    vaex_df = vaex.from_csv(
        FP.CSV_GZIP, compression='gzip', copy_index=False)
    return vaex_df


def read_parquet_no_compression_pandas_df():
    """
    未圧縮のParquetデータからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_parquet(FP.PARQUET_NO_COMPRESSION)
    return pandas_df


def read_parquet_no_compression_dask_df():
    """
    未圧縮のParquetのデータからDaskのデータフレームを読み込む。

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    dask_df = dd.read_parquet(FP.PARQUET_NO_COMPRESSION)
    return dask_df


def read_parquet_no_compression_vaex_df():
    """
    未圧縮のParquetのデータからVaexのデータフレームを読み込む。

    Returns
    -------
    vaex_df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    vaex_df = vaex.open(FP.PARQUET_NO_COMPRESSION, copy_index=False)
    return vaex_df


def read_parquet_snappy_pandas_df():
    """
    Snappy圧縮されたParquetのデータからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_parquet(FP.PARQUET_SNAPPY)
    return pandas_df


def read_parquet_snappy_dask_df():
    """
    Snappy圧縮されたParquetのデータからDaskのデータフレームを読み込む

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    dask_df = dd.read_parquet(FP.PARQUET_SNAPPY)
    return dask_df


def read_parquet_snappy_vaex_df():
    """
    Snappy圧縮されたParquetのデータからVaexのデータフレームを読み込む

    Returns
    -------
    vaex_df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    vaex_df = vaex.open(FP.PARQUET_SNAPPY, copy_index=False)
    return vaex_df


def read_parquet_gzip_pandas_df():
    """
    gzip圧縮されたParquetのデータからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_parquet(FP.PARQUET_GZIP)
    return pandas_df


def read_parquet_gzip_dask_df():
    """
    gzip圧縮されたParquetのデータからDaskのデータフレームを読み込む

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    dask_df = dd.read_parquet(FP.PARQUET_GZIP)
    return dask_df


def read_parquet_gzip_vaex_df():
    """
    gzip圧縮されたParquetのデータからVaexのデータフレームを読み込む

    Returns
    -------
    vaex_df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    vaex_df = vaex.open(FP.PARQUET_GZIP, copy_index=False)
    return vaex_df


def read_hdf5_no_compression_pandas_df():
    """
    未圧縮のHDF5データからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_hdf(FP.HDF5_NO_COMPRESSION_PANDAS, key='data')
    return pandas_df


def read_hdf5_no_compression_dask_df():
    """
    未圧縮のHDF5データからDaskのデータフレームを読み込む。

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム
    """
    dask_df = dd.read_hdf(FP.HDF5_NO_COMPRESSION_DASK, key='data')
    return dask_df


def read_hdf5_no_compression_vaex_df():
    """
    未圧縮のHDF5データからVaexのデータフレームを読み込む。

    Returns
    -------
    vaex_df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    vaex_df = vaex.open(FP.HDF5_NO_COMPRESSION_VAEX, copy_index=False)
    return vaex_df


def read_hdf5_zlib_pandas_df():
    """
    zlib圧縮されたHDF5のデータからPandasのデータフレームを読み込む。

    Returns
    -------
    pandas_df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    pandas_df = pd.read_hdf(FP.HDF5_ZLIB_PANDAS, key='data')
    return pandas_df


def read_hdf5_zlib_dask_df():
    """
    zlib圧縮されたHDF5のデータからDaskのデータフレームを読み込む。

    Returns
    -------
    dask_df : dd.DataFrame
        読み込まれたDaskのデータフレーム
    """
    dask_df = dd.read_hdf(FP.HDF5_ZLIB_DASK, key='data')
    return dask_df

書いていて思いましたが、Parquet関係は読み込みの記述がシンプルでいいですね(圧縮の有無や種類による記述の差異やライブラリごとの差異が無い点・同じファイルで対応ができる点など)。

読み込みと集計処理の追加

各読み込み処理と集計処理が追加し終わったので、それらを組み合わせた処理を書いていきます。

1つ1つ組み合わせの関数を書いていくと大分長くなるので、クラスを設けて計測処理などを扱っていきます。

from timeit import timeit


class ReadAndCalcRunner:

    def __init__(self, label, pattern, read_func, calc_func):
        """
        読み込みと計算処理の組み合わせの設定や実行処理を扱うクラス。

        Parameters
        ----------
        label : str
            対象の組み合わせの識別用のラベル。
            例 : csv_no_compression_pandas
        pattern : int
            対象のパターン(1~3)。
        read_func : Callable
            読み込み処理を扱う関数。引数は省略可で、データフレームを変えす
            形式が必要になる。
        calc_func : Callable
            計算処理を扱う関数。第一引数にデータフレームを受け付ける形式の
            ものを指定する必要がある。
        """
        self.label = label
        self.pattern = pattern
        self._read_func = read_func
        self._calc_func = calc_func

    def run(self, n, debug=False):
        """
        読み込みと計算処理を実行する。実行後、 mean_seconds 属性に
        実行の平均秒数(float)が設定される。

        Parameters
        ----------
        n : int
            実行回数。多い方が処理時間の精度が高くなるものの、完了
            するまでが長くなるので注意。
        debug : bool, default False
            デバッグ設定。Trueを指定した場合、計算結果の出力がされる。
        """
        statement = 'df = self._read_func();'
        if not debug:
            statement += 'self._calc_func(df);'
        else:
            statement += 'result = self._calc_func(df); print(result)'
        result_seconds = timeit(
            stmt=statement,
            number=n, globals=locals())
        self.mean_seconds = result_seconds / n

追加したクラスを使って、各組合せの定義をしていきます。

各関数などはルールに沿った名前(フォーマットやライブラリなどを使った名前)になっているので、ループで回して追加します。インデントが多く気になりますが、今回は使い捨てのコードなのでこのまま進めます。

from copy import deepcopy
import sys

this_module = sys.modules[__name__]
FORMATS = (
    'csv_no_compression',
    'csv_gzip',
    'parquet_no_compression',
    'parquet_snappy',
    'parquet_gzip',
    'hdf5_no_compression',
    'hdf5_zlib',
)
LIBS = (
    'pandas',
    'dask',
    'vaex',
)
PATTERNS = (1, 2, 3)
runners = []

# HDF5の圧縮ファイルはVaexではインターフェイスが無いので関数を定義していません。
for format_str in FORMATS:
    for lib_str in LIBS:
        for pattern in PATTERNS:
            label = f'{format_str}_{lib_str}'
            read_func_name = f'read_{format_str}_{lib_str}_df'
            if not hasattr(this_module, read_func_name):
                print(
                    '対象の読み込み処理の関数が見つからないため定義の追加を'
                    f'スキップしました : {read_func_name}')
                continue
            read_func = getattr(this_module, read_func_name)

            calc_func_name = f'calculate_pattern_{pattern}_with_{lib_str}_df'
            if not hasattr(this_module, calc_func_name):
                print(
                    '対象の計算処理の関数が見つからないため定義の追加を'
                    f'スキップしました : {calc_func_name}')
                continue
            calc_func = getattr(this_module, calc_func_name)

            runners.append(
                ReadAndCalcRunner(
                    label=label,
                    pattern=pattern,
                    read_func=read_func,
                    calc_func=calc_func)
            )

これで60件の組み合わせ(ファイルフォーマット7種類 × ライブラリ3種類 × 計算パターン3種類 - Vaexで対応していないHDFの圧縮フォーマット3件 = 60件)の定義が追加されました。

また、各パターンごとの処理時間を保持するシリーズの取得処理も追加しておきます。

def run_and_get_result_time_sr(runners, pattern, n):
    """
    指定されたパターンの各計測処理を実行し、結果の各秒数の値を格納した
    シリーズを取得する。

    Parameters
    ----------
    runners : list of ReadAndCalcRunner
        実行処理の定義などを保持するインスタンスのリスト。
    pattern : int
        実行するパターン(1~3)。
    n : int
        実行回数。多い方が処理時間の精度が高くなるものの、完了
        するまでが長くなるので注意。

    Returns
    -------
    sr : pd.Series
        計測結果を格納したシリーズ。インデックスには各フォーマットと
        ライブラリ識別用のラベルとパターンの連結された文字列が設定され、
        値に秒数が設定される。
    """
    runners = deepcopy(runners)
    data_dict = {}
    for runner in runners:
        if runner.pattern != pattern:
            continue
        label = f'{runner.label}_{pattern}'
        runner.run(n=n)
        data_dict[label] = runner.mean_seconds
    sr = pd.Series(data=data_dict)
    return sr

処理時間の可視化の処理の追加

先ほど追加した、各パターンでの集計時間のシリーズの内容をプロットする処理を追加していきます。

def plot_time_info(time_sr, patten, print_sr = True):
    """
    処理時間のプロットを行う。

    Parameters
    ----------
    time_sr : pd.Series
        処理時間を格納したシリーズ。
    patten : int
        対象の集計処理のパターン(1~3)。
    print_sr : bool, default True
        プロット内容のシリーズを出力して表示するかどうか。
    """
    sr = time_sr.copy()
    sr.sort_values(inplace=True, ascending=False)
    if print_sr:
        print(sr.sort_values(ascending=True))
    title = f'Read and calculation seconds (pattern: {patten})'
    ax = sr.plot(kind='barh', figsize=(10, len(sr) // 2), title=title)
    ax.xaxis.set_major_formatter(FormatStrFormatter('%.3f'))
    plt.show()

各処理を一通り流すための処理の追加

記述がシンプルになるように、各処理(保存・読み込み・集計・可視化)を一気に処理するための関数を追加しておきます。

def run_overall(row_num, n, unit, pattern, save_data=True):
    """
    対象のパターンに対して、各処理(保存・読み込み・集計・可視化など)を
    全体的に実行する。

    Parameters
    ----------
    row_num : int
        対象とするデータの行数。
    n : int
        読み込みと計算の実行回数。多い方が処理時間が正確になるものの、
        完了するまで長時間必要となるので注意。
    unit : PlotFileSizeUnit
        ファイルサイズの単位。
    pattern : int
        対象とする計算のパターン(1~3)。
    save_data : bool, default True
        データの保存処理を行うかどうか。別のパターンの計算実行でデータを
        使いまわす場合などにはFalseを指定する(余分な保存処理やプロットが
        スキップされる)。
    """
    if save_data:
        pandas_df = make_basic_pandas_df(row_num=row_num, char_num=20)
        save_each_format(pandas_df=pandas_df)
        disk_size_info_sr = get_disk_size_info_sr()
        plot_file_size_info(sr=disk_size_info_sr, unit=unit)
        plot_file_size_info(sr=disk_size_info_sr, unit=unit, exclude_hdf5=True)

    time_sr = run_and_get_result_time_sr(
        runners=runners, pattern=pattern, n=n)
    plot_time_info(time_sr=time_sr, patten=pattern)

実際に実行してみる

ここまで(予想以上に)大分長くなった感じですが、やっとこ実行と可視化などをやっていきます。この節では500行5列の小さいデータが対象です。

まずは保存処理とパターン1のものから流していきます。

run_overall(
    row_num=500, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 10:50:52.504175 未圧縮のCSVの保存を開始...
2021-01-11 10:50:52.506491 gzip圧縮されたCSVの保存を開始...
2021-01-11 10:50:52.509357 未圧縮のParquetの保存を開始...
2021-01-11 10:50:52.511121 Snappy圧縮されたParquetの保存を開始...
2021-01-11 10:50:52.512792 gzip圧縮されたParquetの保存を開始...
2021-01-11 10:50:52.516839 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 10:50:52.525298 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 10:50:52.530746 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 10:50:52.592227 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 10:50:52.602412 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 10:50:52.608369 保存処理が完了。

保存処理はこの規模のファイルだとどれも一瞬です。

ディスクサイズ :

csv_gzip                    13.698
parquet_gzip                17.913
csv_no_compression          20.671
parquet_snappy              22.427
parquet_no_compression      26.952
hdf5_zlib                 1084.923
hdf5_no_compression       1091.984

image.png

各フォーマットのディスクサイズはHDF5関係がかなり大きくなりました。これはデータ云々というよりもHDF5自体の最低限必要なサイズが絡んでいそうな印象です(そのためこの規模のデータだとHDF5で圧縮指定をしてもほぼサイズが変わりません)。

このことから小さいデータではHDF5は不向きということが分かります。実際に階層データという性質上、一つのファイルに多くのデータを入れる形のフォーマットとなっているので違和感は無いといえば無い形になります。

見づらいのでHDF5を除いたプロットも確認してみます。

image.png

全体的にParquetよりもCSVの方が優秀な数値が出ています。Snappy圧縮をした場合でも生のCSVにParquetが負けています。

これは、Parquet自体が「小さいファイルでは向いていないので避けるべき」ということが他の記事やスライドなどでも言われており、こういったケースではParquetの利用はディスクサイズ的な見地からすると向いていません。

image.png

The Parquet Format and Performance Optimization Opportunitiesの25スライド目から引用。

つまりS3のデータレイク的なところにFirehoseやらFluentdなどで数分単位などの短いスパンで小さいファイルを保存する場合、パフォーマンスやディスクサイズ削減のためにParquetを選択した場合CSVなどに対して逆効果になり得ます。FirehoseなどだとParquet変換を設定すると余計にAWSのコストが増えるので注意が必要です。対象とするデータセットとファイル分割数(どのくらいの頻度でデータを保存するのか)などを加味してCSVにするのかParquetにするのかなどを選択すると良さそうです。

パターン1の処理時間 :

読み込みと計算の処理時間を見ていきます。まずはパターン1の計算(シンプルな読み込みとスライス・合計値の算出)です。

csv_no_compression_pandas_1        0.002111
csv_gzip_pandas_1                  0.002147
parquet_no_compression_pandas_1    0.002332
parquet_gzip_pandas_1              0.002439
parquet_snappy_pandas_1            0.002447
parquet_snappy_vaex_1              0.003457
parquet_gzip_vaex_1                0.003632
parquet_no_compression_vaex_1      0.003653
hdf5_no_compression_vaex_1         0.005135
csv_no_compression_vaex_1          0.006282
csv_gzip_vaex_1                    0.006520
hdf5_no_compression_pandas_1       0.006611
hdf5_zlib_pandas_1                 0.007772
csv_gzip_dask_1                    0.009453
csv_no_compression_dask_1          0.009640
parquet_gzip_dask_1                0.010723
parquet_no_compression_dask_1      0.010762
parquet_snappy_dask_1              0.010876
hdf5_zlib_dask_1                   0.017010
hdf5_no_compression_dask_1         0.017790

image.png

CSV + Pandasの組み合わせが速いことが分かります。逆にDaskは遅めです。特にHDF5とDaskの組み合わせが遅くなっています。

VaexもDask程ではありませんが、Pandasと比べると大分遅くなっています。

これはDaskのドキュメントのベストプラクティスのページにもあるように、メモリに収まるような小さいデータであれば、Daskを使うと無駄に並列化や計算グラフ・遅延評価などの面でのオーバーヘッドで逆に遅くなるという点が影響しています。

参考 : Best Practices - Dask documentation

こういった場合は普通にPandasを使っていきましょう。

また、ディスクサイズだけでなく速度に関してもこの規模のデータだとParquetとCSVで大きな差が生まれません(Parquet側が大きく速くなったりはしません)。

パターン2の処理時間 :

今度はパターン2の計算(文字列関係を含んだ少し長めの計算)です。

run_overall(
    row_num=500, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
csv_gzip_pandas_2                  0.005518
parquet_snappy_pandas_2            0.005648
parquet_no_compression_pandas_2    0.005668
parquet_no_compression_vaex_2      0.005784
parquet_gzip_pandas_2              0.005845
csv_no_compression_pandas_2        0.005853
parquet_snappy_vaex_2              0.005925
parquet_gzip_vaex_2                0.006170
hdf5_no_compression_vaex_2         0.007094
csv_no_compression_vaex_2          0.008296
csv_gzip_vaex_2                    0.008618
hdf5_no_compression_pandas_2       0.010095
hdf5_zlib_pandas_2                 0.010720
csv_no_compression_dask_2          0.021852
csv_gzip_dask_2                    0.022408
parquet_no_compression_dask_2      0.022504
parquet_snappy_dask_2              0.022828
parquet_gzip_dask_2                0.023470
hdf5_no_compression_dask_2         0.030001
hdf5_zlib_dask_2                   0.030424

image.png

CSVやPandasが優位な点は変わらず、ただしこれくらい処理が多いとVaex + Parquetの組み合わせもPandasに肉薄するレベルなようです。

パターン3の処理時間 :

続いてパターン3の計算(シンプルな読み込みと文字列処理・行数カウントのみの計算)です。

run_overall(
    row_num=500, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
csv_no_compression_pandas_3        0.002348
csv_gzip_pandas_3                  0.002382
parquet_no_compression_pandas_3    0.002456
parquet_snappy_pandas_3            0.002553
parquet_no_compression_vaex_3      0.002748
parquet_gzip_pandas_3              0.002815
parquet_gzip_vaex_3                0.002947
parquet_snappy_vaex_3              0.003032
hdf5_no_compression_vaex_3         0.004627
csv_gzip_vaex_3                    0.006253
csv_no_compression_vaex_3          0.006258
hdf5_no_compression_pandas_3       0.006701
hdf5_zlib_pandas_3                 0.007467
csv_no_compression_dask_3          0.010006
parquet_no_compression_dask_3      0.011430
parquet_snappy_dask_3              0.011600
parquet_gzip_dask_3                0.011652
csv_gzip_dask_3                    0.012087
hdf5_no_compression_dask_3         0.018044
hdf5_zlib_dask_3                   0.018257

image.png

こちらはPandasやCSVが優秀な結果となりました。Vaexは文字列操作系が速いといっても、処理やデータがかなり少ない場合は普通にPandas(+ ParquetではなくCSV)の方が優秀な結果となるようです。

2000行・5列のファイルでの比較

少しデータを増やして2000行で試してみます。

run_overall(
    row_num=2000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 10:56:19.629120 未圧縮のCSVの保存を開始...
2021-01-11 10:56:19.635464 gzip圧縮されたCSVの保存を開始...
2021-01-11 10:56:19.646662 未圧縮のParquetの保存を開始...
2021-01-11 10:56:19.649773 Snappy圧縮されたParquetの保存を開始...
2021-01-11 10:56:19.652469 gzip圧縮されたParquetの保存を開始...
2021-01-11 10:56:19.663905 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 10:56:19.672032 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 10:56:19.678898 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 10:56:19.692107 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 10:56:19.702695 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 10:56:19.711186 保存処理が完了。
csv_gzip                    53.358
parquet_gzip                59.419
parquet_snappy              77.077
csv_no_compression          82.433
parquet_no_compression      94.157
hdf5_zlib                 1131.977
hdf5_no_compression       1186.488

image.png

image.png

まずはディスクサイズですが、未圧縮CSVに圧縮済みParquetが負けるといったことはこの規模では無くなるようです。ただし同じ圧縮フォーマットであればまだCSVの方が優秀です。

parquet_snappy_pandas_1            0.002602
parquet_no_compression_pandas_1    0.002694
csv_no_compression_pandas_1        0.003224
parquet_gzip_pandas_1              0.003239
parquet_no_compression_vaex_1      0.003570
csv_gzip_pandas_1                  0.003762
parquet_snappy_vaex_1              0.003798
parquet_gzip_vaex_1                0.004458
hdf5_no_compression_vaex_1         0.005106
hdf5_no_compression_pandas_1       0.006921
hdf5_zlib_pandas_1                 0.008155
parquet_snappy_dask_1              0.011113
parquet_no_compression_dask_1      0.011172
csv_no_compression_dask_1          0.011765
parquet_gzip_dask_1                0.012178
csv_gzip_dask_1                    0.012792
csv_no_compression_vaex_1          0.015389
csv_gzip_vaex_1                    0.015665
hdf5_no_compression_dask_1         0.017919
hdf5_zlib_dask_1                   0.018418

image.png

パターン1ではPandas + Parquetの組み合わせがCSVよりも速くなってきました。また、Snappy圧縮されていても未圧縮のケースとほぼ同じレベルとなっているようです。

続いてパターン2を見ていきます。

run_overall(
    row_num=2000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)

image.png

parquet_snappy_vaex_2              0.006218
parquet_no_compression_vaex_2      0.006335
parquet_gzip_vaex_2                0.006521
parquet_no_compression_pandas_2    0.006702
parquet_snappy_pandas_2            0.006715
parquet_gzip_pandas_2              0.007106
csv_no_compression_pandas_2        0.007148
hdf5_no_compression_vaex_2         0.007222
csv_gzip_pandas_2                  0.007696
hdf5_no_compression_pandas_2       0.011027
hdf5_zlib_pandas_2                 0.011480
csv_no_compression_vaex_2          0.016811
csv_gzip_vaex_2                    0.017303
parquet_snappy_dask_2              0.023846
csv_no_compression_dask_2          0.024046
parquet_gzip_dask_2                0.024549
parquet_no_compression_dask_2      0.024705
csv_gzip_dask_2                    0.025683
hdf5_zlib_dask_2                   0.031773
hdf5_no_compression_dask_2         0.031882

image.png

上位がParquetもしくはHDF5のVaexが目立ってきました。CSV + Vaexの組み合わせが遅いのは、VaexのdocstringにCSVの読み込みはPandasを使って行われると書かれてあり、CSVを使った場合は速くならないとよく見かけるので違和感はありません。

続いてパターン3です。

run_overall(
    row_num=2000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
parquet_snappy_vaex_3              0.003102
parquet_no_compression_vaex_3      0.003210
parquet_snappy_pandas_3            0.003282
parquet_no_compression_pandas_3    0.003468
parquet_gzip_vaex_3                0.003671
parquet_gzip_pandas_3              0.003808
csv_no_compression_pandas_3        0.003832
csv_gzip_pandas_3                  0.004321
hdf5_no_compression_vaex_3         0.004826
hdf5_no_compression_pandas_3       0.007564
hdf5_zlib_pandas_3                 0.008139
parquet_snappy_dask_3              0.012079
csv_no_compression_dask_3          0.012595
parquet_gzip_dask_3                0.012903
parquet_no_compression_dask_3      0.013000
csv_gzip_dask_3                    0.013813
csv_no_compression_vaex_3          0.014868
csv_gzip_vaex_3                    0.015253
hdf5_no_compression_dask_3         0.019515
hdf5_zlib_dask_3                   0.019784

image.png

こちらもシンプルな計算であるにも関わらず、文字列関係が絡む影響なのかVaexが上位で目立ちます。とはいえPandasも肉薄しています。
他の特徴として、上位がParquetが目立ってきました。

5000行・5列のファイルでの比較

続いて5000行で試します。

run_overall(
    row_num=5000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 11:00:51.745293 未圧縮のCSVの保存を開始...
2021-01-11 11:00:51.759650 gzip圧縮されたCSVの保存を開始...
2021-01-11 11:00:51.785199 未圧縮のParquetの保存を開始...
2021-01-11 11:00:51.788939 Snappy圧縮されたParquetの保存を開始...
2021-01-11 11:00:51.792390 gzip圧縮されたParquetの保存を開始...
2021-01-11 11:00:51.819260 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 11:00:51.827838 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 11:00:51.836513 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 11:00:51.849884 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 11:00:51.862147 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 11:00:51.875275 保存処理が完了。
csv_gzip                   132.612
parquet_gzip               142.983
parquet_snappy             186.526
csv_no_compression         206.247
parquet_no_compression     226.477
hdf5_zlib                 1226.921
hdf5_no_compression       1375.496

image.png

ディスクサイズはこの行数でもまだ同一の圧縮フォーマットであればParquetよりもCSVの方が優秀なようです。

parquet_snappy_pandas_1            0.003580
parquet_no_compression_vaex_1      0.003808
parquet_snappy_vaex_1              0.003830
parquet_no_compression_pandas_1    0.003854
parquet_gzip_pandas_1              0.004526
parquet_gzip_vaex_1                0.005253
hdf5_no_compression_vaex_1         0.005374
csv_no_compression_pandas_1        0.005457
csv_gzip_pandas_1                  0.006872
hdf5_no_compression_pandas_1       0.007761
hdf5_zlib_pandas_1                 0.008421
parquet_no_compression_dask_1      0.012125
parquet_snappy_dask_1              0.012637
parquet_gzip_dask_1                0.013911
csv_no_compression_dask_1          0.017265
csv_gzip_dask_1                    0.019980
hdf5_no_compression_dask_1         0.020944
hdf5_zlib_dask_1                   0.022355
csv_no_compression_vaex_1          0.033170
csv_gzip_vaex_1                    0.034185

image.png

パターン1の速度は上位がPandas + ParquetもしくはVaex + Parquetという組み合わせとなりました。
また、最下位2つがVaex + CSVという組み合わせになっています。VaexのCSVの苦手感が出ていますね。

続いてパターン2です。

run_overall(
    row_num=5000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
parquet_no_compression_vaex_2      0.006076
parquet_snappy_vaex_2              0.006594
hdf5_no_compression_vaex_2         0.007164
parquet_gzip_vaex_2                0.007509
parquet_snappy_pandas_2            0.008118
parquet_no_compression_pandas_2    0.008377
parquet_gzip_pandas_2              0.010168
csv_no_compression_pandas_2        0.010611
csv_gzip_pandas_2                  0.011410
hdf5_no_compression_pandas_2       0.012331
hdf5_zlib_pandas_2                 0.013154
parquet_snappy_dask_2              0.025350
parquet_no_compression_dask_2      0.025814
parquet_gzip_dask_2                0.027304
csv_no_compression_dask_2          0.031233
csv_gzip_dask_2                    0.033282
csv_no_compression_vaex_2          0.034763
hdf5_no_compression_dask_2         0.035235
csv_gzip_vaex_2                    0.035323
hdf5_zlib_dask_2                   0.036155

image.png

上位がCSV以外のVaexが占める形となりました。

続いてパターン3です。

run_overall(
    row_num=5000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
parquet_no_compression_vaex_3      0.003913
parquet_snappy_vaex_3              0.003922
parquet_snappy_pandas_3            0.005204
parquet_no_compression_pandas_3    0.005261
parquet_gzip_vaex_3                0.005454
hdf5_no_compression_vaex_3         0.005745
parquet_gzip_pandas_3              0.006408
csv_no_compression_pandas_3        0.007710
csv_gzip_pandas_3                  0.008557
hdf5_no_compression_pandas_3       0.009136
hdf5_zlib_pandas_3                 0.010429
parquet_snappy_dask_3              0.013971
parquet_no_compression_dask_3      0.014509
parquet_gzip_dask_3                0.015864
csv_no_compression_dask_3          0.020176
csv_gzip_dask_3                    0.022093
hdf5_no_compression_dask_3         0.022922
hdf5_zlib_dask_3                   0.024209
csv_no_compression_vaex_3          0.032915
csv_gzip_vaex_3                    0.034003

image.png

こちらもParquet + Vaexの組み合わせが速い結果となっています。

1万行・5列のファイルでの比較

run_overall(
    row_num=10_000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 11:05:12.979342 未圧縮のCSVの保存を開始...
2021-01-11 11:05:13.004653 gzip圧縮されたCSVの保存を開始...
2021-01-11 11:05:13.054621 未圧縮のParquetの保存を開始...
2021-01-11 11:05:13.060265 Snappy圧縮されたParquetの保存を開始...
2021-01-11 11:05:13.065405 gzip圧縮されたParquetの保存を開始...
2021-01-11 11:05:13.119761 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 11:05:13.129005 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 11:05:13.141110 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 11:05:13.155398 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 11:05:13.170231 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 11:05:13.190747 保存処理が完了。
csv_gzip                   264.581
parquet_gzip               281.168
parquet_snappy             366.227
csv_no_compression         412.415
parquet_no_compression     439.441
hdf5_zlib                 1385.012
hdf5_no_compression       1690.528

image.png

肉薄していますが、いまだParquetがCSV.gzには勝てていないようです。このくらいの規模になるとParquetの方が有利になると考えていましたが、そうでも無いのでしょうか・・・?ランダムな文字列のカラムを含んでいる影響とかでしょうか・・・?(似ている数字ばかりのカラムで構成されていればParquetの方がもっとぐぐっと有利になる等?)

parquet_no_compression_vaex_1      0.004094
parquet_snappy_vaex_1              0.004115
parquet_snappy_pandas_1            0.004121
parquet_no_compression_pandas_1    0.004316
hdf5_no_compression_vaex_1         0.005096
parquet_gzip_pandas_1              0.006430
parquet_gzip_vaex_1                0.006769
csv_no_compression_pandas_1        0.008503
hdf5_no_compression_pandas_1       0.008716
hdf5_zlib_pandas_1                 0.009722
csv_gzip_pandas_1                  0.011210
parquet_snappy_dask_1              0.012788
parquet_no_compression_dask_1      0.013130
parquet_gzip_dask_1                0.016297
csv_no_compression_dask_1          0.021566
hdf5_no_compression_dask_1         0.024418
csv_gzip_dask_1                    0.026689
hdf5_zlib_dask_1                   0.027605
csv_no_compression_vaex_1          0.060481
csv_gzip_vaex_1                    0.062800

image.png

パターン1の速度面では大分上位がParquetが占める形になり、CSVの多くが下の方になってきました。

続いてパターン2です。

run_overall(
    row_num=10_000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
parquet_no_compression_vaex_2      0.006648
parquet_snappy_vaex_2              0.006972
hdf5_no_compression_vaex_2         0.007346
parquet_gzip_vaex_2                0.009408
parquet_no_compression_pandas_2    0.010664
parquet_snappy_pandas_2            0.011815
parquet_gzip_pandas_2              0.013018
hdf5_no_compression_pandas_2       0.014761
csv_no_compression_pandas_2        0.015061
csv_gzip_pandas_2                  0.016204
hdf5_zlib_pandas_2                 0.017293
parquet_no_compression_dask_2      0.028120
parquet_snappy_dask_2              0.028369
parquet_gzip_dask_2                0.032319
csv_no_compression_dask_2          0.035587
csv_gzip_dask_2                    0.040959
hdf5_no_compression_dask_2         0.041060
hdf5_zlib_dask_2                   0.044385
csv_no_compression_vaex_2          0.062479
csv_gzip_vaex_2                    0.065195

image.png

Vaex + Parquetが速いという結果のままのようです。

続いてパターン3です。

run_overall(
    row_num=10_000, n=20, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
parquet_snappy_vaex_3              0.005270
parquet_no_compression_vaex_3      0.005497
hdf5_no_compression_vaex_3         0.006828
parquet_gzip_vaex_3                0.008125
parquet_no_compression_pandas_3    0.008216
parquet_snappy_pandas_3            0.008459
parquet_gzip_pandas_3              0.010360
hdf5_no_compression_pandas_3       0.011945
csv_no_compression_pandas_3        0.012204
csv_gzip_pandas_3                  0.013290
hdf5_zlib_pandas_3                 0.013831
parquet_snappy_dask_3              0.017223
parquet_no_compression_dask_3      0.017663
parquet_gzip_dask_3                0.020585
csv_no_compression_dask_3          0.026153
hdf5_no_compression_dask_3         0.029112
csv_gzip_dask_3                    0.029944
hdf5_zlib_dask_3                   0.031420
csv_no_compression_vaex_3          0.061439
csv_gzip_vaex_3                    0.063572

image.png

こちらも5000行のときとそこまで傾向は変わらずといった感じのようです。

5万行・5列のファイルでの比較

段々と行数が増えてきました。続いて5万行のファイルです。時間が少しかかってくるので、実行回数(引数n)をここから減らしたりもしていきます。

run_overall(
    row_num=50_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 11:08:51.857375 未圧縮のCSVの保存を開始...
2021-01-11 11:08:51.984150 gzip圧縮されたCSVの保存を開始...
2021-01-11 11:08:52.236012 未圧縮のParquetの保存を開始...
2021-01-11 11:08:52.253600 Snappy圧縮されたParquetの保存を開始...
2021-01-11 11:08:52.272157 gzip圧縮されたParquetの保存を開始...
2021-01-11 11:08:52.519038 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 11:08:52.535660 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 11:08:52.578888 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 11:08:52.600329 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 11:08:52.639769 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 11:08:52.721034 保存処理が完了。
csv_gzip                  1320.048
parquet_gzip              1343.917
parquet_snappy            1744.180
parquet_no_compression    2021.223
csv_no_compression        2062.447
hdf5_zlib                 2649.801
hdf5_no_compression       4210.736

image.png

zlib圧縮した場合など、HDF5のサイズが大分他のフォーマットの数値に近づいてきました。

CSVとParquetはほぼ同じ数値になっています。

hdf5_no_compression_vaex_1         0.005502
parquet_no_compression_vaex_1      0.006683
parquet_snappy_vaex_1              0.007091
parquet_snappy_pandas_1            0.011606
parquet_no_compression_pandas_1    0.011925
hdf5_no_compression_pandas_1       0.014885
parquet_gzip_vaex_1                0.018865
hdf5_zlib_pandas_1                 0.021611
parquet_snappy_dask_1              0.022349
parquet_gzip_pandas_1              0.022426
parquet_no_compression_dask_1      0.023482
csv_no_compression_pandas_1        0.030946
parquet_gzip_dask_1                0.039596
csv_gzip_pandas_1                  0.041766
csv_no_compression_dask_1          0.045717
hdf5_no_compression_dask_1         0.056825
csv_gzip_dask_1                    0.061265
hdf5_zlib_dask_1                   0.068667
csv_no_compression_vaex_1          0.282166
csv_gzip_vaex_1                    0.293648

image.png

パターン1の速度は1万行のときはPandasが上位を占めていましたが、5万行になってくるとVaexが占める形となってきました。また、前までParquetが占めていたのに対してHDF5がトップに上がってきました。

トップのHDF5 + Vaexと最下位のCSV + Vaexの差がかなり顕著になってきています。

続いてパターン2です。

run_overall(
    row_num=50_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
hdf5_no_compression_vaex_2         0.007800
parquet_no_compression_vaex_2      0.009540
parquet_snappy_vaex_2              0.010558
parquet_gzip_vaex_2                0.022170
parquet_snappy_pandas_2            0.026817
parquet_no_compression_pandas_2    0.028118
hdf5_no_compression_pandas_2       0.030404
hdf5_zlib_pandas_2                 0.036779
parquet_gzip_pandas_2              0.039773
parquet_snappy_dask_2              0.047335
csv_no_compression_pandas_2        0.048013
parquet_no_compression_dask_2      0.049952
csv_gzip_pandas_2                  0.060180
parquet_gzip_dask_2                0.065872
csv_no_compression_dask_2          0.070893
hdf5_no_compression_dask_2         0.084020
csv_gzip_dask_2                    0.085232
hdf5_zlib_dask_2                   0.093619
csv_no_compression_vaex_2          0.286849
csv_gzip_vaex_2                    0.299390

image.png

大分Vaex + HDF5の速さが目立ってくるようになってきました。Pandasで一番速いケースと比べて3倍強程度の速度差が出てきています。

また、Vaexはこの規模になるとHDF5が一番得意な挙動となるようですが、PandasなどはHDF5よりもParquetの方が良い成績が出ているのが特徴的です。また、Snappy圧縮しても各ライブラリで未圧縮時と比べてかなり小さい影響しか出ていないことが分かります。

HDF5 + Vaexの組み合わせが速いものの、Parquet + Vaexの組み合わせも良い成績になっています。

続いてパターン3です。

run_overall(
    row_num=50_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
hdf5_no_compression_vaex_3         0.014123
parquet_no_compression_vaex_3      0.014571
parquet_snappy_vaex_3              0.015824
parquet_gzip_vaex_3                0.027412
parquet_snappy_pandas_3            0.030680
parquet_no_compression_pandas_3    0.031236
hdf5_no_compression_pandas_3       0.034395
hdf5_zlib_pandas_3                 0.040794
parquet_snappy_dask_3              0.042222
parquet_no_compression_dask_3      0.043285
parquet_gzip_pandas_3              0.043325
csv_no_compression_pandas_3        0.051075
parquet_gzip_dask_3                0.059660
csv_gzip_pandas_3                  0.062562
csv_no_compression_dask_3          0.065778
hdf5_no_compression_dask_3         0.078580
csv_gzip_dask_3                    0.080105
hdf5_zlib_dask_3                   0.087821
csv_no_compression_vaex_3          0.291779
csv_gzip_vaex_3                    0.302356

image.png

こちらもPandasの最速のケースと比べてVaexが2倍強程度速いといった形で、順位などは同じような傾向のようです。

10万行・5列のファイルでの比較

run_overall(
    row_num=100_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 11:13:13.365703 未圧縮のCSVの保存を開始...
2021-01-11 11:13:13.620238 gzip圧縮されたCSVの保存を開始...
2021-01-11 11:13:14.130993 未圧縮のParquetの保存を開始...
2021-01-11 11:13:14.155798 Snappy圧縮されたParquetの保存を開始...
2021-01-11 11:13:14.184444 gzip圧縮されたParquetの保存を開始...
2021-01-11 11:13:14.657454 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 11:13:14.690242 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 11:13:14.768885 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 11:13:14.797431 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 11:13:14.869557 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 11:13:15.030618 保存処理が完了。
parquet_gzip              2592.230
csv_gzip                  2639.877
parquet_snappy            3365.400
parquet_no_compression    3878.444
csv_no_compression        4123.427
hdf5_zlib                 4230.293
hdf5_no_compression       7361.000

ディスクサイズに関しては10万行でやっとParquetがCSVより良い成績になりました。
また、HDF5のzlib圧縮した場合が未圧縮のCSVと肉薄するくらいになってきました。

image.png

hdf5_no_compression_vaex_1         0.006252
parquet_no_compression_vaex_1      0.009005
parquet_snappy_vaex_1              0.010455
hdf5_no_compression_pandas_1       0.023864
parquet_snappy_pandas_1            0.025336
parquet_no_compression_pandas_1    0.027367
parquet_gzip_vaex_1                0.032729
parquet_snappy_dask_1              0.037711
hdf5_zlib_pandas_1                 0.038161
parquet_no_compression_dask_1      0.041333
parquet_gzip_pandas_1              0.047903
csv_no_compression_pandas_1        0.062740
parquet_gzip_dask_1                0.075457
csv_no_compression_dask_1          0.079175
csv_gzip_pandas_1                  0.085268
hdf5_no_compression_dask_1         0.099241
csv_gzip_dask_1                    0.104104
hdf5_zlib_dask_1                   0.126630
csv_no_compression_vaex_1          0.562856
csv_gzip_vaex_1                    0.583959

image.png

パターン1ではぼちぼちVaexのHDF5とParquetにおける差が出てきました。
文字列が絡まないシンプルな処理ではPandasの最速のパターンの4倍弱くらいの速度がVaex + HDF5の組み合わせで出てきています。

続いてパターン2です。

run_overall(
    row_num=100_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
hdf5_no_compression_vaex_2         0.008375
parquet_no_compression_vaex_2      0.011477
parquet_snappy_vaex_2              0.014794
parquet_gzip_vaex_2                0.035837
hdf5_no_compression_pandas_2       0.053891
parquet_no_compression_pandas_2    0.054034
parquet_snappy_pandas_2            0.057317
hdf5_zlib_pandas_2                 0.065070
parquet_gzip_pandas_2              0.078063
parquet_no_compression_dask_2      0.082266
parquet_snappy_dask_2              0.085195
csv_no_compression_pandas_2        0.091169
csv_gzip_pandas_2                  0.116742
parquet_gzip_dask_2                0.117286
csv_no_compression_dask_2          0.117380
hdf5_no_compression_dask_2         0.140906
csv_gzip_dask_2                    0.145802
hdf5_zlib_dask_2                   0.160026
csv_no_compression_vaex_2          0.569166
csv_gzip_vaex_2                    0.591414

image.png

HDF5 + Vaexの組み合わせがPandas + HDF5の組み合わせに対して6倍強くらいの速度になってきました。また、PandasでもHDF5の速さが目立ってきました。

そしてCSV + Vaexの遅さがかなり顕著に感じてきました・・・。

続いてパターン3です。

run_overall(
    row_num=100_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
hdf5_no_compression_vaex_3         0.024104
parquet_no_compression_vaex_3      0.025932
parquet_snappy_vaex_3              0.028616
parquet_gzip_vaex_3                0.049881
hdf5_no_compression_pandas_3       0.066834
parquet_no_compression_pandas_3    0.066970
parquet_snappy_pandas_3            0.068455
hdf5_zlib_pandas_3                 0.077481
parquet_no_compression_dask_3      0.084306
parquet_snappy_dask_3              0.084496
parquet_gzip_pandas_3              0.087239
csv_no_compression_pandas_3        0.104310
parquet_gzip_dask_3                0.114899
csv_no_compression_dask_3          0.117771
csv_gzip_pandas_3                  0.120368
csv_gzip_dask_3                    0.145227
hdf5_no_compression_dask_3         0.145364
hdf5_zlib_dask_3                   0.162844
csv_no_compression_vaex_3          0.586798
csv_gzip_vaex_3                    0.608084

image.png

50万行・5列のファイルでの比較

50万行のファイルに対して試していきます。ここまでの結果から、Vaex + CSVの速度の遅さがかなり気になるレベルになってきているのと、Vaexを使うのならCSVの代わりにParquetかHDF5にするかな・・・という感じになってきたので、計算時間がかかるのも踏まえCSV + Vaexの組み合わせを以下のように実行用のリストを調整して外していきます。

new_runners = []
for runner in runners:
    if 'csv' in runner.label and 'vaex' in runner.label:
        continue
    new_runners.append(runner)
runners = new_runners
run_overall(
    row_num=500_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 11:23:47.660002 未圧縮のCSVの保存を開始...
2021-01-11 11:23:48.923959 gzip圧縮されたCSVの保存を開始...
2021-01-11 11:23:51.454028 未圧縮のParquetの保存を開始...
2021-01-11 11:23:51.541757 Snappy圧縮されたParquetの保存を開始...
2021-01-11 11:23:51.642734 gzip圧縮されたParquetの保存を開始...
2021-01-11 11:23:53.860624 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 11:23:53.991259 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 11:23:54.336325 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 11:23:54.416441 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 11:23:54.762570 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 11:23:55.578732 保存処理が完了。
parquet_gzip              11759.944
csv_gzip                  13196.472
parquet_snappy            15549.102
hdf5_zlib                 16642.935
parquet_no_compression    17962.955
csv_no_compression        20616.341
hdf5_no_compression       32563.056

image.png

ここまで来ると流石にParquetの方がぼちぼぢファイルサイズが小さくなってきます。
また、zlibのHDF5がSnappyのParquetなどと同じくらいにまで小さくなってきています。

hdf5_no_compression_vaex_1         0.009741
parquet_no_compression_vaex_1      0.030765
parquet_snappy_vaex_1              0.035542
hdf5_no_compression_pandas_1       0.102294
parquet_no_compression_pandas_1    0.137117
parquet_gzip_vaex_1                0.139221
hdf5_zlib_pandas_1                 0.148760
parquet_snappy_pandas_1            0.150342
parquet_no_compression_dask_1      0.193046
parquet_snappy_dask_1              0.202919
parquet_gzip_pandas_1              0.248371
csv_no_compression_pandas_1        0.309617
csv_no_compression_dask_1          0.338050
parquet_gzip_dask_1                0.344163
csv_gzip_pandas_1                  0.408416
hdf5_no_compression_dask_1         0.444608
csv_gzip_dask_1                    0.458034
hdf5_zlib_dask_1                   0.535816

image.png

パターン1の速度に関してはHDF5 + Vaexの組み合わせがかなり顕著な速さになってきました。
未圧縮のParquet + Vaex、もしくはSnappy圧縮のものと比べて4倍弱くらいの速さになっています。
また、gzipのParquetのVaexとHDF5のVaexを比べると14倍くらいの差になってきています。gzipの遅さが目立ってきましたね。

PandasのHDF5と比べても、VaexのHDF5だと10倍くらいの差になってきています。

run_overall(
    row_num=500_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
hdf5_no_compression_vaex_2         0.013669
parquet_no_compression_vaex_2      0.033931
parquet_snappy_vaex_2              0.039456
parquet_gzip_vaex_2                0.142346
hdf5_no_compression_pandas_2       0.240121
parquet_no_compression_pandas_2    0.276711
parquet_snappy_pandas_2            0.282710
hdf5_zlib_pandas_2                 0.289592
parquet_no_compression_dask_2      0.346122
parquet_snappy_dask_2              0.353175
parquet_gzip_pandas_2              0.389155
csv_no_compression_pandas_2        0.444285
csv_no_compression_dask_2          0.493776
parquet_gzip_dask_2                0.496309
csv_gzip_pandas_2                  0.554616
hdf5_no_compression_dask_2         0.589466
csv_gzip_dask_2                    0.606128
hdf5_zlib_dask_2                   0.682696

image.png

パターン2の方はVaexのHDF5がPandasよりも18倍といったレベルにまでなってきています。とても速い。

このレベルでもまだDaskは遅い感じですが、Vaexはかなり速くなってきています。

続いてパターン3です。

run_overall(
    row_num=500_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
hdf5_no_compression_vaex_3         0.095968
parquet_no_compression_vaex_3      0.116657
parquet_snappy_vaex_3              0.120759
parquet_gzip_vaex_3                0.228370
hdf5_no_compression_pandas_3       0.294173
hdf5_zlib_pandas_3                 0.343163
parquet_no_compression_pandas_3    0.345838
parquet_snappy_pandas_3            0.352694
parquet_no_compression_dask_3      0.396586
parquet_snappy_dask_3              0.397329
parquet_gzip_pandas_3              0.453819
csv_no_compression_pandas_3        0.497882
parquet_gzip_dask_3                0.544685
csv_no_compression_dask_3          0.593265
hdf5_no_compression_dask_3         0.627107
csv_gzip_dask_3                    0.646636
csv_gzip_pandas_3                  0.650796
hdf5_zlib_dask_3                   0.721067

image.png

こちらはパターン2と比べてそこまでVaexのHDF5とParquetで差は無いようです。Pandasと比べても3倍差程度のようです。計算処理が多いとVaexが輝いてきそうですね。

100万行・5列のファイルでの比較

run_overall(
    row_num=1_000_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)
2021-01-11 12:11:13.260384 未圧縮のCSVの保存を開始...
2021-01-11 12:11:15.783632 gzip圧縮されたCSVの保存を開始...
2021-01-11 12:11:20.860890 未圧縮のParquetの保存を開始...
2021-01-11 12:11:21.009845 Snappy圧縮されたParquetの保存を開始...
2021-01-11 12:11:21.205758 gzip圧縮されたParquetの保存を開始...
2021-01-11 12:11:25.590414 未圧縮のPandasのHDF5の保存を開始...
2021-01-11 12:11:25.861853 未圧縮のDaskのHDF5の保存を開始...
2021-01-11 12:11:26.538407 未圧縮のVaexのHDF5の保存を開始...
2021-01-11 12:11:26.681708 zlib圧縮されたPandasのHDF5の保存を開始...
2021-01-11 12:11:27.372079 zlib圧縮されたDaskのHDF5の保存を開始...
2021-01-11 12:11:28.995101 保存処理が完了。

ここまで来るとCSV.gzのファイルなどは結構保存時間がかかるようになってきます。一方でSnappyのParquetやVaex経由での未圧縮のHDF5などは結構書き込みも速いなという印象があります(SnappyのParquetもVaexならもっと速くなりそうではあります)。

parquet_gzip              23126.870
csv_gzip                  26390.647
parquet_snappy            30674.674
hdf5_zlib                 32221.201
parquet_no_compression    35467.713
csv_no_compression        41232.579
hdf5_no_compression       64065.632

image.png

hdf5_no_compression_vaex_1         0.015352
parquet_no_compression_vaex_1      0.058410
parquet_snappy_vaex_1              0.066124
hdf5_no_compression_pandas_1       0.203620
parquet_gzip_vaex_1                0.276284
parquet_snappy_pandas_1            0.304232
hdf5_zlib_pandas_1                 0.316301
parquet_no_compression_dask_1      0.369187
parquet_snappy_dask_1              0.374635
parquet_no_compression_pandas_1    0.396081
parquet_gzip_pandas_1              0.508149
csv_no_compression_pandas_1        0.592706
csv_no_compression_dask_1          0.611989
parquet_gzip_dask_1                0.658335
csv_gzip_pandas_1                  0.816507
hdf5_no_compression_dask_1         0.850102
csv_gzip_dask_1                    0.888696
hdf5_zlib_dask_1                   1.185779

image.png

パターン1でも大分HDF5のVaexの速さが目立ちます。2番目と3番目のParquetに関しては、流石にHDF5と比べると大分遅くなってきているものの、Snappy圧縮の有無はそこまで顕著にパフォーマンスには影響していないようです。

run_overall(
    row_num=1_000_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=2, save_data=False)
hdf5_no_compression_vaex_2         0.020531
parquet_no_compression_vaex_2      0.062161
parquet_snappy_vaex_2              0.069428
parquet_gzip_vaex_2                0.280575
hdf5_no_compression_pandas_2       0.472019
hdf5_zlib_pandas_2                 0.563466
parquet_no_compression_pandas_2    0.670589
parquet_snappy_pandas_2            0.705254
parquet_snappy_dask_2              0.720435
parquet_gzip_pandas_2              0.748364
parquet_no_compression_dask_2      0.837436
parquet_gzip_dask_2                1.061136
csv_gzip_pandas_2                  1.080294
hdf5_no_compression_dask_2         1.142565
csv_no_compression_pandas_2        1.180453
csv_no_compression_dask_2          1.219284
hdf5_zlib_dask_2                   1.320128
csv_gzip_dask_2                    1.794837

image.png

パターン2に関してはもっと顕著で、Pandasの24倍弱といったくらいにまでなっています。

run_overall(
    row_num=1_000_000, n=10, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=3, save_data=False)
hdf5_no_compression_vaex_3         0.188722
parquet_no_compression_vaex_3      0.239557
parquet_snappy_vaex_3              0.314108
parquet_gzip_vaex_3                0.554921
parquet_no_compression_pandas_3    0.679682
parquet_snappy_pandas_3            0.699997
parquet_no_compression_dask_3      0.739946
hdf5_zlib_pandas_3                 0.741525
hdf5_no_compression_pandas_3       0.748426
parquet_snappy_dask_3              0.756193
csv_no_compression_pandas_3        0.970395
csv_no_compression_dask_3          1.007364
parquet_gzip_pandas_3              1.105683
parquet_gzip_dask_3                1.250151
csv_gzip_pandas_3                  1.267784
csv_gzip_dask_3                    1.338443
hdf5_zlib_dask_3                   1.423993
hdf5_no_compression_dask_3         1.655324

image.png

500万行・5列のファイルでの比較

一気にデータを増やして次は500万行で試してみます。

run_overall(
    row_num=5_000_000, n=5, unit=PlotFileSizeUnit.KILO_BYTES,
    pattern=1, save_data=True)

すると何やらエラーが。

ValueError: array is of length 1048576, while the length of the DataFrame is 5000000

どうやらVaexのParquetのチャンクサイズ周りのバグのようです。

参考 :

比較的新し目のissueのようで、本記事で扱っているバージョンだと対応されていないような気配があります。1つのファイルで約100万行を超えているデータを扱いたい場合はファイルを分けるか、もっと新しいバージョンのVaexのものの利用を検討した方が良さそうです(Vaex3系や4系が安定してきたらそちらを利用するなど)。

普段もっと大きいサイズのデータを扱うことが多いですし、データサイズが大きくなるほどVaexなどのパフォーマンスの優位性が上がっていっている気がするため、大きいデータでの検証まで今回手が回らなかったのが少し残念ではありますが、とりあえず本記事では単体ファイルでの検証なので一旦100万行までの検証で止めておこうと思います。

将来機会があれば別の記事で複数ファイルで巨大なデータに対する検証などをやれたらなと思います(1つ辺り100万行のファイルを同時に大量に扱うなど)。

参考サイト・参考文献まとめ

17
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
9