「続・わかりやすい パターン認識 教師なし学習入門」ではアルゴリズムのサンプルコードが公開されていません。そこで、自分で書いてみました。
なお、具体的に実験をした結果はGithubで公開しています。
目次
5章 教師付き学習、教師なし学習
データ作成
教師付き学習、教師なし学習を適用するためのデータを作るクラスです
class dice(object):
def __init__(self, theta, pi):
self.theta = theta
self.pi = pi
def generate_data(self, num):
x, s = [], []
for _ in range(num):
s.append(self.__spots(self.pi))
x.append(self.__spots(self.theta[s[-1]]))
return x, s
@staticmethod
def __spots(pi):
p = np.random.rand()
for i in range(len(pi)):
if p < pi[:i + 1].sum():
return i
教師付き学習
教師付き学習のクラスです。
class Supervised_learning(object):
def __init__(self):
self.pi = None
self.theta = None
self.n = None
def estimate(self, x, s):
self.n = self.__count(x, s)
self.pi = self.n.sum(axis=1) / self.n.sum()
self.theta = self.n / np.tile(self.n.sum(axis=1), (self.n.shape[1], 1)).T
@staticmethod
def __count(x, s):
n = np.zeros((max(s) + 1, max(x) + 1))
for x_, s_ in zip(x, s):
n[s_][x_] += 1
return n
教師なし学習
教師なし学習のクラスです。
class Unsupervised_learning(object):
def __init__(self, theta=None, pi=None, c=3, m=2):
self.c = c
self.m = m
# Step1 If no initial value is specified, it will be determined at random
if pi is None:
pi = np.random.rand(self.c)
if theta is None:
rand = np.random.rand(self.c, self.m)
theta = rand / rand.sum(axis=1).reshape(-1, 1) # normalisation
self.pi = pi
self.theta = theta
self.r = None
self.p = None
def estimate(self, x):
self.r = np.array([i for i in collections.Counter(x).values()])
# Step2
self.__update_p()
# termination condition
epsilon = 1e-6
log_p = -sys.float_info.max
delta = None
while delta is None or delta >= epsilon:
# Step3
self.__update_param()
# Step4
log_p_new = self.__log_likelihood()
delta = log_p_new - log_p
log_p = log_p_new
def __update_p(self):
numer = self.pi.reshape(-1, 1) * self.theta
self.p = numer / numer.sum(axis=0)
def __update_param(self):
self.pi = (self.r * self.p).sum(axis=1) / self.r.sum()
self.__update_p()
numer = self.r * self.p
# self.theta = (numer.T / numer.sum(axis=1)).T # if theta is known, comment out on this line.
def __log_likelihood(self):
log_p = np.log((self.pi.reshape(-1, 1) * self.theta).sum(axis=0))
return (self.r * log_p).sum(axis=0)
7章 マルコフモデル
データ作成
マルコフモデルに従ったデータを作るクラスです。
class Markov_dice(object):
def __init__(self, A, B, rho):
self.A = A
self.B = B
self.rho = rho
def generate_spots(self, n):
s, x = [], []
s += self.__spots(self.rho.reshape(1, -1), 0)
for n in range(n):
x += self.__spots(self.B, s[-1])
s += self.__spots(self.A, s[-1])
return np.array(s[:-1]), np.array(x)
def __spots(self, theta, s_t):
p = np.random.rand()
for i in range(theta.shape[0]):
if p < theta[s_t, :i + 1].sum():
break
return [i]
マルコフモデルのパラメータ推定
マルコフモデルのパラメータを推定するクラスです
class Markov(object):
def __init__(self):
self.A = None
self.B = None
self.n = None
self.m = None
def estimate(self, x, s):
self.__initialize(x, s)
self.n, self.m = self.__count(x, s)
self.A = self.m / self.n.sum(axis=1).reshape(-1, 1)
self.B = self.n / self.n.sum(axis=1).reshape(-1, 1)
def __initialize(self, x, s):
self.A = np.zeros((max(s) + 1, max(s) + 1))
self.B = np.zeros((max(s) + 1, max(x) + 1))
@staticmethod
def __count(x, s):
n = np.zeros((max(s) + 1, max(x) + 1))
for x_, s_ in zip(x, s):
n[s_][x_] += 1
m = np.zeros((max(s) + 1, max(s) + 1))
for i in range(len(s) - 1):
m[s[i]][s[i + 1]] += 1
return n, m
8章 隠れマルコフモデル
データ作成
7章 マルコフモデルと同様の方法でデータを作成します
隠れマルコフモデルのパラメータ推定
隠れマルコフモデルのパラメータ推定はpythonでHMMのパラメータ推定実装で詳しく説明されています。
9章 混合正規分布
データ作成
混合正規分布に従ったデータを作成するクラスです。
class mixed_normal_dice(object):
"""mixed normal distribution model"""
def __init__(self, K=3, mu=None, sigma=None):
self.K = K
# If no initial value is specified, it will be determined at random
if mu is None:
mu = []
for i in range(self.K):
mu += [(np.random.rand() - 0.5) * 10.0] # range: -5.0 - 5.0
if sigma is None:
sigma = []
for i in range(self.K):
sigma += [np.random.rand() * 3.0] # range: 0.0 - 3.0:
self.mu = mu
self.scale = sigma
def generate_data(self, N):
X = []
mu_star = []
sigma_star = []
for i in range(self.K):
if self.mu.ndim >= 2:
# In the case of multidimensional normal distribution
X = np.append(X, np.random.multivariate_normal(self.mu[i], self.scale[i], N // self.K))
result = X.reshape(-1, self.mu.ndim), mu_star, sigma_star
else:
# In the case of one-dimensional normal distribution
X = np.append(X, np.random.normal(self.mu[i], self.scale[i], N // self.K))
result = X, mu_star, sigma_star
mu_star = np.append(mu_star, self.mu[i])
sigma_star = np.append(sigma_star, self.scale[i])
return result
混合正規分布のパラメータ推定
混合正規分布に従ったデータをパラメータ推定するクラスです。(グラフを描画する関数も含めています)
pythonで混合正規分布実装を参考にしました。
class mixed_normal_distribution(object):
"""Parameter Estimation for Mixed Normal Distribution"""
def __init__(self, K, pi=None, mu=None, sigma=None):
self.K = K
# If no initial value is specified, it will be determined at random
if pi is None:
pi = np.random.rand(self.K)
if mu is None:
mu = np.random.randn(K)
if sigma is None:
sigma = np.abs(np.random.randn(K))
self.pi = pi
self.mu = mu
self.sigma = sigma
def estimate(self, X):
# termination condition
epsilon = 0.000001
# Step1 initialize gmm parameter
Q = -sys.float_info.max
delta = None
# EM algorithm
while delta is None or delta >= epsilon:
gf = self.__gaussian(self.mu, self.sigma)
# E step: example posterior probability of hidden variable gamma
gamma = self.__estimate_posterior_likelihood(X, self.pi, gf)
# M step: miximize Q function by estimating mu, sigma and pi
self.__estimate_gmm_parameter(X, gamma)
# calculate Q function
Q_new = self.__calc_Q(X, gamma)
delta = Q_new - Q
Q = Q_new
@staticmethod
def __estimate_posterior_likelihood(X, pi, gf):
l = np.zeros((X.size, pi.size))
for (i, x) in enumerate(X):
l[i, :] = gf(x)
return pi * l * np.vectorize(lambda y: 1 / y)(l.sum(axis=1).reshape(-1, 1))
@staticmethod
def __gaussian(mu, sigma):
def f(x):
return np.exp(-0.5 * (x - mu) ** 2 / sigma) / np.sqrt(2 * np.pi * sigma)
return f
def __estimate_gmm_parameter(self, X, gamma):
N = gamma.sum(axis=0)
self.mu = (gamma * X.reshape((-1, 1))).sum(axis=0) / N
self.sigma = (gamma * (X.reshape(-1, 1) - self.mu) ** 2).sum(axis=0) / N
self.pi = N / X.size
def __calc_Q(self, X, gamma):
Q = (gamma * (np.log(self.pi * (2 * np.pi * self.sigma) ** (-0.5)))).sum()
for (i, x) in enumerate(X):
Q += (gamma[i, :] * (-0.5 * (x - self.mu) ** 2 / self.sigma)).sum()
return Q
def result(self, mu_star, sigma_star):
print(u'mu*: %s, sigma*: %s' % (str(np.sort(np.around(mu_star, 3))), str(np.sort(np.around(sigma_star, 3)))))
print(u'mu : %s, sigma : %s' % (str(np.sort(np.around(self.mu, 3))), str(np.sort(np.around(self.sigma, 3)))))
def graph(self, X):
n, bins, _ = plt.hist(X, 50, density=True, alpha=0.3)
seq = np.arange(-15, 15, 0.02)
for i in range(self.K):
plt.plot(seq, self.__gaussian(self.mu[i], self.sigma[i])(seq), linewidth=2.0)
plt.savefig('gmm_graph.png')
plt.show()
10章 凸クラスタリング
データ作成
9章 混合正規分布と同様の方法でデータを作成します
凸クラスタリング法
データを凸クラスタリングするクラスです。(グラフを描画する関数も含めています)
凸クラスタリング法の実装と実験とそのコードが載っているConvex Clusteringを一部参考にしました。
class convex_clustering(object):
def __init__(self, sigma):
# Step1 Set the variance to an appropriate value.
self.sigma = sigma
self.pi = None
self.n = None
self.X = None
def clustering(self, x):
self.X = x
# Step2 Calculating a prior f
self.n = len(x)
f = self.__calc_f(self.X, self.sigma, self.__gaussian)
# Step3 Set the initial value of prior probability pi
self.pi = np.repeat(1 / self.n, self.n)
# termination condition
epsilon = 1e-6
log_p = -sys.float_info.max
delta = None
# Step4 Update parameters
while delta is None or delta > epsilon:
self.__update_pi(f)
log_p_new = self.__log_likelihood(self.pi, f)
delta = log_p_new - log_p
log_p = log_p_new
def __calc_f(self, x, sigma, gauss_func):
"""Calculating f
Args:
x: pattern
sigma: variance
gauss_func: normal distribution function
Returns: f
"""
f = [gauss_func(x, mu, sigma) for mu in x]
return np.reshape(f, (self.n, self.n))
def __update_pi(self, f):
numer = self.pi.reshape(-1, 1) * f
denom = numer.sum(axis=0)
self.pi = np.sum(numer / denom, axis=1) / self.n
# To improve the convergence efficiency, set the pi near zero to zero
for i, pi_i in enumerate(self.pi):
if pi_i < 1e-7:
self.pi[i] = 0
@staticmethod
def __log_likelihood(pi, f):
return np.log((pi * f.T).sum(axis=1)).sum(axis=0)
def __gaussian(self, x, mu, sigma):
if self.sigma.ndim >= 2:
# In the case of multidimensional normal distribution
result = scs.multivariate_normal.pdf(x, mu, sigma[0])
else:
# In the case of 1D normal distribution
result = scs.norm.pdf(x, np.tile(mu, len(x)), np.tile(sigma, len(x) // len(sigma)))
return result
def graph(self):
# Half an initial pi is used as threshold.
threshold = 0.01
mu_valid, pi_valid = self.__select_valid_centroid(self.X, self.pi, threshold)
K = len(mu_valid)
print(f"The number of used centroids is ({K}/{self.n}) using sigma:({np.max(self.sigma)}).")
const_sigma = np.array([self.sigma, ] * K)
self.__plot_sample(self.X)
self.__plot_gauss(mu_valid, const_sigma, self.X, pi_valid)
plt.show()
def __plot_sample(self, X):
if self.sigma.ndim >= 2:
x, y = X.T
plt.plot(x, y, "bo")
else:
bins = int(np.ceil(max(X) - min(X))) * 4
plt.hist(X, bins, fc="b")
def __plot_gauss(self, mu, sigma, X, pi):
if self.sigma.ndim >= 2:
# In the case of multidimensional normal distribution
delta = 0.025 # Sampling rate of contour.
x_min, y_min = np.floor(X.min(axis=0))
x_max, y_max = np.ceil(X.max(axis=0))
X, Y = np.meshgrid(np.arange(x_min, x_max, delta),
np.arange(y_min, y_max, delta))
grid = np.array([X.T, Y.T]).T
circle_rate = 3 # Adjust size of a drawn circle.
Z = 0
for i in np.arange(0, len(mu)):
Z += self.__gaussian(grid, mu[i], sigma[i] * circle_rate) * pi[i]
plt.contour(X, Y, Z, 2, linewidths=2, colors="g")
else:
# In the case of 1D normal distribution
grid = np.linspace(np.floor(X.min()), np.ceil(X.max()), self.n)
weight = self.n / np.sqrt(2 * np.pi)
dist = np.zeros(self.n)
for i in range(len(mu)):
dist += self.__gaussian(grid, mu[i], sigma[i]) * weight * pi[i]
plt.plot(grid, dist, "g", linewidth=2.0)
@staticmethod
def __select_valid_centroid(x, pi, threshold):
"""
1 Get the index of the most probable pi
2 Add the highest prior probability x value (centroid) to mu
3 Add the highest prior probability pi value to the ratio
4 If it is below the threshold, break
Args:
x: Data (centroid)
pi: prior probability
threshold: The threshold for judging prior probability pi as valid
Returns: Prior probability pi determined to be valid and its centroid
"""
mu_valid = np.array([])
pi_valid = np.array([])
indexs = pi.argsort()[::-1]
for row, index in enumerate(indexs):
if pi[index] < threshold:
if 0 < row:
mu_valid = np.reshape(mu_valid, (row, mu_valid.size // row))
break
else:
mu_valid = np.append(mu_valid, x[index])
pi_valid = np.append(pi_valid, pi[index])
return mu_valid, pi_valid
最後に
11章〜13章は勉強中です。
間違い等あれば、コメントにてご指摘お願いします。