3
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

複数ファイルでのParquet・HDF5でのVaex・Daskなどのパフォーマンス比較

Last updated at Posted at 2021-01-17

先日単一のファイルでのCSV・Parquet・HDF5におけるPandas・Dask・Vaexの簡単なパフォーマンス比較をしました。

ただ、普段の作業では時系列データなどでたくさんのファイルを扱ったりすることが多いため、今回は複数ファイル・且つ普通に扱うとメモリが結構きついくらいの行数も含めてざっとパフォーマンス比較をしていきます。

TL;DR

  • 今回試した時系列想定のデータでは思っていたほどVaex + 未圧縮HDF5とVaex + Snappy圧縮されたParquetで差が出ませんでした。取り回しのしやすさやファイルサイズなどを考えるとSnappy圧縮のParquetは結構良い選択に思えます。
    • ※前回の単一ファイルでの比較では、1つのファイルで行数がもっとぐぐっと多い場合にはHDF5の方がParquetよりも速かったので、1つのファイルだけで大量のデータが入っているようなケースだとHDF5が輝いてきそうです。
  • PandasやDaskは時系列データ的に多くのファイルがある場合、HDF5であまり速度が出ないようです(特にDaskが顕著に遅い)。
  • 時系列データとして、今回検証したような1ファイル辺り10万行前後で集計した場合、VaexとDaskでそこまで極端な差は出ないようです(2倍強程度Vaexは速いものの)。どちらかというと今回試した環境のディスクアクセス的なところもネックになっているかもしれません?
  • しかしながら2億行弱くらいでも計算が1分半くらいで終わるのは大分快適で助かります。

追記

2021-01-29

Twitterでyukieiji@工場長(仮)さんからご指摘いただいたため、比較のコードが不適切だったかもしれません:qiitan-cry: そのうち機会があればDaskのその辺りもう少し深堀りします!

使う環境

以下のDockerイメージの設定で進めていきます。ホストはWindows10のラップトップを使っています。

FROM jupyter/datascience-notebook:python-3.8.6
RUN pip install vaex==2.6.1
RUN pip install jupyter-contrib-nbextensions==0.5.1
RUN jupyter contrib nbextension install --user
RUN jupyter nbextension enable hinterland/hinterland \
    && jupyter nbextension enable toc2/main

以下のようなOSやライブラリの環境となります。

  • Ubuntuは20.04
  • vaex==2.6.1
  • pandas==1.1.4
  • dask==2.30.0
  • Jupyter notebook

対象とするフォーマット

前回の記事の比較で、使うとしたら未圧縮HDF5かSnappy圧縮のParquetか・・・といった感じだったため、今回はCSVは含めずにHDF5とSnappy圧縮のParquetの2つのフォーマットを扱います。

また、ライブラリに関してはメモリに乗る程度の行数の比較であればPandasも含め、結構多い行数になってきたらDaskとVaexの2つのみで進めます。


※以降で各コードに触れていきますが、スキップして結果を確認したい方はこちらの節に飛んでいただけますと幸いです。

データの準備

前回のパフォーマンス比較の記事のコードをある程度流用していきます。

時系列データを想定し、1日約8万~12万行5列のデータを持ったデータセットとして想定し2016年1月~2020年12月末までの5年分のデータで準備します。

5つのカラムは以下のような構成にします。

  • column_a : int -> 0~4999999の範囲でランダムな値を設定。
  • column_b : str -> 日時の文字列(例 : 2020-12-31 15:10:20)を設定。
  • column_c : int -> 0~100の範囲でランダムな値を設定。
  • column_d : int -> 100, 300, 500, 1000, 3000, 10000のいずれかの値を設定。
  • column_e : str -> char_num 20文字の数の小文字と大文字のアルファベットの文字列を設定。

また、HDF5はライブラリによって階層構造が変わっていたりするため、各ライブラリごとに別々に保存しています。

from string import ascii_letters
import random
from datetime import date, datetime

import numpy as np
import pandas as pd
import dask.dataframe as dd
from pandas.tseries.offsets import Day
import vaex


def make_random_ascii_str(char_num):
    """
    指定された文字数のランダムな小文字と大文字の文字列を生成する。

    Parameters
    ----------
    char_num : int
        生成する文字数。

    Returns
    -------
    random_str : str
        生成された文字列。
    """
    return ''.join(random.choices(population=ascii_letters, k=char_num))


def make_pandas_df(row_num, char_num, date_str):
    """
    検証用のPandasのデータフレームを生成する。

    Parameters
    ----------
    row_num : int
        生成するデータフレームの行数。
    char_num : str
        文字列のカラムに設定する値の文字数。
    date_str : str
        対象の日付の文字列。

    Returns
    -------
    pandas_df : pd.DataFrame
        生成されたデータフレーム。以下の5つのカラムに各値が設定される。
        - column_a : int -> 0~4999999の範囲でランダムな値。
        - column_b : str -> 日時の文字列(例 : 2020-12-31 15:10:20)。
        - column_c : int -> 0~100の範囲でランダムな値。
        - column_d : int -> 100, 300, 500, 1000, 3000, 10000のいずれかの値。
        - column_e : str -> char_num 20文字の数の小文字と大文字のアルファベットの文字列。
    """
    pandas_df = pd.DataFrame(
        index=np.arange(0, row_num), columns=['column_a'])
    row_num = len(pandas_df)
    pandas_df['column_a'] = np.random.randint(0, 5000000, size=row_num)
    
    random_hours = np.random.randint(low=0, high=24, size=row_num)
    random_minutes = np.random.randint(low=0, high=60, size=row_num)
    random_seconds = np.random.randint(low=0, high=60, size=row_num)
    times = []
    for i in range(row_num):
        hour_str = str(random_hours[i]).zfill(2)
        minute_str = str(random_minutes[i]).zfill(2)
        second_str = str(random_seconds[i]).zfill(2)
        time_str = f'{date_str} {hour_str}:{minute_str}:{second_str}'
        times.append(time_str)
    pandas_df['column_b'] = times

    pandas_df['column_c'] = np.random.randint(0, 100, size=row_num)
    pandas_df['column_d'] = np.random.choice(a=[100, 300, 500, 1000, 3000, 10000], size=row_num)
    pandas_df['column_e'] = [make_random_ascii_str(char_num=20) for _ in range(row_num)]
    pandas_df.sort_values(by='column_b', inplace=True)
    return pandas_df


def get_pandas_hdf5_file_path(date_str):
    """
    対象日のPandasのHDF5ファイルのパスを取得する。

    Parameters
    ----------
    date_str : str
        対象日の文字列。
    
    Returns
    -------
    file_path : str
        生成されたファイルパス。
    """
    return f'pandas_{date_str}.hdf5'


def get_dask_hdf5_file_path(date_str):
    """
    対象日のDaskのHDF5ファイルのパスを取得する。

    Parameters
    ----------
    date_str : str
        対象日の文字列。
    
    Returns
    -------
    file_path : str
        生成されたファイルパス。
    """
    return f'dask_{date_str}.hdf5'


def get_vaex_hdf5_file_path(date_str):
    """
    対象日のVaexのHDF5ファイルのパスを取得する。

    Parameters
    ----------
    date_str : str
        対象日の文字列。
    
    Returns
    -------
    file_path : str
        生成されたファイルパス。
    """
    return f'vaex_{date_str}.hdf5'


def get_parquet_file_path(date_str):
    """
    対象日のParquetファイルのパスを取得する。

    Parameters
    ----------
    date_str : str
        対象日の文字列。
    
    Returns
    -------
    file_path : str
        生成されたファイルパス。
    """
    return f'{date_str}.parquet'


def save_data():
    """
    検証用の時系列のデータの各ファイルを保存する。
    """
    current_date = date(2016, 1, 1)
    last_date = date(2020, 12, 31)
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        row_num = random.randint(80_000, 120_000)
        print(
            datetime.now(), date_str, 'の保存処理を開始。行数 :', row_num)
        
        pandas_df = make_pandas_df(
            row_num=row_num, char_num=20, date_str=date_str)
        vaex_df = vaex.from_pandas(df=pandas_df, copy_index=False)
        dask_df = dd.from_pandas(data=pandas_df, npartitions=1)
        
        pandas_df.to_hdf(
            path_or_buf=get_pandas_hdf5_file_path(date_str=date_str),
            key='data', mode='w')
        
        dask_df.to_hdf(
            path_or_buf=get_dask_hdf5_file_path(date_str=date_str),
            key='data', mode='w')
        
        vaex_df.export_hdf5(
            path=get_vaex_hdf5_file_path(date_str=date_str))
        
        vaex_df.export_parquet(
            path=get_parquet_file_path(date_str=date_str))
        
        current_date += Day()


save_data()
2021-01-17 07:58:24.950967 2016-01-01 の保存処理を開始。行数 : 83831
2021-01-17 07:58:26.994136 2016-01-02 の保存処理を開始。行数 : 117457
2021-01-17 07:58:29.859080 2016-01-03 の保存処理を開始。行数 : 101470
2021-01-17 07:58:32.381448 2016-01-04 の保存処理を開始。行数 : 88966
...

読み込みと計算処理を追加していく

DaskやVaexが遅延評価される都合、読み込みなどだけだと比較ができないので読み込み → ある程度の計算などの処理といった形で、計算が終わるまでどのくらいかかるのかといった比較を想定して進めます。

今回は以下のような2つのパターンを設けます。

  • [パターン1] : column_e でabという文字列を含む行のみにスライス -> スライス後の行数を算出します。
  • [パターン2] : column_a の値が300万以下の値にスライス -> column_e の文字列の値の先頭がaから始まる行のみにスライス -> column_d の値でGROUP BY -> 各グループごとに column_a の最大値を算出します。

読みこみ処理の追加

Pandas・Dask・Vaexそれぞれで追加していきます。

def read_pandas_df_from_hdf5(start_date, last_date):
    """
    指定された日付範囲でのPandasのデータフレームをHDF5ファイルから読み込む。

    Parameters
    ----------
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    
    Returns
    -------
    df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_pandas_hdf5_file_path(date_str=date_str)
        df = pd.read_hdf(path_or_buf=file_path, key='data')
        df_list.append(df)
        current_date += Day()
    df = pd.concat(df_list, ignore_index=True, copy=False)
    return df


def read_dask_df_from_hdf5(start_date, last_date):
    """
    指定された日付範囲でのDaskのデータフレームをHDF5ファイルから読み込む

    Parameters
    ----------
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    
    Returns
    -------
    df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_dask_hdf5_file_path(date_str=date_str)
        df = dd.read_hdf(pattern=file_path, key='data')
        df_list.append(df)
        current_date += Day()
    df = dd.concat(dfs=df_list)
    return df


def read_vaex_df_from_hdf5(start_date, last_date):
    """
    指定された日付範囲でのVaexのデータフレームをHDF5ファイルから読み込む

    Parameters
    ----------
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    
    Returns
    -------
    df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    file_paths = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_vaex_hdf5_file_path(date_str=date_str)
        file_paths.append(file_path)
        current_date += Day()
    vaex_df = vaex.open_many(filenames=file_paths)
    return vaex_df


def read_pandas_df_from_parquet(start_date, last_date):
    """
    指定された日付範囲でのPandasのデータフレームをParquetから読み込む。

    Parameters
    ----------
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    
    Returns
    -------
    df : pd.DataFrame
        読み込まれたPandasのデータフレーム。
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_parquet_file_path(date_str=date_str)
        df = pd.read_parquet(path=file_path)
        df_list.append(df)
        current_date += Day()
    df = pd.concat(df_list, ignore_index=True, copy=False)
    return df


def read_dask_df_from_parquet(start_date, last_date):
    """
    指定された日付範囲でのDaskのデータフレームをParquetファイルから読み込む

    Parameters
    ----------
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    
    Returns
    -------
    df : dd.DataFrame
        読み込まれたDaskのデータフレーム。
    """
    df_list = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_parquet_file_path(date_str=date_str)
        df = dd.read_parquet(path=file_path)
        df_list.append(df)
        current_date += Day()
    df = dd.concat(dfs=df_list)
    return df


def read_vaex_df_from_parquet(start_date, last_date):
    """
    指定された日付範囲でのVaexのデータフレームをParquetファイルから読み込む

    Parameters
    ----------
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    
    Returns
    -------
    df : vaex.dataframe.DataFrame
        読み込まれたVaexのデータフレーム。
    """
    file_paths = []
    current_date = start_date
    while current_date <= last_date:
        date_str = current_date.strftime('%Y-%m-%d')
        file_path = get_parquet_file_path(date_str=date_str)
        file_paths.append(file_path)
        current_date += Day()
    vaex_df = vaex.open_many(filenames=file_paths)
    return vaex_df

パターン1の処理の追加

  • column_e でabという文字列を含む行のみにスライス
  • スライス後の行数を算出

という処理を、それぞれのライブラリで追加していきます。

def calculate_pattern_1_with_pandas_df(pandas_df):
    """
    1つ目のパターンの計算を、Pandasのデータフレームで行う。

    Parameters
    ----------
    pandas_df : pd.DataFrame
        対象のPandasのデータフレーム。
    
    Returns
    -------
    row_count : int
        算出結果の行数。
    """
    pandas_df = pandas_df[pandas_df['column_e'].str.contains('ab')]
    row_count = len(pandas_df)
    return row_count


def calculate_pattern_1_with_dask_df(dask_df):
    """
    1つ目のパターンの計算を、Daskのデータフレームで行う。

    Parameters
    ----------
    dask_df : dd.DataFrame
        対象のDaskのデータフレーム。

    Returns
    -------
    row_count : int
        算出結果の行数。
    """
    dask_df = dask_df[dask_df['column_e'].str.contains('ab')]
    row_count = len(dask_df)
    return row_count


def calculate_pattern_1_with_vaex_df(vaex_df):
    """
    3つ目のパターンの計算を、Vaexのデータフレームで行う。

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
        対象のVaexのデータフレーム。

    Returns
    -------
    row_count : int
        算出結果の行数。
    """
    vaex_df = vaex_df[vaex_df['column_e'].str.contains('ab')]
    row_count = len(vaex_df)
    return row_count

パターン2の処理の追加

  • column_a の値が300万以下の値にスライス
  • column_e の文字列の値の先頭がaから始まる行のみにスライス
  • column_d の値でGROUP BY
  • 各グループごとに column_a の最大値を算出

という処理を、それぞれのライブラリで追加していきます。

def calculate_pattern_2_with_pandas_df(pandas_df):
    """
    2つ目のパターンの計算を、Pandasのデータフレームで行う。

    Parameters
    ----------
    pandas_df : pd.DataFrame
        対象のPandasのデータフレーム。

    Returns
    -------
    max_sr : pd.Series
        算出された各合計値を格納したシリーズ。
    """
    pandas_df = pandas_df[pandas_df['column_a'] <= 3_000_000]
    pandas_df = pandas_df[pandas_df['column_e'].str.startswith('a')]
    grouped = pandas_df.groupby(by='column_d')
    max_df = grouped.max()
    max_sr = max_df['column_a']
    return max_sr


def calculate_pattern_2_with_dask_df(dask_df):
    """
    2つ目のパターンの計算を、Daskのデータフレームで行う。

    Parameters
    ----------
    dask_df : dd.DataFrame
        対象のDaskのデータフレーム。

    Returns
    -------
    max_sr : pd.Series
        算出された各合計値を格納したシリーズ。
    """
    dask_df = dask_df[dask_df['column_a'] <= 3_000_000]
    dask_df = dask_df[dask_df['column_e'].str.startswith('a')]
    grouped = dask_df.groupby(by='column_d')
    max_df = grouped.max()
    max_sr = max_df['column_a']
    max_sr = max_sr.compute()
    return max_sr


def calculate_pattern_2_with_vaex_df(vaex_df):
    """
    2つ目のパターンの計算を、Vaexのデータフレームで行う。

    Parameters
    ----------
    vaex_df : vaex.dataframe.DataFrame
        対象のVaexのデータフレーム。

    Returns
    -------
    max_sr : pd.Series
        算出された各合計値を格納したシリーズ。
    """
    vaex_df = vaex_df[vaex_df['column_a'] <= 3_000_000]
    vaex_df = vaex_df[vaex_df['column_e'].str.startswith('a')]
    max_df = vaex_df.groupby(
        by='column_d',
        agg={
            'column_a': vaex.agg.max,
        })
    max_df = max_df.to_pandas_df(column_names=['column_a', 'column_d'])
    max_df.index = max_df['column_d']
    max_sr = max_df['column_a']
    return max_sr

読み込みと集計処理の追加

各読み込み処理と集計処理が追加し終わったので、それらを組み合わせた処理を書いていきます。

from timeit import timeit
from copy import deepcopy
import sys


class ReadAndCalcRunner:

    def __init__(self, label, pattern, read_func, calc_func):
        """
        読み込みと計算処理の組み合わせの設定や実行処理を扱うクラス。

        Parameters
        ----------
        label : str
            対象の組み合わせの識別用のラベル。
            例 : csv_no_compression_pandas
        pattern : int
            対象のパターン(1~3)。
        read_func : Callable
            読み込み処理を扱う関数。引数は省略可で、データフレームを変えす
            形式が必要になる。
        calc_func : Callable
            計算処理を扱う関数。第一引数にデータフレームを受け付ける形式の
            ものを指定する必要がある。
        """
        self.label = label
        self.pattern = pattern
        self._read_func = read_func
        self._calc_func = calc_func

    def run(self, n, start_date, last_date, debug=False):
        """
        読み込みと計算処理を実行する。実行後、 mean_seconds 属性に
        実行の平均秒数(float)が設定される。

        Parameters
        ----------
        n : int
            実行回数。多い方が処理時間の精度が高くなるものの、完了
            するまでが長くなるので注意。
        start_date : date
            日付範囲の開始日。
        last_date : date
            日付範囲の最終日。
        debug : bool, default False
            デバッグ設定。Trueを指定した場合、計算結果の出力がされる。
        """
        statement = 'df = self._read_func(start_date=start_date, last_date=last_date);'
        if not debug:
            statement += 'self._calc_func(df);'
        else:
            statement += 'result = self._calc_func(df); print(result)'
        result_seconds = timeit(
            stmt=statement,
            number=n, globals=locals())
        self.mean_seconds = result_seconds / n


this_module = sys.modules[__name__]
FORMATS = (
    'parquet',
    'hdf5',
)
LIBS = (
    'pandas',
    'dask',
    'vaex',
)
PATTERNS = (1, 2)

runners = []

for format_str in FORMATS:
    for lib_str in LIBS:
        for pattern in PATTERNS:
            label = f'{format_str}_{lib_str}'
            read_func_name = f'read_{lib_str}_df_from_{format_str}'
            read_func = getattr(this_module, read_func_name)

            calc_func_name = f'calculate_pattern_{pattern}_with_{lib_str}_df'
            calc_func = getattr(this_module, calc_func_name)

            runners.append(
                ReadAndCalcRunner(
                    label=label,
                    pattern=pattern,
                    read_func=read_func,
                    calc_func=calc_func)
            )

プロット用の処理を追加

処理時間の比較用の関数などを追加していきます。

import matplotlib
from matplotlib.ticker import ScalarFormatter, FormatStrFormatter
import matplotlib.pyplot as plt

matplotlib.style.use('ggplot')


def plot_time_info(time_sr, patten, print_sr = True):
    """
    処理時間のプロットを行う。

    Parameters
    ----------
    time_sr : pd.Series
        処理時間を格納したシリーズ。
    patten : int
        対象の集計処理のパターン(1~3)。
    print_sr : bool, default True
        プロット内容のシリーズを出力して表示するかどうか。
    """
    sr = time_sr.copy()
    sr.sort_values(inplace=True, ascending=False)
    if print_sr:
        print(sr.sort_values(ascending=True))
    title = f'Read and calculation seconds (pattern: {patten})'
    ax = sr.plot(kind='barh', figsize=(10, len(sr) // 2), title=title)
    ax.xaxis.set_major_formatter(FormatStrFormatter('%.3f'))
    plt.show()

各処理を一通り流すための処理の追加

記述がシンプルになるように、各処理(読み込み・集計・可視化)を一気に処理するための関数を追加しておきます。

def run_and_get_result_time_sr(
        runners, pattern, n, start_date, last_date, skip_pandas, skip_dask_hdf5,
        skip_dask_parquet):
    """
    指定されたパターンの各計測処理を実行し、結果の各秒数の値を格納した
    シリーズを取得する。

    Parameters
    ----------
    runners : list of ReadAndCalcRunner
        実行処理の定義などを保持するインスタンスのリスト。
    pattern : int
        実行するパターン(1~3)。
    n : int
        実行回数。多い方が処理時間の精度が高くなるものの、完了
        するまでが長くなるので注意。
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    skip_pandas : bool
        Pandasの処理をスキップするかどうか。
    skip_dask_hdf5 : bool
        HDF5形式のDaskの処理をスキップするかどうか。
    skip_dask_parquet : bool
        Parquet形式のDaskの処理をスキップするかどうか。

    Returns
    -------
    sr : pd.Series
        計測結果を格納したシリーズ。インデックスには各フォーマットと
        ライブラリ識別用のラベルとパターンの連結された文字列が設定され、
        値に秒数が設定される。
    """
    runners = deepcopy(runners)
    data_dict = {}
    for runner in runners:
        if runner.pattern != pattern:
            continue
        if skip_pandas and 'pandas' in runner.label:
            continue
        if skip_dask_hdf5 and 'dask' in runner.label and 'hdf5' in runner.label:
            continue
        if skip_dask_parquet and 'dask' in runner.label and 'parquet' in runner.label:
            continue
        label = f'{runner.label}_{pattern}'
        print(datetime.now(), label, 'の処理を開始...')
        runner.run(n=n, start_date=start_date, last_date=last_date)
        data_dict[label] = runner.mean_seconds
    sr = pd.Series(data=data_dict)
    return sr


def run_overall(
        n, pattern, start_date, last_date, skip_pandas, skip_dask_hdf5,
        skip_dask_parquet):
    """
    対象のパターンに対して、各処理(読み込み・集計・可視化など)を
    全体的に実行する。

    Parameters
    ----------
    n : int
        読み込みと計算の実行回数。多い方が処理時間が正確になるものの、
        完了するまで長時間必要となるので注意。
    pattern : int
        対象とする計算のパターン(1~3)。
    start_date : date
        日付範囲の開始日。
    last_date : date
        日付範囲の最終日。
    skip_pandas : bool
        Pandasの処理をスキップするかどうか。
    skip_dask_hdf5 : bool
        HDF5形式のDaskの処理をスキップするかどうか。
    skip_dask_parquet : bool
        Parquet形式のDaskの処理をスキップするかどうか。
    """
    time_sr = run_and_get_result_time_sr(
        runners=runners, pattern=pattern, n=n, start_date=start_date,
        last_date=last_date, skip_pandas=skip_pandas,
        skip_dask_hdf5=skip_dask_hdf5,
        skip_dask_parquet=skip_dask_parquet)
    plot_time_info(time_sr=time_sr, patten=pattern)

実際に実行してみる

準備が出来たので各処理時間を集計していきます。以下の各条件で試していきます。なお、Pandasは3ヵ月分のものまでを対象とし、以降はDaskとVaexでのみ実行していきます。

  • 1ヵ月分(約300万行)
  • 3ヵ月分(約900万行)
  • 6ヵ月分(約1800万行)
  • 1年分(約3600万行)
  • 3年分(約1億800万行)
  • 5年分(約1億8000万行)

1ヵ月分での集計

パターン1:

run_overall(
    n=5, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 1, 31),
    skip_pandas=False)
hdf5_vaex_1         1.015516
parquet_vaex_1      1.028920
parquet_pandas_1    2.685143
parquet_dask_1      2.828006
hdf5_pandas_1       3.311069
hdf5_dask_1         7.616159

image.png

  • Snappy圧縮されているにも関わらず、Parquet + Vaexの処理時間がかなり未圧縮HDF5に近い数字がでています。
  • PandasとDaskはHDF5の方が(未圧縮にも関わらず)遅いようです。特にDaskの処理時間が気になります。

パターン2:

run_overall(
    n=5, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 1, 31),
    skip_pandas=False)
hdf5_vaex_2         0.766808
parquet_vaex_2      0.848183
parquet_pandas_2    2.436566
parquet_dask_2      2.961728
hdf5_pandas_2       4.134251
hdf5_dask_2         8.657277

image.png

3ヵ月分での集計

パターン1:

run_overall(
    n=5, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 3, 31),
    skip_pandas=False)
hdf5_vaex_1          2.260799
parquet_vaex_1       2.649166
parquet_dask_1       8.578201
parquet_pandas_1     8.656629
hdf5_pandas_1        9.994132
hdf5_dask_1         22.766739

image.png

パターン2:

run_overall(
    n=5, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 3, 31),
    skip_pandas=False)
hdf5_vaex_2          1.894970
parquet_vaex_2       2.529330
parquet_pandas_2     7.110901
hdf5_pandas_2        9.252198
parquet_dask_2      10.688318
hdf5_dask_2         23.362928

image.png

6ヵ月分での集計

ここからはPandasをスキップしてDaskとVaexのみで進めていきます。
また、処理時間の都合で、各パターンでの実行回数を5回から3回に減らして実行していきます。

加えてDask + HDF5がかなり遅いので、処理時間の都合でそれをスキップできるように調整します。

パターン1:

run_overall(
    n=3, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 6, 30),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_1        4.621812
parquet_vaex_1     5.633019
parquet_dask_1    17.827765

image.png

パターン2:

run_overall(
    n=3, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 6, 30),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_2        4.231214
parquet_vaex_2     5.312496
parquet_dask_2    17.153308

image.png

1年分での集計

実行回数を2回に減らしていきます。

パターン1:

run_overall(
    n=2, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_1        9.618381
parquet_vaex_1    11.091080
parquet_dask_1    36.335810

image.png

パターン2:

run_overall(
    n=2, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2016, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_2        9.132881
parquet_vaex_2    11.136143
parquet_dask_2    34.377085

image.png

3年分での集計

試行回数が大分少なくなりますが、以降は処理時間の都合で1回ずつのみ実行していきます。

パターン1:

run_overall(
    n=1, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2018, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_1        40.676083
parquet_vaex_1     43.035784
parquet_dask_1    100.698389

image.png

パターン2:

run_overall(
    n=1, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2018, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True)
hdf5_vaex_2        40.061167
parquet_vaex_2     42.218093
parquet_dask_2    102.830116

image.png

5年分での集計

Dockerで動かしているのが影響している?のか、メモリなどで問題なさそうでも何故かDaskのものを実行中にJupyterのカーネルが落ちるので、5年分のものはVaexのみで進めてみます。

パターン1:

run_overall(
    n=1, pattern=1,
    start_date=date(2016, 1, 1),
    last_date=date(2020, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True,
    skip_dask_parquet=True)
parquet_vaex_1    95.578259
hdf5_vaex_1       99.258315

image.png

試行回数が1回なのでぶれた可能性もありますが、まさかのParquetの方が速い結果に。

パターン2:

run_overall(
    n=1, pattern=2,
    start_date=date(2016, 1, 1),
    last_date=date(2020, 12, 31),
    skip_pandas=True,
    skip_dask_hdf5=True,
    skip_dask_parquet=True)
hdf5_vaex_2       78.231278
parquet_vaex_2    93.696743

こちらは普通にHDF5の方が速い結果に。

image.png

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?