参考記事:https://qiita.com/heisenberg_/items/fc4d2498f208a1d1426b
公式github:https://github.com/jmcarpenter2/swifter
swifterコード本体: https://github.com/jmcarpenter2/swifter/blob/master/swifter/swifter.py
ver1.0リーリス時のブログ:https://medium.com/@jmcarpenter2/swifter-1-0-0-automatically-efficient-pandas-and-modin-dataframe-applies-cfbd9555e7c8
swifterを使うと、プログレスバーが出るまでに時間がかかることがある。何をしているのか気になったので調べてみた。
概要(まとめ)
- sampleをmin(1000,要素数の25分の1)個ランダムに選ぶ。これらのサンプルに対して
- まずベクトル化できるか試す
- ベクトル化とは
- ループや反復処理を明示的に記述する代わりに、データ全体に対して一度に操作を適用する方法
- numpyを使って処理する
- ベクトル化がかなり速度に寄与するので、最初に試す
- 試す方法
- 直接適用してエラーが出ないか見る
- エラーが出なくても結果が一緒化を見る
- うまく行ってれば、全体に対してベクトル化(関数の直接適用)を行う
- うまくいかなかれば次。
- ベクトル化とは
- ベクトル化できない場合、daskによる並列化を行うか判断。
- daskは並列処理ライブラリ
- まずsampleに普通にapplyを適用し、しきい値より時間がかかる際はdaskによる並列化を行う。
- しきい値は自分でも設定できる。デフォルトは1秒
pandas.DataFrame.swifter.set_dask_threshold(dask_threshold=1).apply
- まずベクトル化できるか試す
調査方法
詳しい内容が気になる方は以下も見てみてください。
-
swifter.apply
の重要そうな部分(コード本体400行目~)def apply(self, func, axis=0, raw=False, result_type=None, args=(), **kwds): """ Apply the function to the DataFrame using swifter """ # If there are no rows return early using Pandas if not self._nrows: return self._obj.apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds) # If parallel processing is forced by the user, then skip the logic and apply dask if self._force_parallel: return self._parallel_apply(func, axis, raw, result_type, *args, **kwds) sample = self._obj.iloc[self._SAMPLE_INDEX] # check if input is string # or if the user is overriding the string processing default allow_dask_processing = True if self._allow_dask_on_strings else ("object" not in sample.dtypes.values) try: # try to vectorize with suppress_stdout_stderr_logging(): tmp_df = func(sample, *args, **kwds) sample_df = sample.apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds) self._validate_apply( np.array_equal(sample_df, tmp_df) & (hasattr(tmp_df, "shape")) & (sample_df.shape == tmp_df.shape), error_message=("Vectorized function sample does not match pandas apply sample."), ) return func(self._obj, *args, **kwds) except ERRORS_TO_HANDLE: # if can't vectorize, estimate time to pandas apply wrapped = self._wrapped_apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds) timed = timeit.timeit(wrapped, number=N_REPEATS) sample_proc_est = timed / N_REPEATS est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._nrows # if pandas sample apply takes too long # and not performing str processing, use dask if (est_apply_duration > self._dask_threshold) and allow_dask_processing and axis == 1: return self._parallel_apply(func, axis, raw, result_type, *args, **kwds) else: # use pandas return self._pandas_apply(self._obj, func, axis, raw, result_type, *args, **kwds)
この部分が何してるか
ChatGPTに聞いてみた
このコードは、
swifter
ライブラリ内のapply
メソッドの一部で、pandasのデータフレームやシリーズに関数を適用する際に内部的に使用されます。このメソッドの目的は、より効率的に関数をデータに適用することで、処理を高速化することです。以下に各部分の説明をします。
- 空のデータフレームに対する処理: データフレームに行がない場合、すぐに標準のpandasの
apply
メソッドを使用して結果を返します。- 並列処理の強制実行: ユーザーが並列処理を強制的に指定している場合、Daskを使用して関数を並列に適用します。
- ベクトル化可能性のテスト: 最初にサンプルデータを取り、提供された関数がベクトル化可能かどうかをテストします。この部分では、関数がサンプルデータに対して正しく適用できるかどうかを検証し、ベクトル化された適用と標準の適用の結果を比較します。
- エラー処理と実行時間の推定: ベクトル化が不可能な場合、関数の実行時間を推定し、その時間が設定された閾値を超えるかどうかを評価します。この評価に基づき、Daskを使用した並列処理か、通常のpandas
apply
のどちらかを選択します。- Daskの使用条件: データフレームのdtypeがオブジェクト(例えば文字列)でない、またはユーザーが明示的にDaskの使用を許可している場合に、Daskによる処理が可能です。さらに、推定された処理時間が設定された閾値を超えていれば、Daskを用いた並列処理が行われます。
このメソッドは、パフォーマンスの最適化を図るために様々な条件と設定を利用しており、データのサイズや処理の複雑性に応じて最も適切な処理方法を動的に選択します。これにより、大規模なデータセットや計算に時間がかかる操作に対して効率的なデータ処理を実現します。
これを受けて色々調べた
なぜdf.swifter.apply という形で使えるの?
以下のコードでpandasにswifterをくっつけている?
@pd.api.extensions.register_dataframe_accessor("swifter")
pandasのデータフレームに新しいアクセサを登録して使えるようにしている。
pandas側で設定しているようで、他のライブラリでも同じようなことができるかどうかは確認しないとらしい。
可能性のテストなどはどう行っている?
ベクトル化可能かどうか、daskとの実行時間の比較などはどのように行っているのか。
基本的には、特定の数だけsampleを持ってきて、そこにテストを行っている。
-
sampleの取ってくる方法
https://github.com/jmcarpenter2/swifter/blob/master/swifter/base.py
に、self._SAMPLE_SIZE = SAMPLE_SIZE if self._nrows > (25 * SAMPLE_SIZE) else int(ceil(self._nrows / 25)) self._SAMPLE_INDEX = sorted(np.random.choice(range(self._nrows), size=self._SAMPLE_SIZE, replace=False))
とある。SAMPLE_SIZEと要素数の25分の1の小さい方の個数分サンプルとして使っていることがわかる。SAMPLE_SIZEはデフォルトで1000。
-
ベクトル化できるかの判断
関数をそのまま適用して、エラーが起こるかを試している。
try: # try to vectorize with suppress_stdout_stderr_logging(): tmp_df = func(sample, *args, **kwds) sample_df = sample.apply(func, convert_dtype=convert_dtype, args=args, **kwds) self._validate_apply( np.array_equal(sample_df, tmp_df) & (hasattr(tmp_df, "shape")) & (sample_df.shape == tmp_df.shape), error_message=("Vectorized function sample doesn't match pandas apply sample."), ) return func(self._obj, *args, **kwds) except ERRORS_TO_HANDLE:
try-catchで、そのまま適用してエラーが出るようならdaskによる並列化を試しに行く。
また、適用できても結果が異なる場合もエラーを出してdask処理に進ませる。 -
Daskを使うかの判断
swifter.pyの該当部分
except ERRORS_TO_HANDLE: # if can't vectorize, estimate time to pandas apply wrapped = self._wrapped_apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds) timed = timeit.timeit(wrapped, number=N_REPEATS) sample_proc_est = timed / N_REPEATS est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._nrows # if pandas sample apply takes too long # and not performing str processing, use dask if (est_apply_duration > self._dask_threshold) and allow_dask_processing and axis == 1: return self._parallel_apply(func, axis, raw, result_type, *args, **kwds) else: # use pandas return self._pandas_apply(self._obj, func, axis, raw, result_type, *args, **kwds)
swifterのdocument(https://github.com/jmcarpenter2/swifter/blob/ea8d30ab1055441cca10cd07913c78370c284352/docs/documentation.md?plain=1#L218)内の記述
pandas.DataFrame.swifter.set_dask_threshold(dask_threshold=1).apply
Specify the dask threshold (in seconds) for the max allowable time estimate for a pandas apply on the full dataframe
def pandas.DataFrame.swifter.set_dask_threshold(dask_threshold=1)
For example, let's say we have a pandas dataframe df. The following will perform a swifter apply, with the threshold set to 3 seconds
df.swifter.set_dask_threshold(dask_threshold=3).apply(lambda x: x+1)
デフォルトでは、全体をpandasのapplyのみで処理したら1秒かかりそうと判断したらdaskを使う事になっている。このしきい値は引数で調整できる。