1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

pandasのswifter、何をしてるのか

Posted at

参考記事:https://qiita.com/heisenberg_/items/fc4d2498f208a1d1426b

公式github:https://github.com/jmcarpenter2/swifter

swifterコード本体: https://github.com/jmcarpenter2/swifter/blob/master/swifter/swifter.py

ver1.0リーリス時のブログ:https://medium.com/@jmcarpenter2/swifter-1-0-0-automatically-efficient-pandas-and-modin-dataframe-applies-cfbd9555e7c8

swifterを使うと、プログレスバーが出るまでに時間がかかることがある。何をしているのか気になったので調べてみた。

概要(まとめ)

  • sampleをmin(1000,要素数の25分の1)個ランダムに選ぶ。これらのサンプルに対して
    • まずベクトル化できるか試す
      • ベクトル化とは
        • ループや反復処理を明示的に記述する代わりに、データ全体に対して一度に操作を適用する方法
        • numpyを使って処理する
      • ベクトル化がかなり速度に寄与するので、最初に試す
      • 試す方法
        • 直接適用してエラーが出ないか見る
        • エラーが出なくても結果が一緒化を見る
        • うまく行ってれば、全体に対してベクトル化(関数の直接適用)を行う
        • うまくいかなかれば次。
    • ベクトル化できない場合、daskによる並列化を行うか判断。
      • daskは並列処理ライブラリ
      • まずsampleに普通にapplyを適用し、しきい値より時間がかかる際はdaskによる並列化を行う。
      • しきい値は自分でも設定できる。デフォルトは1秒
        pandas.DataFrame.swifter.set_dask_threshold(dask_threshold=1).apply

調査方法

詳しい内容が気になる方は以下も見てみてください。

  • swifter.applyの重要そうな部分(コード本体400行目~)

    def apply(self, func, axis=0, raw=False, result_type=None, args=(), **kwds):
        """
        Apply the function to the DataFrame using swifter
        """
        # If there are no rows return early using Pandas
        if not self._nrows:
            return self._obj.apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds)
    
        # If parallel processing is forced by the user, then skip the logic and apply dask
        if self._force_parallel:
            return self._parallel_apply(func, axis, raw, result_type, *args, **kwds)
    
        sample = self._obj.iloc[self._SAMPLE_INDEX]
        # check if input is string
        # or if the user is overriding the string processing default
        allow_dask_processing = True if self._allow_dask_on_strings else ("object" not in sample.dtypes.values)
    
        try:  # try to vectorize
            with suppress_stdout_stderr_logging():
                tmp_df = func(sample, *args, **kwds)
                sample_df = sample.apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds)
                self._validate_apply(
                    np.array_equal(sample_df, tmp_df) & (hasattr(tmp_df, "shape")) & (sample_df.shape == tmp_df.shape),
                    error_message=("Vectorized function sample does not match pandas apply sample."),
                )
            return func(self._obj, *args, **kwds)
        except ERRORS_TO_HANDLE:  # if can't vectorize, estimate time to pandas apply
            wrapped = self._wrapped_apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds)
            timed = timeit.timeit(wrapped, number=N_REPEATS)
            sample_proc_est = timed / N_REPEATS
            est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._nrows
    
            # if pandas sample apply takes too long
            # and not performing str processing, use dask
            if (est_apply_duration > self._dask_threshold) and allow_dask_processing and axis == 1:
                return self._parallel_apply(func, axis, raw, result_type, *args, **kwds)
            else:  # use pandas
                return self._pandas_apply(self._obj, func, axis, raw, result_type, *args, **kwds)
    
    

この部分が何してるか

ChatGPTに聞いてみた

このコードは、swifter ライブラリ内の apply メソッドの一部で、pandasのデータフレームやシリーズに関数を適用する際に内部的に使用されます。このメソッドの目的は、より効率的に関数をデータに適用することで、処理を高速化することです。以下に各部分の説明をします。

  1. 空のデータフレームに対する処理: データフレームに行がない場合、すぐに標準のpandasの apply メソッドを使用して結果を返します。
  2. 並列処理の強制実行: ユーザーが並列処理を強制的に指定している場合、Daskを使用して関数を並列に適用します。
  3. ベクトル化可能性のテスト: 最初にサンプルデータを取り、提供された関数がベクトル化可能かどうかをテストします。この部分では、関数がサンプルデータに対して正しく適用できるかどうかを検証し、ベクトル化された適用と標準の適用の結果を比較します。
  4. エラー処理と実行時間の推定: ベクトル化が不可能な場合、関数の実行時間を推定し、その時間が設定された閾値を超えるかどうかを評価します。この評価に基づき、Daskを使用した並列処理か、通常のpandas apply のどちらかを選択します。
  5. Daskの使用条件: データフレームのdtypeがオブジェクト(例えば文字列)でない、またはユーザーが明示的にDaskの使用を許可している場合に、Daskによる処理が可能です。さらに、推定された処理時間が設定された閾値を超えていれば、Daskを用いた並列処理が行われます。

このメソッドは、パフォーマンスの最適化を図るために様々な条件と設定を利用しており、データのサイズや処理の複雑性に応じて最も適切な処理方法を動的に選択します。これにより、大規模なデータセットや計算に時間がかかる操作に対して効率的なデータ処理を実現します。

これを受けて色々調べた

なぜdf.swifter.apply という形で使えるの?

以下のコードでpandasにswifterをくっつけている?

@pd.api.extensions.register_dataframe_accessor("swifter")

pandasのデータフレームに新しいアクセサを登録して使えるようにしている。

pandas側で設定しているようで、他のライブラリでも同じようなことができるかどうかは確認しないとらしい。

可能性のテストなどはどう行っている?

ベクトル化可能かどうか、daskとの実行時間の比較などはどのように行っているのか。
基本的には、特定の数だけsampleを持ってきて、そこにテストを行っている。

  • sampleの取ってくる方法

    https://github.com/jmcarpenter2/swifter/blob/master/swifter/base.py
    に、

    self._SAMPLE_SIZE = SAMPLE_SIZE if self._nrows > (25 * SAMPLE_SIZE) else int(ceil(self._nrows / 25))
    self._SAMPLE_INDEX = sorted(np.random.choice(range(self._nrows), size=self._SAMPLE_SIZE, replace=False))
    

    とある。SAMPLE_SIZEと要素数の25分の1の小さい方の個数分サンプルとして使っていることがわかる。SAMPLE_SIZEはデフォルトで1000。

  • ベクトル化できるかの判断

    関数をそのまま適用して、エラーが起こるかを試している。

    try:  # try to vectorize
        with suppress_stdout_stderr_logging():
            tmp_df = func(sample, *args, **kwds)
            sample_df = sample.apply(func, convert_dtype=convert_dtype, args=args, **kwds)
            self._validate_apply(
                np.array_equal(sample_df, tmp_df) & (hasattr(tmp_df, "shape")) & (sample_df.shape == tmp_df.shape),
                error_message=("Vectorized function sample doesn't match pandas apply sample."),
            )
        return func(self._obj, *args, **kwds)
    except ERRORS_TO_HANDLE:
    

    try-catchで、そのまま適用してエラーが出るようならdaskによる並列化を試しに行く。
    また、適用できても結果が異なる場合もエラーを出してdask処理に進ませる。

  • Daskを使うかの判断

    swifter.pyの該当部分

    except ERRORS_TO_HANDLE:  # if can't vectorize, estimate time to pandas apply
        wrapped = self._wrapped_apply(func, axis=axis, raw=raw, result_type=result_type, args=args, **kwds)
        timed = timeit.timeit(wrapped, number=N_REPEATS)
        sample_proc_est = timed / N_REPEATS
        est_apply_duration = sample_proc_est / self._SAMPLE_SIZE * self._nrows
    
        # if pandas sample apply takes too long
        # and not performing str processing, use dask
        if (est_apply_duration > self._dask_threshold) and allow_dask_processing and axis == 1:
            return self._parallel_apply(func, axis, raw, result_type, *args, **kwds)
        else:  # use pandas
            return self._pandas_apply(self._obj, func, axis, raw, result_type, *args, **kwds)
    

swifterのdocument(https://github.com/jmcarpenter2/swifter/blob/ea8d30ab1055441cca10cd07913c78370c284352/docs/documentation.md?plain=1#L218)内の記述

  1. pandas.DataFrame.swifter.set_dask_threshold(dask_threshold=1).apply

Specify the dask threshold (in seconds) for the max allowable time estimate for a pandas apply on the full dataframe

def pandas.DataFrame.swifter.set_dask_threshold(dask_threshold=1)

For example, let's say we have a pandas dataframe df. The following will perform a swifter apply, with the threshold set to 3 seconds

df.swifter.set_dask_threshold(dask_threshold=3).apply(lambda x: x+1)

デフォルトでは、全体をpandasのapplyのみで処理したら1秒かかりそうと判断したらdaskを使う事になっている。このしきい値は引数で調整できる。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?