そもそもGMM(混合ガウスモデル)ってなんだ?
複数の正規分布の重ね合わせでデータセットの確率密度を推定するモデル
各データ点について、どの正規分布に所属するのか確率密度の高さで比較することで、クラスタリングなども行うことができます。
モデルについて何がわかればいいの?
コンポーネント数(混合された正規分布の数)をハイパーパラメータとして、
- 各コンポーネントの平均ベクトル: $\boldsymbol{\mu}$
- 各コンポーネントの共分散行列: $\boldsymbol{\Sigma}$
- 混合係数ベクトル(各コンポーネントの重み): $\boldsymbol{\pi}$
がわかればモデルを構築することができます。これらのモデルパラメータを最尤推定法で求める必要があります。
最尤推定というのは、パラメータを調整して与えられたデータに最もフィットするように最適化することです。
余談ですが、ここで、各コンポーネント共分散行列の値が等しい対角要素しかない、各コンポーネントの寄与率が等しいという条件を仮定すれば、GMMによるクラスタリングはk-means法によるクラスタリングと等価になります。
つまり、k-means法は単純化されたGMMだとみなすことができます。面白いですね。
どうやって最尤推定なんてするの? 目的関数はどうするの?
>どうやって最尤推定なんてするの?
EMアルゴリズムを使用することが多いらしいです。
EMアルゴリズムというのは EvaluateメソッドとModifyメソッドを使用してそれぞれ尤度評価、パラメータ更新を何度も繰り返して行うアルゴリズム。
>目的関数はどうするの?
対数尤度関数を使用することが多いらしい。次式で与えられます。
$N$:データ点数, $n$:混合成分数, $N(x_j | \mu_k, \Sigma_k)$:第$k$成分のガウス分布の確率密度関数
$\log L(\boldsymbol{\pi}, \boldsymbol{\mu}, \boldsymbol{\Sigma} | \mathbf{X}) = \log \left\{ \prod_{j=1}^{N} \sum_{k=1}^{n} \pi_k N(x_j | \mu_k, \Sigma_k) \right\} = \sum_{j=1}^{N} \log \left\{ \sum_{k=1}^{n} \pi_k N(x_j | \mu_k, \Sigma_k) \right\}$
この値を最大にするように最適化を行います
EMアルゴリズムって具体的には何やってんの?
Modifyメソッドでは、ラグランジュの未定乗数法を使うらしい。目的関数が既知(解析的に表現できる)で微分可能かつ、制約条件がある場合(今回は$\boldsymbol{\pi}$の和が1)に使える極値の発見法です。
関数形状が不明な最適化問題のように学習率などの設定は不要です。
e-step,m-stepでは不確実な操作を行わないのもEMアルゴリズムの利点だと考えています。
ちなみに、最も基本的なアルゴリズムでは各パラメータの更新は次の式に従って行われます。
$\pi_k^{\text{new}} = \frac{N_k}{N}$
$\mu_k^{\text{new}} = \frac{\sum_{j=1}^{N} \gamma(z_{j,k}) x_j}{N_k}$
$\Sigma_k^{\text{new}} = \frac{1}{N_k} \sum_{j=1}^{N} \gamma(z_{j,k}) \left(x_j - \mu_k^{\text{new}}\right) \left(x_j - \mu_k^{\text{new}}\right)^T$
なお、ここで出てきた$\gamma(z_{j,k})$は負担率という値は各データポイントについて、それぞれのコンポーネントがどれくらい寄与しているか、を示す値です。詳しく知ろうとするとベイズの定理の話になり、議論が脱線するので次の式で与えられることだけをまずは知っておくのが良いと思います。
$\frac{\pi_k N(x_i | \mu_k, \Sigma_k)}{\sum_{j=1}^{n} \pi_j N(x_i | \mu_j, \Sigma_j)}$
実際にやってみよう!!
実装の式を見た方がいろいろわかりやすいと思うので、やってみます。
scikit-learnには一行で実行できてしまう関数がありますが、今回はnumpyとscipyの機能を使用して実装してみます。
まずはライブラリインポート
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.patches import Ellipse
from scipy.stats import multivariate_normal
クラス、メソッドの作成
class GMMFromScratch:
def __init__(self, n_components=3, max_iter=10, tol=1e-3, random_state=None):
"""
Gaussian Mixture Model implementation from scratch using EM algorithm
Parameters:
- n_components: Number of Gaussian components
- max_iter: Maximum number of iterations
- tol: Tolerance for convergence
- random_state: Random seed for reproducibility
"""
self.n_components = n_components
self.max_iter = max_iter
self.tol = tol
self.random_state = random_state
# Parameters to be learned
self.means_ = None # Mean vectors for each component
self.covariances_ = None # Covariance matrices for each component
self.weights_ = None # Mixing coefficients (prior probabilities)
self.responsibilities_ = None # Posterior probabilities
self.prev_means_ = None # Previous means for convergence check
self.prev_covariances_ = None # Previous covariances for convergence check
self.prev_weights_ = None # Previous weights for convergence check
self.log_likelihood_history = [] # Track convergence
def _initialize_parameters(self, X):
"""
Initialize parameters for EM algorithm
"""
n_samples, n_features = X.shape
if self.random_state is not None:
np.random.seed(self.random_state)
# Initialize means randomly from data points
random_indices = np.random.choice(n_samples, self.n_components, replace=False)
self.means_ = X[random_indices].copy()
# Initialize covariances as identity matrices scaled by data variance
data_variance = np.var(X, axis=0)
self.covariances_ = np.array([np.diag(data_variance) for _ in range(self.n_components)])
# Initialize weights uniformly
self.weights_ = np.ones(self.n_components) / self.n_components
print("Initial parameters:")
print(f"Means: {self.means_}")
print(f"Weights: {self.weights_}")
print("-" * 50)
def _e_step(self, X):
"""
E-step: Calculate responsibilities (posterior probabilities)
"""
n_samples = X.shape[0]
self.responsibilities_ = np.zeros((n_samples, self.n_components))
# Calculate likelihood for each component
for k in range(self.n_components):
likelihood = multivariate_normal.pdf(X, self.means_[k], self.covariances_[k])
self.responsibilities_[:, k] = self.weights_[k] * likelihood
# likelihood before normalization
total_likelihood = np.sum(self.responsibilities_, axis=1, keepdims=True)
# Normalize to get posterior probabilities
self.responsibilities_ /= total_likelihood
# calculalte log likelihood
log_likelihood = np.sum(np.log(total_likelihood.flatten()))
return log_likelihood
def _m_step(self, X):
"""
M-step: Update parameters based on responsibilities
Update formulas:
N_k = Σ_i r_ik
μ_k = (Σ_i r_ik * x_i) / N_k
Σ_k = (Σ_i r_ik * (x_i - μ_k)(x_i - μ_k)^T) / N_k
π_k = N_k / N
"""
# save previous parameters for convergence check
self.prev_means_ = self.means_
self.prev_covariances_ = self.covariances_
self.prev_weights_ = self.weights_
n_samples, n_features = X.shape
# Calculate effective number of points for each component
N_k = np.sum(self.responsibilities_, axis=0)
# Update weights (mixing coefficients)
self.weights_ = N_k / n_samples
# Update means
for k in range(self.n_components):
if N_k[k] > 1e-10: # Avoid division by zero
self.means_[k] = np.sum(self.responsibilities_[:, k:k+1] * X, axis=0) / N_k[k]
# Update covariances
for k in range(self.n_components):
if N_k[k] > 1e-10: # Avoid division by zero
diff = X - self.means_[k]
weighted_diff = self.responsibilities_[:, k:k+1] * diff
self.covariances_[k] = np.dot(weighted_diff.T, diff) / N_k[k]
# Add small regularization to avoid singular matrices
self.covariances_[k] += np.eye(n_features) * 1e-6
def fit(self, X):
"""
Fit the GMM model using EM algorithm
"""
print("=== GMM Learning Process ===")
print(f"Data shape: {X.shape}")
print(f"Number of components: {self.n_components}")
print("=" * 50)
# Initialize parameters
self._initialize_parameters(X)
log_likelihood = self._e_step(X)
# Initialize log-likelihood for convergence check
prev_log_likelihood = -np.inf
for iteration in range(self.max_iter):
# M-step
self._m_step(X)
# E-step
log_likelihood = self._e_step(X)
# Check for convergence
if abs(log_likelihood - prev_log_likelihood) < self.tol:
print(f"Converged after {iteration + 1} iterations")
break
# Print progress every 10 iterations
if (iteration + 1) % 10 == 0:
print(f"Iteration {iteration + 1}: Log-likelihood = {log_likelihood:.4f}")
self.log_likelihood_history.append(log_likelihood)
prev_log_likelihood = log_likelihood
print(f"Final log-likelihood: {log_likelihood:.4f}")
return self