先日単一のファイルでのCSV・Parquet・HDF5におけるPandas・Dask・Vaexの簡単なパフォーマンス比較をしました。
ただ、普段の作業では時系列データなどでたくさんのファイルを扱ったりすることが多いため、今回は複数ファイル・且つ普通に扱うとメモリが結構きついくらいの行数も含めてざっとパフォーマンス比較をしていきます。
TL;DR
- 今回試した時系列想定のデータでは思っていたほどVaex + 未圧縮HDF5とVaex + Snappy圧縮されたParquetで差が出ませんでした。取り回しのしやすさやファイルサイズなどを考えるとSnappy圧縮のParquetは結構良い選択に思えます。
- ※前回の単一ファイルでの比較では、1つのファイルで行数がもっとぐぐっと多い場合にはHDF5の方がParquetよりも速かったので、1つのファイルだけで大量のデータが入っているようなケースだとHDF5が輝いてきそうです。
- PandasやDaskは時系列データ的に多くのファイルがある場合、HDF5であまり速度が出ないようです(特にDaskが顕著に遅い)。
- 時系列データとして、今回検証したような1ファイル辺り10万行前後で集計した場合、VaexとDaskでそこまで極端な差は出ないようです(2倍強程度Vaexは速いものの)。どちらかというと今回試した環境のディスクアクセス的なところもネックになっているかもしれません?
- しかしながら2億行弱くらいでも計算が1分半くらいで終わるのは大分快適で助かります。
追記
2021-01-29
Twitterでyukieiji@工場長(仮)さんからご指摘いただいたため、比較のコードが不適切だったかもしれません そのうち機会があればDaskのその辺りもう少し深堀りします!
複数ファイルでのParquet・HDF5でのVaex・Daskなどのパフォーマンス比較 https://t.co/ei4EEzpVL9 #Qiita @simonritchie_sdより
— yukieiji@工場長(仮) (@yukieiji_2_sab) January 28, 2021
これDaskが遅い原因ってListのAppend処理じゃない?
うちでListのAppend処理はずしたら190sの処理が最速5sくなったぞい
使う環境
以下のDockerイメージの設定で進めていきます。ホストはWindows10のラップトップを使っています。
FROM jupyter/datascience-notebook:python-3.8.6
RUN pip install vaex==2.6.1
RUN pip install jupyter-contrib-nbextensions==0.5.1
RUN jupyter contrib nbextension install --user
RUN jupyter nbextension enable hinterland/hinterland \
&& jupyter nbextension enable toc2/main
以下のようなOSやライブラリの環境となります。
- Ubuntuは20.04
- vaex==2.6.1
- pandas==1.1.4
- dask==2.30.0
- Jupyter notebook
対象とするフォーマット
前回の記事の比較で、使うとしたら未圧縮HDF5かSnappy圧縮のParquetか・・・といった感じだったため、今回はCSVは含めずにHDF5とSnappy圧縮のParquetの2つのフォーマットを扱います。
また、ライブラリに関してはメモリに乗る程度の行数の比較であればPandasも含め、結構多い行数になってきたらDaskとVaexの2つのみで進めます。
※以降で各コードに触れていきますが、スキップして結果を確認したい方はこちらの節に飛んでいただけますと幸いです。
データの準備
前回のパフォーマンス比較の記事のコードをある程度流用していきます。
時系列データを想定し、1日約8万~12万行5列のデータを持ったデータセットとして想定し2016年1月~2020年12月末までの5年分のデータで準備します。
5つのカラムは以下のような構成にします。
- column_a : int -> 0~4999999の範囲でランダムな値を設定。
- column_b : str -> 日時の文字列(例 : 2020-12-31 15:10:20)を設定。
- column_c : int -> 0~100の範囲でランダムな値を設定。
- column_d : int -> 100, 300, 500, 1000, 3000, 10000のいずれかの値を設定。
- column_e : str -> char_num 20文字の数の小文字と大文字のアルファベットの文字列を設定。
また、HDF5はライブラリによって階層構造が変わっていたりするため、各ライブラリごとに別々に保存しています。
from string import ascii_letters
import random
from datetime import date, datetime
import numpy as np
import pandas as pd
import dask.dataframe as dd
from pandas.tseries.offsets import Day
import vaex
def make_random_ascii_str(char_num):
"""
指定された文字数のランダムな小文字と大文字の文字列を生成する。
Parameters
----------
char_num : int
生成する文字数。
Returns
-------
random_str : str
生成された文字列。
"""
return ''.join(random.choices(population=ascii_letters, k=char_num))
def make_pandas_df(row_num, char_num, date_str):
"""
検証用のPandasのデータフレームを生成する。
Parameters
----------
row_num : int
生成するデータフレームの行数。
char_num : str
文字列のカラムに設定する値の文字数。
date_str : str
対象の日付の文字列。
Returns
-------
pandas_df : pd.DataFrame
生成されたデータフレーム。以下の5つのカラムに各値が設定される。
- column_a : int -> 0~4999999の範囲でランダムな値。
- column_b : str -> 日時の文字列(例 : 2020-12-31 15:10:20)。
- column_c : int -> 0~100の範囲でランダムな値。
- column_d : int -> 100, 300, 500, 1000, 3000, 10000のいずれかの値。
- column_e : str -> char_num 20文字の数の小文字と大文字のアルファベットの文字列。
"""
pandas_df = pd.DataFrame(
index=np.arange(0, row_num), columns=['column_a'])
row_num = len(pandas_df)
pandas_df['column_a'] = np.random.randint(0, 5000000, size=row_num)
random_hours = np.random.randint(low=0, high=24, size=row_num)
random_minutes = np.random.randint(low=0, high=60, size=row_num)
random_seconds = np.random.randint(low=0, high=60, size=row_num)
times = []
for i in range(row_num):
hour_str = str(random_hours[i]).zfill(2)
minute_str = str(random_minutes[i]).zfill(2)
second_str = str(random_seconds[i]).zfill(2)
time_str = f'{date_str} {hour_str}:{minute_str}:{second_str}'
times.append(time_str)
pandas_df['column_b'] = times
pandas_df['column_c'] = np.random.randint(0, 100, size=row_num)
pandas_df['column_d'] = np.random.choice(a=[100, 300, 500, 1000, 3000, 10000], size=row_num)
pandas_df['column_e'] = [make_random_ascii_str(char_num=20) for _ in range(row_num)]
pandas_df.sort_values(by='column_b', inplace=True)
return pandas_df
def get_pandas_hdf5_file_path(date_str):
"""
対象日のPandasのHDF5ファイルのパスを取得する。
Parameters
----------
date_str : str
対象日の文字列。
Returns
-------
file_path : str
生成されたファイルパス。
"""
return f'pandas_{date_str}.hdf5'
def get_dask_hdf5_file_path(date_str):
"""
対象日のDaskのHDF5ファイルのパスを取得する。
Parameters
----------
date_str : str
対象日の文字列。
Returns
-------
file_path : str
生成されたファイルパス。
"""
return f'dask_{date_str}.hdf5'
def get_vaex_hdf5_file_path(date_str):
"""
対象日のVaexのHDF5ファイルのパスを取得する。
Parameters
----------
date_str : str
対象日の文字列。
Returns
-------
file_path : str
生成されたファイルパス。
"""
return f'vaex_{date_str}.hdf5'
def get_parquet_file_path(date_str):
"""
対象日のParquetファイルのパスを取得する。
Parameters
----------
date_str : str
対象日の文字列。
Returns
-------
file_path : str
生成されたファイルパス。
"""
return f'{date_str}.parquet'
def save_data():
"""
検証用の時系列のデータの各ファイルを保存する。
"""
current_date = date(2016, 1, 1)
last_date = date(2020, 12, 31)
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
row_num = random.randint(80_000, 120_000)
print(
datetime.now(), date_str, 'の保存処理を開始。行数 :', row_num)
pandas_df = make_pandas_df(
row_num=row_num, char_num=20, date_str=date_str)
vaex_df = vaex.from_pandas(df=pandas_df, copy_index=False)
dask_df = dd.from_pandas(data=pandas_df, npartitions=1)
pandas_df.to_hdf(
path_or_buf=get_pandas_hdf5_file_path(date_str=date_str),
key='data', mode='w')
dask_df.to_hdf(
path_or_buf=get_dask_hdf5_file_path(date_str=date_str),
key='data', mode='w')
vaex_df.export_hdf5(
path=get_vaex_hdf5_file_path(date_str=date_str))
vaex_df.export_parquet(
path=get_parquet_file_path(date_str=date_str))
current_date += Day()
save_data()
2021-01-17 07:58:24.950967 2016-01-01 の保存処理を開始。行数 : 83831
2021-01-17 07:58:26.994136 2016-01-02 の保存処理を開始。行数 : 117457
2021-01-17 07:58:29.859080 2016-01-03 の保存処理を開始。行数 : 101470
2021-01-17 07:58:32.381448 2016-01-04 の保存処理を開始。行数 : 88966
...
読み込みと計算処理を追加していく
DaskやVaexが遅延評価される都合、読み込みなどだけだと比較ができないので読み込み → ある程度の計算などの処理といった形で、計算が終わるまでどのくらいかかるのかといった比較を想定して進めます。
今回は以下のような2つのパターンを設けます。
- [パターン1] : column_e でabという文字列を含む行のみにスライス -> スライス後の行数を算出します。
- [パターン2] : column_a の値が300万以下の値にスライス -> column_e の文字列の値の先頭がaから始まる行のみにスライス -> column_d の値でGROUP BY -> 各グループごとに column_a の最大値を算出します。
読みこみ処理の追加
Pandas・Dask・Vaexそれぞれで追加していきます。
def read_pandas_df_from_hdf5(start_date, last_date):
"""
指定された日付範囲でのPandasのデータフレームをHDF5ファイルから読み込む。
Parameters
----------
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
Returns
-------
df : pd.DataFrame
読み込まれたPandasのデータフレーム。
"""
df_list = []
current_date = start_date
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
file_path = get_pandas_hdf5_file_path(date_str=date_str)
df = pd.read_hdf(path_or_buf=file_path, key='data')
df_list.append(df)
current_date += Day()
df = pd.concat(df_list, ignore_index=True, copy=False)
return df
def read_dask_df_from_hdf5(start_date, last_date):
"""
指定された日付範囲でのDaskのデータフレームをHDF5ファイルから読み込む
Parameters
----------
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
Returns
-------
df : dd.DataFrame
読み込まれたDaskのデータフレーム。
"""
df_list = []
current_date = start_date
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
file_path = get_dask_hdf5_file_path(date_str=date_str)
df = dd.read_hdf(pattern=file_path, key='data')
df_list.append(df)
current_date += Day()
df = dd.concat(dfs=df_list)
return df
def read_vaex_df_from_hdf5(start_date, last_date):
"""
指定された日付範囲でのVaexのデータフレームをHDF5ファイルから読み込む
Parameters
----------
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
Returns
-------
df : vaex.dataframe.DataFrame
読み込まれたVaexのデータフレーム。
"""
file_paths = []
current_date = start_date
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
file_path = get_vaex_hdf5_file_path(date_str=date_str)
file_paths.append(file_path)
current_date += Day()
vaex_df = vaex.open_many(filenames=file_paths)
return vaex_df
def read_pandas_df_from_parquet(start_date, last_date):
"""
指定された日付範囲でのPandasのデータフレームをParquetから読み込む。
Parameters
----------
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
Returns
-------
df : pd.DataFrame
読み込まれたPandasのデータフレーム。
"""
df_list = []
current_date = start_date
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
file_path = get_parquet_file_path(date_str=date_str)
df = pd.read_parquet(path=file_path)
df_list.append(df)
current_date += Day()
df = pd.concat(df_list, ignore_index=True, copy=False)
return df
def read_dask_df_from_parquet(start_date, last_date):
"""
指定された日付範囲でのDaskのデータフレームをParquetファイルから読み込む
Parameters
----------
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
Returns
-------
df : dd.DataFrame
読み込まれたDaskのデータフレーム。
"""
df_list = []
current_date = start_date
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
file_path = get_parquet_file_path(date_str=date_str)
df = dd.read_parquet(path=file_path)
df_list.append(df)
current_date += Day()
df = dd.concat(dfs=df_list)
return df
def read_vaex_df_from_parquet(start_date, last_date):
"""
指定された日付範囲でのVaexのデータフレームをParquetファイルから読み込む
Parameters
----------
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
Returns
-------
df : vaex.dataframe.DataFrame
読み込まれたVaexのデータフレーム。
"""
file_paths = []
current_date = start_date
while current_date <= last_date:
date_str = current_date.strftime('%Y-%m-%d')
file_path = get_parquet_file_path(date_str=date_str)
file_paths.append(file_path)
current_date += Day()
vaex_df = vaex.open_many(filenames=file_paths)
return vaex_df
パターン1の処理の追加
- column_e でabという文字列を含む行のみにスライス
- スライス後の行数を算出
という処理を、それぞれのライブラリで追加していきます。
def calculate_pattern_1_with_pandas_df(pandas_df):
"""
1つ目のパターンの計算を、Pandasのデータフレームで行う。
Parameters
----------
pandas_df : pd.DataFrame
対象のPandasのデータフレーム。
Returns
-------
row_count : int
算出結果の行数。
"""
pandas_df = pandas_df[pandas_df['column_e'].str.contains('ab')]
row_count = len(pandas_df)
return row_count
def calculate_pattern_1_with_dask_df(dask_df):
"""
1つ目のパターンの計算を、Daskのデータフレームで行う。
Parameters
----------
dask_df : dd.DataFrame
対象のDaskのデータフレーム。
Returns
-------
row_count : int
算出結果の行数。
"""
dask_df = dask_df[dask_df['column_e'].str.contains('ab')]
row_count = len(dask_df)
return row_count
def calculate_pattern_1_with_vaex_df(vaex_df):
"""
3つ目のパターンの計算を、Vaexのデータフレームで行う。
Parameters
----------
vaex_df : vaex.dataframe.DataFrame
対象のVaexのデータフレーム。
Returns
-------
row_count : int
算出結果の行数。
"""
vaex_df = vaex_df[vaex_df['column_e'].str.contains('ab')]
row_count = len(vaex_df)
return row_count
パターン2の処理の追加
- column_a の値が300万以下の値にスライス
- column_e の文字列の値の先頭がaから始まる行のみにスライス
- column_d の値でGROUP BY
- 各グループごとに column_a の最大値を算出
という処理を、それぞれのライブラリで追加していきます。
def calculate_pattern_2_with_pandas_df(pandas_df):
"""
2つ目のパターンの計算を、Pandasのデータフレームで行う。
Parameters
----------
pandas_df : pd.DataFrame
対象のPandasのデータフレーム。
Returns
-------
max_sr : pd.Series
算出された各合計値を格納したシリーズ。
"""
pandas_df = pandas_df[pandas_df['column_a'] <= 3_000_000]
pandas_df = pandas_df[pandas_df['column_e'].str.startswith('a')]
grouped = pandas_df.groupby(by='column_d')
max_df = grouped.max()
max_sr = max_df['column_a']
return max_sr
def calculate_pattern_2_with_dask_df(dask_df):
"""
2つ目のパターンの計算を、Daskのデータフレームで行う。
Parameters
----------
dask_df : dd.DataFrame
対象のDaskのデータフレーム。
Returns
-------
max_sr : pd.Series
算出された各合計値を格納したシリーズ。
"""
dask_df = dask_df[dask_df['column_a'] <= 3_000_000]
dask_df = dask_df[dask_df['column_e'].str.startswith('a')]
grouped = dask_df.groupby(by='column_d')
max_df = grouped.max()
max_sr = max_df['column_a']
max_sr = max_sr.compute()
return max_sr
def calculate_pattern_2_with_vaex_df(vaex_df):
"""
2つ目のパターンの計算を、Vaexのデータフレームで行う。
Parameters
----------
vaex_df : vaex.dataframe.DataFrame
対象のVaexのデータフレーム。
Returns
-------
max_sr : pd.Series
算出された各合計値を格納したシリーズ。
"""
vaex_df = vaex_df[vaex_df['column_a'] <= 3_000_000]
vaex_df = vaex_df[vaex_df['column_e'].str.startswith('a')]
max_df = vaex_df.groupby(
by='column_d',
agg={
'column_a': vaex.agg.max,
})
max_df = max_df.to_pandas_df(column_names=['column_a', 'column_d'])
max_df.index = max_df['column_d']
max_sr = max_df['column_a']
return max_sr
読み込みと集計処理の追加
各読み込み処理と集計処理が追加し終わったので、それらを組み合わせた処理を書いていきます。
from timeit import timeit
from copy import deepcopy
import sys
class ReadAndCalcRunner:
def __init__(self, label, pattern, read_func, calc_func):
"""
読み込みと計算処理の組み合わせの設定や実行処理を扱うクラス。
Parameters
----------
label : str
対象の組み合わせの識別用のラベル。
例 : csv_no_compression_pandas
pattern : int
対象のパターン(1~3)。
read_func : Callable
読み込み処理を扱う関数。引数は省略可で、データフレームを変えす
形式が必要になる。
calc_func : Callable
計算処理を扱う関数。第一引数にデータフレームを受け付ける形式の
ものを指定する必要がある。
"""
self.label = label
self.pattern = pattern
self._read_func = read_func
self._calc_func = calc_func
def run(self, n, start_date, last_date, debug=False):
"""
読み込みと計算処理を実行する。実行後、 mean_seconds 属性に
実行の平均秒数(float)が設定される。
Parameters
----------
n : int
実行回数。多い方が処理時間の精度が高くなるものの、完了
するまでが長くなるので注意。
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
debug : bool, default False
デバッグ設定。Trueを指定した場合、計算結果の出力がされる。
"""
statement = 'df = self._read_func(start_date=start_date, last_date=last_date);'
if not debug:
statement += 'self._calc_func(df);'
else:
statement += 'result = self._calc_func(df); print(result)'
result_seconds = timeit(
stmt=statement,
number=n, globals=locals())
self.mean_seconds = result_seconds / n
this_module = sys.modules[__name__]
FORMATS = (
'parquet',
'hdf5',
)
LIBS = (
'pandas',
'dask',
'vaex',
)
PATTERNS = (1, 2)
runners = []
for format_str in FORMATS:
for lib_str in LIBS:
for pattern in PATTERNS:
label = f'{format_str}_{lib_str}'
read_func_name = f'read_{lib_str}_df_from_{format_str}'
read_func = getattr(this_module, read_func_name)
calc_func_name = f'calculate_pattern_{pattern}_with_{lib_str}_df'
calc_func = getattr(this_module, calc_func_name)
runners.append(
ReadAndCalcRunner(
label=label,
pattern=pattern,
read_func=read_func,
calc_func=calc_func)
)
プロット用の処理を追加
処理時間の比較用の関数などを追加していきます。
import matplotlib
from matplotlib.ticker import ScalarFormatter, FormatStrFormatter
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')
def plot_time_info(time_sr, patten, print_sr = True):
"""
処理時間のプロットを行う。
Parameters
----------
time_sr : pd.Series
処理時間を格納したシリーズ。
patten : int
対象の集計処理のパターン(1~3)。
print_sr : bool, default True
プロット内容のシリーズを出力して表示するかどうか。
"""
sr = time_sr.copy()
sr.sort_values(inplace=True, ascending=False)
if print_sr:
print(sr.sort_values(ascending=True))
title = f'Read and calculation seconds (pattern: {patten})'
ax = sr.plot(kind='barh', figsize=(10, len(sr) // 2), title=title)
ax.xaxis.set_major_formatter(FormatStrFormatter('%.3f'))
plt.show()
各処理を一通り流すための処理の追加
記述がシンプルになるように、各処理(読み込み・集計・可視化)を一気に処理するための関数を追加しておきます。
def run_and_get_result_time_sr(
runners, pattern, n, start_date, last_date, skip_pandas, skip_dask_hdf5,
skip_dask_parquet):
"""
指定されたパターンの各計測処理を実行し、結果の各秒数の値を格納した
シリーズを取得する。
Parameters
----------
runners : list of ReadAndCalcRunner
実行処理の定義などを保持するインスタンスのリスト。
pattern : int
実行するパターン(1~3)。
n : int
実行回数。多い方が処理時間の精度が高くなるものの、完了
するまでが長くなるので注意。
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
skip_pandas : bool
Pandasの処理をスキップするかどうか。
skip_dask_hdf5 : bool
HDF5形式のDaskの処理をスキップするかどうか。
skip_dask_parquet : bool
Parquet形式のDaskの処理をスキップするかどうか。
Returns
-------
sr : pd.Series
計測結果を格納したシリーズ。インデックスには各フォーマットと
ライブラリ識別用のラベルとパターンの連結された文字列が設定され、
値に秒数が設定される。
"""
runners = deepcopy(runners)
data_dict = {}
for runner in runners:
if runner.pattern != pattern:
continue
if skip_pandas and 'pandas' in runner.label:
continue
if skip_dask_hdf5 and 'dask' in runner.label and 'hdf5' in runner.label:
continue
if skip_dask_parquet and 'dask' in runner.label and 'parquet' in runner.label:
continue
label = f'{runner.label}_{pattern}'
print(datetime.now(), label, 'の処理を開始...')
runner.run(n=n, start_date=start_date, last_date=last_date)
data_dict[label] = runner.mean_seconds
sr = pd.Series(data=data_dict)
return sr
def run_overall(
n, pattern, start_date, last_date, skip_pandas, skip_dask_hdf5,
skip_dask_parquet):
"""
対象のパターンに対して、各処理(読み込み・集計・可視化など)を
全体的に実行する。
Parameters
----------
n : int
読み込みと計算の実行回数。多い方が処理時間が正確になるものの、
完了するまで長時間必要となるので注意。
pattern : int
対象とする計算のパターン(1~3)。
start_date : date
日付範囲の開始日。
last_date : date
日付範囲の最終日。
skip_pandas : bool
Pandasの処理をスキップするかどうか。
skip_dask_hdf5 : bool
HDF5形式のDaskの処理をスキップするかどうか。
skip_dask_parquet : bool
Parquet形式のDaskの処理をスキップするかどうか。
"""
time_sr = run_and_get_result_time_sr(
runners=runners, pattern=pattern, n=n, start_date=start_date,
last_date=last_date, skip_pandas=skip_pandas,
skip_dask_hdf5=skip_dask_hdf5,
skip_dask_parquet=skip_dask_parquet)
plot_time_info(time_sr=time_sr, patten=pattern)
実際に実行してみる
準備が出来たので各処理時間を集計していきます。以下の各条件で試していきます。なお、Pandasは3ヵ月分のものまでを対象とし、以降はDaskとVaexでのみ実行していきます。
- 1ヵ月分(約300万行)
- 3ヵ月分(約900万行)
- 6ヵ月分(約1800万行)
- 1年分(約3600万行)
- 3年分(約1億800万行)
- 5年分(約1億8000万行)
1ヵ月分での集計
パターン1:
run_overall(
n=5, pattern=1,
start_date=date(2016, 1, 1),
last_date=date(2016, 1, 31),
skip_pandas=False)
hdf5_vaex_1 1.015516
parquet_vaex_1 1.028920
parquet_pandas_1 2.685143
parquet_dask_1 2.828006
hdf5_pandas_1 3.311069
hdf5_dask_1 7.616159
- Snappy圧縮されているにも関わらず、Parquet + Vaexの処理時間がかなり未圧縮HDF5に近い数字がでています。
- PandasとDaskはHDF5の方が(未圧縮にも関わらず)遅いようです。特にDaskの処理時間が気になります。
パターン2:
run_overall(
n=5, pattern=2,
start_date=date(2016, 1, 1),
last_date=date(2016, 1, 31),
skip_pandas=False)
hdf5_vaex_2 0.766808
parquet_vaex_2 0.848183
parquet_pandas_2 2.436566
parquet_dask_2 2.961728
hdf5_pandas_2 4.134251
hdf5_dask_2 8.657277
3ヵ月分での集計
パターン1:
run_overall(
n=5, pattern=1,
start_date=date(2016, 1, 1),
last_date=date(2016, 3, 31),
skip_pandas=False)
hdf5_vaex_1 2.260799
parquet_vaex_1 2.649166
parquet_dask_1 8.578201
parquet_pandas_1 8.656629
hdf5_pandas_1 9.994132
hdf5_dask_1 22.766739
パターン2:
run_overall(
n=5, pattern=2,
start_date=date(2016, 1, 1),
last_date=date(2016, 3, 31),
skip_pandas=False)
hdf5_vaex_2 1.894970
parquet_vaex_2 2.529330
parquet_pandas_2 7.110901
hdf5_pandas_2 9.252198
parquet_dask_2 10.688318
hdf5_dask_2 23.362928
6ヵ月分での集計
ここからはPandasをスキップしてDaskとVaexのみで進めていきます。
また、処理時間の都合で、各パターンでの実行回数を5回から3回に減らして実行していきます。
加えてDask + HDF5がかなり遅いので、処理時間の都合でそれをスキップできるように調整します。
パターン1:
run_overall(
n=3, pattern=1,
start_date=date(2016, 1, 1),
last_date=date(2016, 6, 30),
skip_pandas=True,
skip_dask_hdf5=True)
hdf5_vaex_1 4.621812
parquet_vaex_1 5.633019
parquet_dask_1 17.827765
パターン2:
run_overall(
n=3, pattern=2,
start_date=date(2016, 1, 1),
last_date=date(2016, 6, 30),
skip_pandas=True,
skip_dask_hdf5=True)
hdf5_vaex_2 4.231214
parquet_vaex_2 5.312496
parquet_dask_2 17.153308
1年分での集計
実行回数を2回に減らしていきます。
パターン1:
run_overall(
n=2, pattern=1,
start_date=date(2016, 1, 1),
last_date=date(2016, 12, 31),
skip_pandas=True,
skip_dask_hdf5=True)
hdf5_vaex_1 9.618381
parquet_vaex_1 11.091080
parquet_dask_1 36.335810
パターン2:
run_overall(
n=2, pattern=2,
start_date=date(2016, 1, 1),
last_date=date(2016, 12, 31),
skip_pandas=True,
skip_dask_hdf5=True)
hdf5_vaex_2 9.132881
parquet_vaex_2 11.136143
parquet_dask_2 34.377085
3年分での集計
試行回数が大分少なくなりますが、以降は処理時間の都合で1回ずつのみ実行していきます。
パターン1:
run_overall(
n=1, pattern=1,
start_date=date(2016, 1, 1),
last_date=date(2018, 12, 31),
skip_pandas=True,
skip_dask_hdf5=True)
hdf5_vaex_1 40.676083
parquet_vaex_1 43.035784
parquet_dask_1 100.698389
パターン2:
run_overall(
n=1, pattern=2,
start_date=date(2016, 1, 1),
last_date=date(2018, 12, 31),
skip_pandas=True,
skip_dask_hdf5=True)
hdf5_vaex_2 40.061167
parquet_vaex_2 42.218093
parquet_dask_2 102.830116
5年分での集計
Dockerで動かしているのが影響している?のか、メモリなどで問題なさそうでも何故かDaskのものを実行中にJupyterのカーネルが落ちるので、5年分のものはVaexのみで進めてみます。
パターン1:
run_overall(
n=1, pattern=1,
start_date=date(2016, 1, 1),
last_date=date(2020, 12, 31),
skip_pandas=True,
skip_dask_hdf5=True,
skip_dask_parquet=True)
parquet_vaex_1 95.578259
hdf5_vaex_1 99.258315
試行回数が1回なのでぶれた可能性もありますが、まさかのParquetの方が速い結果に。
パターン2:
run_overall(
n=1, pattern=2,
start_date=date(2016, 1, 1),
last_date=date(2020, 12, 31),
skip_pandas=True,
skip_dask_hdf5=True,
skip_dask_parquet=True)
hdf5_vaex_2 78.231278
parquet_vaex_2 93.696743
こちらは普通にHDF5の方が速い結果に。