クラスタリングの手法のひとつに、混合ガウスモデルの推定があります。これは、データセットを複数個のガウス分布で表現、及びクラスタ分けする手法です。通常、これらのガウス分布は有限個を仮定しますが、もし無限個あったとしたら、どんなクラスタリングになるのでしょうか。
本記事では、機械学習プロフェッショナルシリーズ『ノンパラメトリックベイズ 点過程と統計的機械学習の数理』を参考にして、「無限混合ガウスモデル」及び「ノンパラメトリック」の概要と実装を紹介します。この本、非常に分かりやすいのでおすすめです。
詳細な理論の話は省略し、概要とストーリーを重視したいと思います。
また、実装はソースコードを記載する程度に留めます。
※前回の記事
【Python】周辺化ギブスサンプリングを実装してみた
ただし、本記事だけ読んでも流れが伝わるように意識しています。
##無限混合ガウスモデルの導入
通常の混合ガウスモデルでは、データセットが有限個のガウス分布で表現され、各データがその中のガウス分布と紐付いていると考えます。クラスラ数(=ガウス分布数)はあらかじめ決めておく必要がありますが、適切なクラスタ数はデータセットによって変化し、それを把握するのは容易ではありません。
もしクラスタリング中にクラスタ数も自動的に決まれば、上記の課題に悩まずに済みますよね。それを実現するのが、無限混合ガウスモデルです。
無限混合ガウスモデルでは、「ガウス分布は無限個存在し、データセットのデータは、そのうちの有限個と結びついている」という考え方に基づいています。
次の図は、後ほどお見せするクラスタリングの結果です。(めんどうなので図を使いまわします。ご了承ください。)
このデータセットは3個のガウス分布で表現されています。
通常の混合ガウスモデルでは、この世界にちょうど3個のガウス分布があると考えるのに対して、無限混合ガウスモデルでは、データセットと結びつかない潜在的なガウス分布が無限個あると考えます。
このように考えることで、データが既存の3個のクラスタに結びつく確率に加え、平均がどこにあるのか分からない未知のクラスタと結びつく確率も計算できるのです。
つまり、無限混合ガウスモデルによるクラスタリングでは、データが新しいクラスタに割り当てられたり、一方で既存のクラスタがどのデータとの結びつかず消滅したりと、クラスタ数を増減させながら、最後は適切な数に収束させることができます。
##確率分布の算出
ここから計算式を掲載します。
通常の混合ガウス分布では、($x_{1:N}$1を観測しないときの)$z_i$の確率分布にカテゴリカル分布を適用します。
P(z_i = k | \pi) = \pi_k
ここで、$\pi := (\pi_1, \cdots, \pi_K)$は$(K-1)$次元単体上の点です。2
$\pi$の値の決め方として、一様分布を仮定したり、事前分布を仮定してベイズ推定する方法3があります。
一方、無限混合ガウスモデルでは、中華料理過程と呼ばれる確率過程を適用します。これは、$i$番目以外のデータのクラスタ$z_{1:N}^{\backslash i}$から、$i$番目のデータのクラスタ$z_i$の確率分布を算出する仕組みです。
P(z_i = k | z_{1:N}^{\backslash i}, \alpha) =
\left\{
\begin{array}{ll}
\frac{n_k}{N - 1 + \alpha} & (k = 1, \cdots, K) \\
\frac{\alpha}{N - 1 + \alpha} & (k = K + 1)
\end{array}
\right.
$k=1, \cdots, K$が既存のクラスタ、$k=K+1$が新しいクラスタに相当します。ここで、$n_k$は$i \neq j, z_j=k$となる$j$の個数、$\alpha$は正の値を取るハイパパラメータです。式から分かる通り、$\alpha$が大きほど新しいクラスタに割当される確率が大きくなります。今後、この確率分布を${\rm CRP}(k | z_{1:N}^{\backslash i}, \alpha)$と書くことにします。
次に、$x_{1:N}$を観測したときの確率分布を考えます。通常の混合ガウスモデルでは、次のようにして求めることができます。
\begin{align}
P(z_i = k | x_{1:N}, \mu_{1:K}, \Sigma_{1:K}, \pi)
&= \frac{P(z_i = k | \pi) N(x_i | \mu_k, \Sigma_k)}{\sum_{l=1}^{K}P(z_i = l | \pi) N(x_i | \mu_l, \Sigma_l)} \\
&= \frac{\pi_k N(x_i | \mu_k, \Sigma_k)}{\sum_{l=1}^{K}\pi_l N(x_i | \mu_l, \Sigma_l)}
\end{align}
$N(x | \mu, \Sigma)$は多変量ガウス分布の確率密度関数です。
無限混合ガウスモデルでは、上式の$P(z_i = k | \pi)$を中華料理過程の式に変えることで確率分布を求めることができます。
\begin{align}
P(z_i = k | x_{1:N}, \mu_{1:K}, \Sigma_{1:K}, z_{1:N}^{\backslash i}, \alpha)
&= \frac{{\rm CRP}(k | z_{1:N}^{\backslash i}, \alpha) N(x_i | \mu_k, \Sigma_k)}{\sum_{l=1}^{K+1}{\rm CRP}(l | z_{1:N}^{\backslash i}, \alpha) N(x_i | \mu_l, \Sigma_l)}
\end{align}
ここで、ひとつ注意があります。既存のクラスタ$k=1, \cdots, K$については、各クラスタに所属するデータから平均$\mu_k$と分散共分散行列$\Sigma_k$を推定することが可能です。
ところが、新しいクラスタ$k=K+1$については、クラスタに所属するデータがないため、直接的な平均と分散共分散行列の推定ができません。
そこで、今回は次の方法を採用します。
- $\mu_k$に事前分布を仮定し、同時分布$P(x_i, \mu_k| \cdots)$を周辺化して、$\mu_k$を陽に推定せずに確率分布を求める
- $\Sigma_k$を全てのクラスタで共通の行列$\sigma^2 I$とする。($\sigma^2$はハイパパラメータ)
平均に関して、ベイズ推定と周辺化によって、事前分布のみから確率分布を計算します。
分散共分散行列に関して、上記のように固定してもクラスタリングには十分なので、今回は推定なしにします。4もちろん、分散を推定してクラスタリングすることも可能です。
場合分けするのはナンセンスなので、この手法を$k=1, \cdots, K+1$のときにも適用します。
なお、周辺化については前回の記事をご参照ください。
##ノンパラメトリックベイズによるクラスタリング
$N$個のデータのクラスタ$z_{1 :N}$を推定するため、$z_1$から$z_N$まで一つずつ順次確率的にサンプリングすることにします。この方法をギブスサンプリングといい、今回のように、無限次元の確率分布をもとにした方法を特にノンパラメトリックベイズといいます。
この方法を実行するため、初期のクラスタリング$z_{1:N}$を何かしら仮定する必要があります。今回は全て同じクラスタ$k=1$を初期クラスタリングとして、$z_1$から$z_N$までのサンプリングを複数回反復することにします。最初は1つしかクラスタがありませんが、平均から遠いデータが新しいクラスタと結びついて、クラスタ数を増やしながらクラスリングが進むイメージです。
##実装
無限混合ガウスモデルによるクラスタリングを実装しました。
前回の実装をベースにしていますが、全体的にブラッシュアップしており、差分が大きいかもしれません。
ソースコード
from collections import Counter
from functools import partial
from typing import Sequence, Tuple
import numpy as np
from scipy.special import logsumexp
def _log_gaussian(x: np.ndarray, mu: np.ndarray, var: float) -> np.ndarray:
# 前回記事では2*np.piを省略していましたが、分かりやすさのため今回は計算に入れます
d = x.shape[0]
norm_squared = ((x - mu) * (x - mu)).sum(axis=1)
return - d * np.log(2 * np.pi * var) / 2 - norm_squared / (2 * var)
# クラスタを格納する配列クラス
class InfiniteClusterArray(object):
def __init__(self, array: Sequence[int]):
self._array = np.array(array, dtype=int)
self._counter = Counter(array)
self._n_clusters = max(array) + 1
def update(self, i: int, k: int):
assert k <= self._n_clusters
pre_value = self._array[i]
if pre_value == k:
return
if self._counter[pre_value] > 0:
self._counter[pre_value] -= 1
self._array[i] = k
self._counter[k] += 1
if k == self._n_clusters:
self._n_clusters += 1
if not self._counter[pre_value]:
self._array -= 1 * (self._array > pre_value)
self._counter = Counter(self._array)
self._n_clusters -= 1
@property
def array(self) -> np.ndarray:
return self._array.copy()
@property
def n_clusters(self) -> int:
return self._n_clusters
def count(self, k: int, excluded=None) -> int:
assert (excluded is None) or isinstance(excluded, int)
if excluded is None:
return self._counter[k]
counted = self._counter[k] - bool(self._array[excluded] == k)
return counted
def __getitem__(self, i: int) -> int:
return self._array[i]
# データセットを格納する配列クラス
class DataArray(object):
def __init__(self, array: np.ndarray):
assert array.ndim == 2
self._array = array
@property
def n(self) -> int:
return self._array.shape[0]
@property
def d(self) -> int:
return self._array.shape[1]
def mean(self, excluded=None) -> np.ndarray:
assert (excluded is None) or isinstance(excluded, Sequence)
if excluded is None:
return self._array.mean(axis=0)
else:
return self._array[excluded].mean(axis=0)
def __getitem__(self, i):
return self._array.__getitem__(i)
def __iter__(self):
for i in range(self.n):
yield self._array[i]
def __str__(self) -> str:
return f'DataArray({self._array})'
# 無限混合ガウスのギブスサンプリングクラス
class IGMMGibbsSampling(object):
def __init__(self, var: float, var_pri: float, alpha: float, seed=None):
# 事前分布の平均は、データセット全体の平均に設定されます。
assert (var > 0) and (var_pri > 0)
assert alpha > 0
self.var = var
self.var_pri = var_pri
self.alpha = alpha
self._random = np.random.RandomState(seed)
def predict_clusters(self, X: np.ndarray, n_iter: int, init_z=None) -> np.ndarray:
# クラスタを推定するメソッド
assert X.ndim == 2
X = DataArray(X)
mu_pri = X.mean()
if init_z is None:
init_z = np.zeros(X.n, dtype=int)
z = InfiniteClusterArray(init_z)
for _ in range(n_iter):
compute_probs = partial(self._compute_pmf_zi, X, z, mu_pri)
for i in range(X.n):
probs = compute_probs(i)
z_i = self._random.multinomial(1, probs)
z_i = np.where(z_i)[0][0] # One-hot to index
z.update(i, z_i)
return z.array
def _compute_pmf_zi(self, X: DataArray, z: InfiniteClusterArray, mu_pri: np.ndarray, i: int) -> np.ndarray:
mu, var = self._compute_marginal_distribution(X, z, mu_pri, i)
pi = np.array([
z.count(k, excluded=i) / (X.n - 1 + self.alpha)
if k < z.n_clusters
else self.alpha / (X.n - 1 + self.alpha)
for k in range(z.n_clusters + 1)
])
log_pdf_xi = _log_gaussian(X[i], mu, var)
with np.errstate(divide='ignore'):
pmf_zi = np.exp(np.log(pi) + log_pdf_xi - logsumexp(np.log(pi) + log_pdf_xi)[np.newaxis])
return pmf_zi
def _compute_marginal_distribution(self, X: DataArray, z: InfiniteClusterArray,
mu_pri: np.ndarray, i: int) -> Tuple[np.ndarray, float]:
n_vec = np.array([
z.count(k, excluded=i)
for k in range(z.n_clusters + 1)
], dtype=np.int)
tmp_vec = 1 / (n_vec / self.var + 1 / self.var_pri)
mu = np.tile(mu_pri / self.var_pri, (z.n_clusters + 1, 1)) # Shape (z.n_clusters + 1, X.d)
for k in range(z.n_clusters):
excluded_list = [
j for j in range(X.n)
if (j != i) and (z[j] == k)
]
if excluded_list:
mean = X.mean(excluded=excluded_list)
mu[k] += n_vec[k] * mean / self.var
mu *= tmp_vec[:, np.newaxis]
var = tmp_vec + self.var
return mu, var
##クラスタリングの実践
試しに、実装した自作クラスを使ってクラスタリングしてみます。
今回は、次のデータセットに適用します。
ソースコード
Jupyter Notebookを使用しています。
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
%matplotlib inline
np.random.seed(0)
cmap = plt.rcParams['axes.prop_cycle'].by_key()['color']
pi = [0.6, 0.2, 0.2]
mu = np.array([[1, 2], [-2, 0], [3, -2]])
var = [0.8, 0.4, 0.2]
size = 100
ndim = 2
def make_data():
z = np.random.multinomial(1, pvals=pi)
z = np.where(z==1)[0][0]
cov = var[z] * np.eye(2)
return np.random.multivariate_normal(mu[z], cov)
X = np.array([make_data() for _ in range(100)])
fig = plt.figure()
ax = plt.axes()
ax.scatter(X[:, 0], X[:, 1], color='gray')
ax.set_aspect('equal')
plt.show()
序盤でお見せしたデータセットの、まだクラスタリングされていない状態のものです。
データの個数は100個で、3つのガウス分布からランダムに選んで作成しています。
したがって、3つのクラスタに分けるのが適切です。
それでは、クラスタリングしてみましょう。
model = IGMMGibbsSampling(var=0.5, var_pri=4, alpha=0.1, seed=0) # ハイパパラメータをコンストラクタにわたす
clusters = model.predict_clusters(X, n_iter=10) #クラスタリングの実行
# 可視化
fig = plt.figure()
ax = plt.axes()
for k in range(max(clusters) + 1):
data = np.array([
X[i, :] for i in range(X.shape[0])
if clusters[i] == k
])
ax.scatter(data[:, 0], data[:, 1], label=f'cluster {k + 1}', color=cmap[k])
ax.set_aspect('equal')
plt.legend()
plt.show()
クラスタで共通の分散var
はデータセット小さめの$0.5$、平均の事前分布の分散var_pri
は、一様分布っぽくするために大きめの$4$、中華料理過程のパラメータalpha
は$0.1$にしています。
ちなみに、平均の事前分布の平均は、データセット全体の平均になるように実装されています。
クラスタリングメソッドpredict_clusters
に、データセットと反復回数n_iter
を渡しています。ここで、反復回数を10としました。
結果は、次のようになりました。
クラスタ数を指定せず、全て同じクラスタの状態からスタートしたのにもかかわらず、3個のクラスタに分かれてくれました。
せっかくなので、反復回数ごとのクラスタリングの推移を調べてみます。
n_iter=2, 4, 6, 8, 10
におけるクラスタリング結果を図示しました。
- n_iter = 2
- n_iter = 4
- n_iter = 6
- n_iter = 8
- n_iter = 10
初期状態から多めにクラスタができ、そこから徐々にクラスタが減って3個になったようです。
ちなみに、ここからn_iter
を増やしても、クラスタ数は基本的に3個の状態を維持していました。
※たまにデータが1個だけのクラスタができて、クラスタ数が4個になることもありました。
##ハイパパラメータの調整
中華料理過程のハイパパラメータ$\alpha$は、上述した通り、大きほど新しいクラスタが出来やすくなります。
もともとクラスタ数の推定を省略するという動機があり、この$\alpha$の調整に苦戦してしまったら導入した意義が薄れます。
$\alpha$を変えながら試行したところ、基本的には小さめにすれば問題ないという印象を受けました。
次の図は、$\alpha=0.0001$、反復回数10回の結果です。
しっかり3つのクラスタに分かれています。
$\alpha$をかなり小さめにしても、反復回数に大きなインパクトなく、適切にクラスタリングできるようです。
ちなみに、$\alpha=1$にすると、10回反復時点で5個のクラスタができ、その後も3個に減ることはありませんでした。$\alpha$が大きすぎるとうまくクラスタ数が決まらないようです。
機械学習で使われる最適化アルゴリズムのステップ幅と同じ性質を持っていますね。
##おわりに
以上、無限混合ガウスモデルによるクラスタリングを試してみました。
狙い通りのクラスタ数に分かれてくれたのでおもしろかったです。
-
$N$個の変数$x_1, \cdots, x_N$を、省略して$x_{1:N}$と書くことにします。また、その中から$i$番目を除いた$N-1$個の変数を$x_{1:N}^{\backslash i}$と書くことにします。 ↩
-
$\pi_1, \cdots, \pi_K \ge 0$及び$\sum_{k=1}^K\pi_k=1$を満たす$(\pi_1, \cdots, \pi_K)$の集合を$(K-1)$次元単体と言います。もともとは幾何学で使われる集合で、$0$次元単体は1点、$1$次元単体は線分、$2$次元単体は三角形を表しています。ベイズ推定の本を読んでると、確率の文脈でこの集合が使われるのを目にします。 ↩
-
$\pi$を$z_{1:N}^{\backslash i}$の観測から最尤推定する方法もありますが、この方法ではクラスタ$k$が消滅するすると常に$\pi_k=0$となり、クラスタが復活しなくなるため注意が必要です。 ↩
-
$\Sigma_k$をクラスタ共通の固定値にしても、k-meansと同様の精度でクラスタリングができると思います。なぜなら、k-means法は、上記のように分散を固定した混合ガウスモデルにおいて、各$\mu_k$や$z_i$を最尤推定した場合と同等だからです。 ↩