はじめに
業務で行っていたとある分析で、大量の確率モデルの確率密度関数を積分して確率を求める機会があったので、この記事ではその際に調べたことを、架空の問題設定を通して紹介していこうと思います。
問題設定
今回はダミーのデータセットを用いて説明しようと思います。
ECサイトのユーザの毎月の決済回数を集計して、平均と標準偏差を算出したという体で、以下のようにしてNumpyとPandasでダミーデータを作成します。
rg = np.random.default_rng()
models = []
for i in range(100000):
id = str(i)
usage = rg.uniform(0, 30, 12) # 各ユーザの12か月の利用回数を0~30の一様分布で生成
mean = float(np.mean(usage))
std = float(np.std(usage))
models.append((id, mean, std))
df = pd.DataFrame(models)
この平均と標準偏差をパラメータとして持つ正規分布を各ユーザの月間決済回数を表すモデルとして、そこから月間決済回数が3回以上となる確率を計算したいとします。
各ユーザの月間決済回数$x$が3回以上となる確率$P$は以下の式で表すことができます。
P = \int_3^\infty f(x)dx
($f(x)$は正規分布の確率密度関数)
上記を計算するために、今回は正規分布の確率密度関数に対してモンテカルロ法による数値積分を適用しようと思います。
モンテカルロ法による数値積分
モンテカルロ法による積分では大数の法則を利用しています。
大数の法則は$N$が十分大きいとき、確率変数$X$の母平均と標本平均がほぼ一致するというものなので、式にすると下記のようになります。
E[X] \simeq \frac{1}{N} \sum_{i=1}^{N}x_i
一方、確率変数$X$が連続値かつ確率密度関数が$f(x)$の分布に従うとき、平均は以下のように算出できます。
E[X] = \int_{-\infty}^{\infty} x f(x) dx
つまり、以下のように近似することができます。
\begin{align}
E[X] & = \int_{-\infty}^{\infty} x f(x) dx \\
& \simeq \frac{1}{N} \sum_{i=1}^{N}x_i
\end{align}
ここで、新たに、下記のような関数を考えてみましょう。
p(x) = \left\{
\begin{array}{ll}
1 & (a \leq x \leq b) \\
0 & (otherwise)
\end{array}
\right.
この関数を使って新たに$p(X)$の平均を考えてみると下記のようになります。
\begin{align}
E[p(X)] & = \int_{-\infty}^{\infty} p(x) f(x) dx \\
& = \int_{a}^{b} f(x) dx \\
& \simeq \frac{1}{N} \sum_{i=1}^{N}p(x_i)
\end{align}
この式から、確率変数$X$の標本が定積分の範囲内にあるかを0、1で判定し、その平均を求めると確率変数$X$が従う確率密度関数$f(x)$の定積分が近似できることがわかります。
今回の問題設定に当てはめてみる
上記で説明した方法を使うと、今回計算したい式も同様に近似することができます。
\begin{align}
P & = \int_3^\infty f(x)dx \\
& \simeq \frac{1}{N} \sum_{i=1}^{N}p(x_i)
\end{align}
ただし、
p(x) = \left\{
\begin{array}{ll}
1 & (3 \leq x) \\
0 & (otherwise)
\end{array}
\right.
です。
Pythonで実装すると下記のようになります。
def calc_score(mean, std):
rg = np.random.default_rng()
samples = rg.normal(mean, std, size=1000)
return float(np.mean(np.where(3 <= samples, 1, 0)))
scores = df.apply(lambda x: calc_score(x["mean"], x["std"]), axis=1)
結果
0 0.743
1 0.694
2 0.696
3 0.693
4 0.776
...
99995 0.892
99996 0.704
99997 0.862
99998 0.964
99999 0.861
Length: 100000, dtype: float64
PySparkのUDFでの実装
上記ではダミーデータは十万件のデータでしたが、実際のサービスのユーザ数は数千万人になることもあります。
さらに、例えば商品カテゴリ別に決済回数をモデリングしようとした場合、数億個のモデルが必要になっていきます。
そこで、分散処理で大量のデータを扱うことが可能なPySparkを利用して上記の計算を実装してみます。
(PySpark自体の詳細な解説はほかの記事を参考にしてください。)
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df) # PandasのDataFrameをSpark用に変換
udf_calc_score = F.udf(calc_score) # PySparkのUDFに変換
result_udf = sdf.withColumn("score", udf_calc_score("mean", "std"))
result_udf.orderBy(F.col("id").cast("integer")).show()
結果
+---+------------------+------------------+------+
| id| mean| std| score|
+---+------------------+------------------+------+
| 0| 4.864985012897209| 2.711979067616106|0.7514|
| 1| 4.207096055467278| 2.711996246888057|0.6715|
| 2| 4.122583504095169|2.0070016565740505|0.7118|
| 3| 4.503288706628242|3.1332821680418603|0.6769|
| 4| 4.836283456042358|2.2431420014116106|0.7973|
| 5| 5.039709282775973|1.9655357794424146|0.8439|
| 6| 5.40800442972003|2.1199497864627306|0.8665|
| 7| 5.359114109298748|2.8897029323163594|0.7922|
| 8| 5.962014844006652| 2.420626673147105|0.8911|
| 9| 4.520956315122755| 3.129555951589222|0.6874|
| 10| 4.697220094340217|2.9001570790159517|0.7227|
| 11| 4.340648523527992|2.3903541140708082|0.7169|
| 12| 5.224398167302211| 2.798044688705444|0.7861|
| 13| 4.956513131965814|2.4723820857852794|0.7842|
| 14| 6.195062272858313| 2.592223124176471| 0.89|
| 15| 6.40957902615551|2.8045611874643153|0.8904|
| 16|5.3615439280325745|3.0187471199212714|0.7899|
| 17| 5.43548662730421| 2.566188399487096|0.8284|
| 18|5.0752067593774095| 2.528505031361047|0.7955|
| 19|4.8684189029305225| 3.014598256452725|0.7332|
+---+------------------+------------------+------+
only showing top 20 rows
このように、PySparkではUDFという機能を使うと簡単にPythonの処理を分散処理することが可能です。
(numpyのクラス(np.float64)などが返り値になっているものはUDFとして使えないので注意が必要です。)
結果を見てみると、Pandasとnumpyを利用したコードと若干異なっているため、乱数が1000サンプル程度だと数値積分の精度が良くないことがわかります。
が、今回はあくまで説明用なので良しとします。
PySparkのPandas UDFでの実装
上記のPySparkのUDFは、一つ一つのレコードに対しての処理しかできず、PySparkの裏側で動いているJAVAとの通信に時間がかかるため、処理時間が長くなりがちという欠点があるといわれています。
(実際、私の業務では最初UDFを使った実装を使ってみたのですが、処理が現実的な時間で回りきらなかったです。)
そこで、Pythonで実装した処理をPySparkで利用するためのもう一つの機能であるPandas UDFを利用して実装してみようと思います。
@F.pandas_udf("double")
def pandas_udf_calc_score(mean: pd.Series, std: pd.Series) -> pd.Series:
rg = np.random.default_rng()
mean = mean.to_numpy()[:, None]
std = std.to_numpy()[:, None]
n = len(mean)
samples = rg.standard_normal(size=(n, 1000))
samples = mean + samples * std
return pd.Series(np.mean(np.where(3 <= samples, 1, 0), axis=-1))
result_pandas_udf = sdf.withColumn("score", pandas_udf_calc_score("mean", "std"))
result_pandas_udf.orderBy(F.col("id").cast("integer")).show()
結果
+---+------------------+------------------+-----+
| id| mean| std|score|
+---+------------------+------------------+-----+
| 0| 4.864985012897209| 2.711979067616106|0.763|
| 1| 4.207096055467278| 2.711996246888057|0.645|
| 2| 4.122583504095169|2.0070016565740505| 0.71|
| 3| 4.503288706628242|3.1332821680418603| 0.67|
| 4| 4.836283456042358|2.2431420014116106|0.807|
| 5| 5.039709282775973|1.9655357794424146|0.856|
| 6| 5.40800442972003|2.1199497864627306|0.872|
| 7| 5.359114109298748|2.8897029323163594|0.802|
| 8| 5.962014844006652| 2.420626673147105|0.875|
| 9| 4.520956315122755| 3.129555951589222|0.677|
| 10| 4.697220094340217|2.9001570790159517|0.746|
| 11| 4.340648523527992|2.3903541140708082| 0.72|
| 12| 5.224398167302211| 2.798044688705444|0.779|
| 13| 4.956513131965814|2.4723820857852794|0.776|
| 14| 6.195062272858313| 2.592223124176471|0.888|
| 15| 6.40957902615551|2.8045611874643153|0.891|
| 16|5.3615439280325745|3.0187471199212714|0.789|
| 17| 5.43548662730421| 2.566188399487096|0.838|
| 18|5.0752067593774095| 2.528505031361047|0.796|
| 19|4.8684189029305225| 3.014598256452725|0.738|
+---+------------------+------------------+-----+
only showing top 20 rows
Pandas UDFはpandasのSeriesを引数とする関数をPySparkで利用できるようにする機能です。
つまり、複数のレコードをまとめて計算することができるため、この性質を活かして処理を高速化することができます。
上記の例では、numpyの正規分布の乱数生成が1つのパラメータに対してしか計算できない(=複数の分布の計算ではforループが必要になる)ため、標準正規分布の乱数を各レコードのパラメータを使って変数変換して計算することで、複数のレコードをまとめて計算しています。
この例のように、10万件程度のデータだとUDFとPandas_UDFの差はあまりないとおもわれますが、分散処理が必須になってくるようなデータ量だと大きな違いになるはずです。
まとめ
今回は、私が業務で大量の確率モデルの確率密度関数を積分する際に必要になった知識をご紹介しました。
まとめると、下記のようになります。
- 積分自体はモンテカルロ法を用いて数値積分することで実現が可能
- データ量があまりに多い場合、PySparkで分散処理が可能
- PySparkで数値積分の分散処理を行う際はPandas UDFが有効
この記事を最後まで読んでいただきありがとうございました。