ポアソン混合モデル その5:ポアソン混合モデルによる学習、EMアルゴリズムの適用<実装編-1:やっと推論>
今回は、EMアルゴリズムを使ってポアソン混合モデルによる学習を行うことにする。実装編です。
理論編はこちら。
ポアソン混合モデル その3:ポアソン混合モデルによる学習・推論、EMアルゴリズムの適用<理論編>
一応、手順をまとめておく。
0-0.データを準備する。
0-1.πとλを初期化する。
1.E-Stepを実行する。
$$
γ(s_{n,k}) = \frac{π_k Poi(x_n|λ_k)}{\sum_{k=1}^{K} π_k Poi(x_n|λ_k)}
$$
2ー1.M-Stepで $ λ^* $ を求める。
$$
λ^*_k = \frac{\sum_{n=1}^{N} γ(s_{n,k}) x_n}{\sum_{n=1}^{N} γ(s_{n,k})}
$$
2-2.M-Stepで $ π^* $ を求める。
$$
π^*_k = \frac{\sum_{n=1}^{N} γ(s_{n,k})}{N}
$$
3.$Q$関数の値で収束判定を行う。
$$
Q(\textbf{π}, \textbf{λ})
= \sum_{n=1}^{N} \sum_{k=1}^{K} γ(s_{n,k}) \ln ( \pi_k Poi(x_n|λ_k))
$$
収束していない場合、手順1.2ー1.2-2.を適当な回数繰り返す。
※いくつか注意事項。
(1)0-1.で $ \textbf{π} $ と $ λ_k (k = 1, 2, .. , K) $をてきとうに初期化している。
共役事前分布を用いる近似推論手法を用いる場合は $ \textbf{π} ∼ Dir(\textbf{π} | \textbf{α}) , λ_k ∼ Gam(λ_k | a_k, b_k) $ などの重要な仮定があるが、EMアルゴリズムでは関係ない話なのでただただてきとうに初期化した。
(2)3.で $ Q $ 関数の値による収束判定を行う。以前に投稿した本シリーズの記事では全く扱ってこなかった気がするが、察してほしい。忘れていました。
0-0.データを準備する。
データを準備する。ポアソン分布は1次元の入力に対する確率質量を表す分布なので、1次元のデータを用意する。
'''1.パラメータのてきとうな初期化'''
import numpy as np
K = 3
pi_vec_init = np.array([3/6, 2/6, 1/6])
lam_vec_init = np.array([10,20,40])
N = 1000
'''2.クラスタの割り振り'''
# (0,1,2)から確率(3/6,2/6,1/6)でN個のデータをサンプリング
S = np.random.choice(a=range(K), size=N, p=pi_vec_init)
'''3.ポアソン分布からのサンプリング'''
from scipy import stats
# 各nについて、3つのポアソン分布のそれぞれからのサンプリングを行っちゃう
# ここでnp.array()にする必要性はない。
mixed_poisson = np.array([stats.poisson.rvs(mu=lam_vec_init[k], size=N) for k in range(K)])
data = np.array([mixed_poisson[s][i] for i, s in enumerate(S)])
クラスタごとの分布を一応示しておく。
import matplotlib.pyplot as plt
plt.hist(mixed_poisson[0][np.where(S == 0)], alpha=0.7, bins=100)
plt.hist(mixed_poisson[1][np.where(S == 1)], alpha=0.7, bins=100)
plt.hist(mixed_poisson[2][np.where(S == 2)], alpha=0.7, bins=100)
全体でみるとこんな感じ
plt.hist(data, bins=100)
このヒストグラムを見てクラスターの数を決めることは難しそうだ。今回は3つのクラスターが存在するという仮定だの、3つのクラスターがありそうだという知見が得られているものとして話を進めよう。
0-1.πとλを初期化する。
クラスタの混合割合 $ \textbf{π} $ と $ λ_k (k = 1, 2, .. , K) $ をてきとうに初期化する。
$ π $ の実現値に基づいてEMアルゴリズムを進める以上、初期化は手動でてきとうな値を決めればよい。
ディリクレ分布からのサンプリングは行わず、
$$
\textbf{π} = (\frac{1}{3}, \frac{1}{3}, \frac{1}{3})
$$
と適当に初期化する。
$ λ_k (k = 1, 2, 3) $ についてはヒストグラムを見ながら3つの $ λ $ を決め打ちしてみる。
データがポアソン混合モデルで近似できるとするならば、各々のポアソン分布の平均は $ λ $ である。
$$
\textbf{λ} = (15, 25, 45)
$$
くらいかなぁ、とてきとうに初期化してみる。
import numpy as np
K = 3
pi_vec = np.array([1/3, 1/3, 1/3])
lam_vec = np.array([15,25,45])
1.E-Stepを実行する。
$$
γ(s_{n,k}) = \frac{π_k Poi(x_n|λ_k)}{\sum_{k=1}^{K} π_k Poi(x_n|λ_k)}
$$
ここで、 $ γ(s_{n,k}) $ は $ s_{n,k} $ の事後分布であり、右辺の計算に $ s_{n,k} $ の値を使っていないことに注意。
from scipy import stats
# 単一データ x_n を入力にとる
def update_gamma(x_n):
# k番目のクラスタでの確率質量とクラスタ重みの積。lambda関数として型だけ作る。
partial_poisson = lambda k, x : pi_vec[k] * stats.poisson.pmf(x, mu=lam_vec[k])
# k=1,2,3 について足し合わせる
all_poisson = np.array([partial_poisson(k=k, x=x_n) for k in range(K)]).sum()
# 各クラスタでのγの値を要素にもつK次元のnp.arrayをつくる
gamma_n = [partial_poisson(k=k, x=x_n) / all_poisson for k in range(K)]
return gamma_n
# 複数データ X = (x_1, .., x_n) を入力にとる
def gamma_all(X):
return np.array([update_gamma(x_n) for x_n in X])
gamma_matrix = gamma_all(data)
2ー1.M-Stepで λ* を求める。
$$
λ^*_k = \frac{\sum_{n=1}^{N} γ(s_{n,k}) x_n}{\sum_{n=1}^{N} γ(s_{n,k})}
$$
$ γ(s_{n,k}) $ (を成分にもつ行列)が求まったので、簡単な計算をするだけ。
def update_lambda(gamma_matrix, X):
# 分子
weighted_x = lambda k : np.sum(gamma_matrix[:, k] * X)
# 分母
weight_sum = lambda k : np.sum(gamma_matrix[:, k])
lam_vec = [weighted_x(k=k) / weight_sum(k=k) for k in range(K)]
return lam_vec
lam_vec = update_lambda(gamma_matrix=gamma_matrix, X=data)
2-2.M-Stepで π* を求める。
$$
π^*_k = \frac{\sum_{n=1}^{N} γ(s_{n,k})}{N}
$$
$ γ(s_{n,k}) $ 所与なので、こちらは秒で終わる。
def update_pi(gamma_matrix):
pi_k = lambda k : np.sum(gamma_matrix[:, k]) / len(gamma_matrix)
pi_vec = [pi_k(k=k) for k in range(K)]
return pi_vec
pi_vec = update_pi(gamma_matrix=gamma_matrix)
まだ1順しかしていないが、この時点でどの程度推論できているか、確認してみよう。
import matplotlib.pyplot as plt
from scipy import stats
# データの描画
plt.hist(data, bins=50, density=True, alpha=0.5)
x = np.arange(60)
# 初期のπ、λの値でクラスタごとのポアソン分布とその混合分布を描画する
original_poisson = lambda k : pi_vec_init[k] * stats.poisson.pmf(x, mu = lam_vec_init[k])
ori_poissons = [original_poisson(k=k) for k in range(K)]
ori_mixed_poisson = np.sum(ori_poissons, axis=0)
for k in range(K):
plt.plot(x, ori_poissons[k], color='royalblue')
plt.plot(x, ori_mixed_poisson, color='aqua')
# 1度EMアルゴリズムを実行して得られたπ、λの値で描画する
inferred_poisson = lambda k : pi_vec[k] * stats.poisson.pmf(x, mu = lam_vec[k])
inf_poissons = [inferred_poisson(k=k) for k in range(K)]
inf_mixed_poisson = np.sum(inf_poissons, axis=0)
for k in range(K):
plt.plot(x, inf_poissons[k], color='red', linestyle='dotted', alpha=1)
plt.plot(x, inf_mixed_poisson, color='orange', linestyle='dotted', alpha=1)
ピークの位置がそれぞれ多少ずれてはいるものの、このまま最適化を進めたらちゃんと一致しそう。
3.Q関数の値で収束判定を行う。
$$
Q(\textbf{π}, \textbf{λ})
= \sum_{n=1}^{N} \sum_{k=1}^{K} γ(s_{n,k}) \ln ( \pi_k Poi(x_n|λ_k))
$$
収束していない場合、手順1.2ー1.2-2.を適当な回数繰り返す。
今回は簡単すぎるデータだったが、今後を考えて$ Q $ 関数の実装だけはやっておこうか。
from scipy import stats
# 単一データ x_n に対するQ関数の値
def calc_q_n(n, x):
# avoid ZeroDivisionError
partial_log_poisson = lambda k : np.log1p(pi_vec[k] * stats.poisson.pmf(x, mu=lam_vec[k]))
gamma = gamma_matrix[n]
q_n = np.array([partial_log_poisson(k=k) * gamma[k] for k in range(K)]).sum()
return q_n
# 複数データ X = x_1, x_2, .., x_n に対するQ関数の値
def calc_Q(X):
Q = np.array([calc_q_n(n, x_n) for n, x_n in enumerate(X)]).sum()
return Q
Q = calc_Q(data)
print(f'value of Q function is {Q}.')
ちなみに、$ Q $ 関数の計算をE-Stepに含めるとする説明も多くある。今回は、E-StepとM-Step終了後に $ Q $ 関数の値を計算することにする。
実際には、これらの手順を何度も繰り返したり、複数の初期値に対して実行することになる。
簡単に繰り返せるよう、これまでの手順をまとめてクラスとして作ってみよう。
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.pyplot import cm, rc
# Google Colabではこの文が無いとアニメーションが再生されない。
rc('animation', html='jshtml')
class Mixed_Poisson:
def __init__(self, pi_vec, lam_vec):
if len(pi_vec) != len(lam_vec):
raise ValueError(
"shapes of pi_vec and lam_vec didn't match."
)
# クラスタ数K
self.K = len(pi_vec)
# クラスタの混合割合π(K次元ベクトル)の格納先
self.pi_history = np.array(pi_vec)[np.newaxis]
# クラスタごとのパラメータλ(K次元ベクトル)の格納先
self.lam_history = np.array(lam_vec)[np.newaxis]
# Q関数の値の格納先
self.Q_history = np.array([0])[np.newaxis]
# E-Stepで計算するγ(N×Kの行列) N:データ数
self.gamma_matrix = None
# データ
self.X = None
def execute_EM(self, X, n_iter=1):
self.X = X
for i in range(n_iter):
self.execute_E_Step()
self.execute_M_Step()
self.check_convergence()
def execute_E_Step(self):
# γの更新
self.gamma_matrix = self.gamma_all()
def execute_M_Step(self):
# λ、πを更新して格納先np.arrayの末尾に追加する
self.lam_history = np.vstack((self.lam_history, self.update_lambda()))
self.pi_history = np.vstack((self.pi_history, self.update_pi()))
def check_convergence(self):
# Q関数の値を計算して格納先np.arrayの末尾に追加する
self.Q_history = np.vstack((self.Q_history, self.calc_Q()))
def gamma_all(self):
# データごとにupdate_gamma()を呼び出す
return np.array([self.update_gamma(x_n) for x_n in self.X])
def update_gamma(self, x):
# k番目のクラスタでの確率質量とクラスタ重みの積。lambda関数として型だけ作る
# pi_history[-1][k]は最新のπベクトルのクラスタkに対応する値のこと
partial_poisson = lambda k, x : self.pi_history[-1][k] * stats.poisson.pmf(x, mu=self.lam_history[-1][k])
all_poisson = np.array([partial_poisson(k=k, x=x) for k in range(self.K)]).sum()
gamma_n = [partial_poisson(k=k, x=x) / all_poisson for k in range(self.K)]
return gamma_n
def update_lambda(self):
# クラスタkにおけるλの更新式の分母、分子をlambda関数として用意する
weighted_x = lambda k : np.sum(self.gamma_matrix[:, k] * self.X)
weight_sum = lambda k : np.sum(self.gamma_matrix[:, k])
lam_vec = [weighted_x(k=k) / weight_sum(k=k) for k in range(self.K)]
return lam_vec
def update_pi(self):
pi_k = lambda k : np.sum(self.gamma_matrix[:, k]) / len(self.gamma_matrix)
pi_vec = [pi_k(k=k) for k in range(self.K)]
return pi_vec
def calc_Q(self):
# データごとに呼び出したcalc_q_nの値の総和をとる
Q = np.array([self.calc_q_n(n, x_n) for n, x_n in enumerate(self.X)]).sum()
return Q
def calc_q_n(self, n, x):
# avoid ZeroDivisionError
partial_log_poisson = lambda k : np.log1p(self.pi_history[-1][k] * stats.poisson.pmf(x, mu=self.lam_history[-1][k]))
gamma = self.gamma_matrix[n]
q_n = np.array([partial_log_poisson(k=k) * gamma[k] for k in range(K)]).sum()
return q_n
# 描画用メソッド
# nth番目の推論結果を描画する。デフォルトでは最新の結果(ヒストリーのインデックス'-1'の値)を表示させる
def show_one_inference(self, nth=-1):
x_min = min(self.X)
x_max = max(self.X)
# x軸の分割数
x = np.arange(start=x_min, stop=x_max)
# インデックスnthに対応するiterationでのπ、λを用いて、k番目のクラスタのポアソン分布の確率質量を計算するlambda関数
kth_poisson = lambda k : self.pi_history[nth][k] * stats.poisson.pmf(x, mu = self.lam_history[nth][k])
mixed_poisson = np.zeros(shape=len(x))
for k in range(self.K):
# クラスタkのポアソン分布を描画する
poisson = kth_poisson(k=k)
plt.plot(x, poisson)
mixed_poisson += poisson
# 混合した分布を描画する
plt.plot(x, mixed_poisson, color='orange')
plt.hist(x=self.X, density=True, bins=100, color='silver')
def show_inference_process(self):
# クラスタの数だけ色をつくる
color = cm.brg_r(np.linspace(0, 1, self.K))
x_min = min(self.X)
x_max = max(self.X)
# x軸の分割数
x = np.arange(start=x_min, stop=x_max)
fig = plt.figure()
# 各要素に各iterationの描画情報を入れる
ims = []
# pi_historyの要素の数だけ、実行する。
for n_iter in range(len(self.pi_history)):
# インデックスnthに対応するiterationでのπ、λを用いて、k番目のクラスタのポアソン分布の確率質量を計算するlambda関数
kth_poisson = lambda k : self.pi_history[n_iter][k] * stats.poisson.pmf(x, mu = self.lam_history[n_iter][k])
mixed_poisson = np.zeros(shape=len(x))
# n_iter回目のiterationの描画(クラスタごとのポアソン分布と全体の混合した分布)情報をリストとしてまとめる
nth_imgs = []
for k, c in zip(range(self.K), color):
# クラスタkのポアソン分布
poisson = kth_poisson(k=k)
im, = plt.plot(x, poisson, color=c)
nth_imgs.append(im)
mixed_poisson += poisson
# 混合した分布
im1, = plt.plot(x, mixed_poisson, color='black')
nth_imgs.append(im1)
# n_iter回目のiterationの描画情報をimsに追加する。
ims.append(nth_imgs)
plt.hist(x=(self.X,), density=True, bins=100, color='silver')
# 10枚のプロットを 100ms ごとに表示
ani = animation.ArtistAnimation(fig, ims, interval=100)
plt.close()
plt.plot(np.arange(len(self.Q_history)), self.Q_history)
# Google Colaboratoryを使用していて、gifとして保存する場合、以下のコマンド1.を別セルで実行後、2.をコメントアウトする。
# 1.!apt-get update && apt-get install imagemagick
# 2.ani.save("output.gif", writer="imagemagick")
# 参照:https://qiita.com/shinmura0/items/ed96863281637e4fa10c
return ani
このクラスを用いてポアソン混合モデルによる推論を行った結果が下記。
sample = Mixed_Poisson(pi_vec=[0.2,0.3,0.5], lam_vec=[5,10,15])
sample.execute_EM(X=data, n_iter=50)
#sample.show_one_inference(nth=0)
sample.show_inference_process()
視覚的にわかりやすいよう、Matplotlib.ArtistAnimation メソッドを利用した。