

I Wrote Sample Code for the Algorithms in "続・わかりやすい パターン認識 教師なし学習入門"

Posted at 2020-08-11

The book "続・わかりやすい パターン認識 教師なし学習入門" does not come with published sample code for its algorithms, so I wrote my own.
The results of the actual experiments are published on GitHub.


Chapter 5: Supervised Learning and Unsupervised Learning

Data Generation

A class that generates data for the supervised and unsupervised learning experiments.


import numpy as np


class dice(object):
    """Generates rolls from c loaded dice: pi selects a die, theta gives its face probabilities."""

    def __init__(self, theta, pi):
        self.theta = theta  # theta[i]: face probabilities of die i (each row sums to 1)
        self.pi = pi        # pi[i]: probability of picking die i

    def generate_data(self, num):
        x, s = [], []
        for _ in range(num):
            s.append(self.__spots(self.pi))            # choose a die
            x.append(self.__spots(self.theta[s[-1]]))  # roll the chosen die

        return x, s

    @staticmethod
    def __spots(pi):
        # Sample an index from the categorical distribution pi by inverse transform
        p = np.random.rand()
        for i in range(len(pi)):
            if p < pi[:i + 1].sum():
                return i
        return len(pi) - 1  # guard against floating-point round-off
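
A quick usage sketch (the theta and pi values below are my own example, not taken from the book):

# Hypothetical example: two dice (c = 2) with six faces (m = 6)
theta = np.array([[0.1, 0.1, 0.1, 0.1, 0.1, 0.5],
                  [0.5, 0.1, 0.1, 0.1, 0.1, 0.1]])
pi = np.array([0.7, 0.3])

x, s = dice(theta, pi).generate_data(10000)
print(x[:10])  # observed spots
print(s[:10])  # which die was used (hidden in the unsupervised setting)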

Supervised Learning

A class for supervised learning.

class Supervised_learning(object):
    """Maximum-likelihood estimation of pi and theta when the die labels s are observed."""

    def __init__(self):
        self.pi = None
        self.theta = None
        self.n = None  # n[s][x]: count of face x observed from die s

    def estimate(self, x, s):
        self.n = self.__count(x, s)
        self.pi = self.n.sum(axis=1) / self.n.sum()              # relative frequency of each die
        self.theta = self.n / self.n.sum(axis=1, keepdims=True)  # relative frequency of each face per die

    @staticmethod
    def __count(x, s):
        n = np.zeros((max(s) + 1, max(x) + 1))
        for x_, s_ in zip(x, s):
            n[s_][x_] += 1
        return n
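
Continuing the sketch above, the estimates can be checked against the true parameters (again my own example values):

sl = Supervised_learning()
sl.estimate(x, s)
print(sl.pi)     # should approach pi = [0.7, 0.3]
print(sl.theta)  # should approach theta row by row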

Unsupervised Learning

A class for unsupervised learning.


import sys


class Unsupervised_learning(object):
    def __init__(self, theta=None, pi=None, c=3, m=2):
        self.c = c  # number of dice
        self.m = m  # number of faces

        # Step1 If no initial value is specified, it will be determined at random
        if pi is None:
            pi = np.random.rand(self.c)
            pi /= pi.sum()  # normalise so the mixing weights sum to 1
        if theta is None:
            rand = np.random.rand(self.c, self.m)
            theta = rand / rand.sum(axis=1).reshape(-1, 1)  # normalise each row
        self.pi = pi
        self.theta = theta
        self.r = None  # r[k]: how many times face k was observed
        self.p = None  # p[i][k]: posterior probability of die i given face k

    def estimate(self, x):
        # Count occurrences of each face, ordered by face index
        # (collections.Counter returns insertion order, which would misalign with theta)
        self.r = np.bincount(x, minlength=self.m)

        # Step2
        self.__update_p()

        # termination condition
        epsilon = 1e-6
        log_p = -sys.float_info.max
        delta = None

        while delta is None or delta >= epsilon:
            # Step3
            self.__update_param()

            # Step4
            log_p_new = self.__log_likelihood()
            delta = log_p_new - log_p
            log_p = log_p_new

    def __update_p(self):
        numer = self.pi.reshape(-1, 1) * self.theta
        self.p = numer / numer.sum(axis=0)

    def __update_param(self):
        self.pi = (self.r * self.p).sum(axis=1) / self.r.sum()
        self.__update_p()
        numer = self.r * self.p
        # self.theta = (numer.T / numer.sum(axis=1)).T  # uncomment to also estimate theta; leave commented when theta is known

    def __log_likelihood(self):
        log_p = np.log((self.pi.reshape(-1, 1) * self.theta).sum(axis=0))
        return (self.r * log_p).sum(axis=0)
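
A usage sketch with known theta (my own example values); only pi is estimated here, matching the commented-out theta update above:

theta = np.array([[0.1, 0.1, 0.1, 0.1, 0.1, 0.5],
                  [0.5, 0.1, 0.1, 0.1, 0.1, 0.1]])
pi_true = np.array([0.7, 0.3])

x, _ = dice(theta, pi_true).generate_data(10000)  # the labels s are discarded
ul = Unsupervised_learning(theta=theta, c=2, m=6)
ul.estimate(x)
print(ul.pi)  # should approach pi_true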

Chapter 7: Markov Models

Data Generation

A class that generates data according to a Markov model.


class Markov_dice(object):
    """Generates a state sequence s and an observation sequence x from a Markov model."""

    def __init__(self, A, B, rho):
        self.A = A      # A[i][j]: transition probability from state i to state j
        self.B = B      # B[i][k]: probability of emitting spot k in state i
        self.rho = rho  # rho[i]: initial state probability

    def generate_spots(self, n):
        s, x = [], []
        s += self.__spots(self.rho.reshape(1, -1), 0)
        for _ in range(n):  # do not shadow the argument n
            x += self.__spots(self.B, s[-1])
            s += self.__spots(self.A, s[-1])
        return np.array(s[:-1]), np.array(x)

    def __spots(self, theta, s_t):
        # Sample the next index from row s_t of theta by inverse transform.
        # Iterate over the columns (theta.shape[1]): rho is reshaped to (1, c)
        # and B need not be square, so theta.shape[0] would be wrong here.
        p = np.random.rand()
        for i in range(theta.shape[1]):
            if p < theta[s_t, :i + 1].sum():
                break
        return [i]
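
A usage sketch (the A, B, and rho values are my own example, not the book's):

A = np.array([[0.9, 0.1],
              [0.2, 0.8]])
B = np.array([[0.1, 0.1, 0.1, 0.1, 0.1, 0.5],
              [0.5, 0.1, 0.1, 0.1, 0.1, 0.1]])
rho = np.array([0.5, 0.5])

s, x = Markov_dice(A, B, rho).generate_spots(10000)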

Parameter Estimation for the Markov Model

A class that estimates the parameters of a Markov model.


class Markov(object):
    def __init__(self):
        self.A = None
        self.B = None
        self.n = None  # n[i][k]: count of spot k emitted in state i
        self.m = None  # m[i][j]: count of transitions from state i to state j

    def estimate(self, x, s):
        self.n, self.m = self.__count(x, s)
        # Maximum-likelihood estimates: normalise each count matrix by its own
        # row sums so that every row of A and B sums to exactly 1
        self.A = self.m / self.m.sum(axis=1).reshape(-1, 1)
        self.B = self.n / self.n.sum(axis=1).reshape(-1, 1)

    @staticmethod
    def __count(x, s):
        n = np.zeros((max(s) + 1, max(x) + 1))
        for x_, s_ in zip(x, s):
            n[s_][x_] += 1

        m = np.zeros((max(s) + 1, max(s) + 1))
        for i in range(len(s) - 1):
            m[s[i]][s[i + 1]] += 1

        return n, m
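
Continuing the sketch above:

mk = Markov()
mk.estimate(x, s)
print(mk.A)  # should approach A
print(mk.B)  # should approach B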

Chapter 8: Hidden Markov Models

Data Generation

Data is generated in the same way as in Chapter 7 (Markov models).

Parameter Estimation for the Hidden Markov Model

Parameter estimation for hidden Markov models is explained in detail in「pythonでHMMのパラメータ推定実装」.
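
For completeness, here is a minimal Baum-Welch (EM) sketch of my own for a discrete HMM, using the same notation as above (A: transitions, B: emissions, rho: initial distribution). It is an illustrative implementation, not the code from the book or the linked article:

import numpy as np


def baum_welch(x, c, m, n_iter=100, seed=0):
    """Estimate A (c x c), B (c x m) and rho (c,) from an observation sequence x."""
    x = np.asarray(x)
    T = len(x)
    rng = np.random.default_rng(seed)
    A = rng.random((c, c)); A /= A.sum(axis=1, keepdims=True)
    B = rng.random((c, m)); B /= B.sum(axis=1, keepdims=True)
    rho = np.full(c, 1 / c)

    for _ in range(n_iter):
        # E step: scaled forward-backward pass to avoid underflow
        alpha = np.zeros((T, c))
        scale = np.zeros(T)
        alpha[0] = rho * B[:, x[0]]
        scale[0] = alpha[0].sum()
        alpha[0] /= scale[0]
        for t in range(1, T):
            alpha[t] = (alpha[t - 1] @ A) * B[:, x[t]]
            scale[t] = alpha[t].sum()
            alpha[t] /= scale[t]
        beta = np.zeros((T, c))
        beta[-1] = 1.0
        for t in range(T - 2, -1, -1):
            beta[t] = (A @ (B[:, x[t + 1]] * beta[t + 1])) / scale[t + 1]

        gamma = alpha * beta                  # posterior of the state at each t
        gamma /= gamma.sum(axis=1, keepdims=True)
        xi = (alpha[:-1, :, None] * A[None, :, :]
              * (B[:, x[1:]].T * beta[1:])[:, None, :])  # posterior of each transition
        xi /= xi.sum(axis=(1, 2), keepdims=True)

        # M step
        rho = gamma[0]
        A = xi.sum(axis=0) / gamma[:-1].sum(axis=0)[:, None]
        for k in range(m):
            B[:, k] = gamma[x == k].sum(axis=0)
        B /= gamma.sum(axis=0)[:, None]

    return A, B, rho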

Chapter 9: Mixtures of Normal Distributions

Data Generation

A class that generates data following a mixture of normal distributions.


class mixed_normal_dice(object):
    """mixed normal distribution model"""

    def __init__(self, K=3, mu=None, sigma=None):
        self.K = K
        # If no initial value is specified, it will be determined at random
        if mu is None:
            mu = []
            for i in range(self.K):
                mu += [(np.random.rand() - 0.5) * 10.0]  # range: -5.0 to 5.0
        if sigma is None:
            sigma = []
            for i in range(self.K):
                sigma += [np.random.rand() * 3.0]  # range: 0.0 to 3.0

        # convert to arrays so that .ndim below also works for user-supplied lists
        self.mu = np.array(mu)
        self.scale = np.array(sigma)

    def generate_data(self, N):
        X = []
        mu_star = []
        sigma_star = []
        for i in range(self.K):
            if self.mu.ndim >= 2:
                # multidimensional normal distribution: scale[i] is a covariance matrix
                X = np.append(X, np.random.multivariate_normal(self.mu[i], self.scale[i], N // self.K))
            else:
                # one-dimensional normal distribution: scale[i] is a standard deviation
                X = np.append(X, np.random.normal(self.mu[i], self.scale[i], N // self.K))
            mu_star = np.append(mu_star, self.mu[i])
            sigma_star = np.append(sigma_star, self.scale[i])

        # build the result after the loop so that mu_star and sigma_star
        # include the last component as well
        if self.mu.ndim >= 2:
            X = X.reshape(-1, self.mu.shape[1])
        return X, mu_star, sigma_star
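
A 1D usage sketch (K and N are my own example values):

model = mixed_normal_dice(K=3)
X, mu_star, sigma_star = model.generate_data(N=3000)
print(mu_star, sigma_star)  # the true parameters, for later comparison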

Parameter Estimation for the Mixture of Normal Distributions

A class that estimates the parameters of data following a mixture of normal distributions. (It also includes a function for drawing a graph.)
I referred to「pythonで混合正規分布実装」.


import matplotlib.pyplot as plt


class mixed_normal_distribution(object):
    """Parameter estimation for a mixture of normal distributions (sigma holds variances)"""

    def __init__(self, K, pi=None, mu=None, sigma=None):
        self.K = K
        # If no initial value is specified, it will be determined at random
        if pi is None:
            pi = np.random.rand(self.K)
            pi /= pi.sum()  # the mixing weights must sum to 1
        if mu is None:
            mu = np.random.randn(K)
        if sigma is None:
            sigma = np.abs(np.random.randn(K))
        self.pi = pi
        self.mu = mu
        self.sigma = sigma

    def estimate(self, X):
        # termination condition
        epsilon = 1e-6

        # Step1 initialize the value of the Q function
        Q = -sys.float_info.max
        delta = None

        # EM algorithm
        while delta is None or delta >= epsilon:
            gf = self.__gaussian(self.mu, self.sigma)
            # E step: estimate the posterior probability gamma of the hidden variable
            gamma = self.__estimate_posterior_likelihood(X, self.pi, gf)

            # M step: maximize the Q function by re-estimating mu, sigma and pi
            self.__estimate_gmm_parameter(X, gamma)

            # calculate the Q function
            Q_new = self.__calc_Q(X, gamma)
            delta = Q_new - Q
            Q = Q_new

    @staticmethod
    def __estimate_posterior_likelihood(X, pi, gf):
        l = np.zeros((X.size, pi.size))
        for (i, x) in enumerate(X):
            l[i, :] = gf(x)
        # gamma_ik = pi_k N_k(x_i) / sum_k pi_k N_k(x_i); the denominator must
        # include pi so that each row sums to 1
        weighted = pi * l
        return weighted / weighted.sum(axis=1).reshape(-1, 1)

    @staticmethod
    def __gaussian(mu, sigma):
        # N(x; mu, sigma) with sigma interpreted as a variance
        def f(x):
            return np.exp(-0.5 * (x - mu) ** 2 / sigma) / np.sqrt(2 * np.pi * sigma)

        return f

    def __estimate_gmm_parameter(self, X, gamma):
        N = gamma.sum(axis=0)
        self.mu = (gamma * X.reshape((-1, 1))).sum(axis=0) / N
        self.sigma = (gamma * (X.reshape(-1, 1) - self.mu) ** 2).sum(axis=0) / N
        self.pi = N / X.size

    def __calc_Q(self, X, gamma):
        Q = (gamma * (np.log(self.pi * (2 * np.pi * self.sigma) ** (-0.5)))).sum()
        for (i, x) in enumerate(X):
            Q += (gamma[i, :] * (-0.5 * (x - self.mu) ** 2 / self.sigma)).sum()
        return Q

    def result(self, mu_star, sigma_star):
        # self.sigma holds variances, so take the square root to compare with
        # the generator's standard deviations sigma_star
        print(u'mu*: %s, sigma*: %s' % (str(np.sort(np.around(mu_star, 3))), str(np.sort(np.around(sigma_star, 3)))))
        print(u'mu : %s, sigma : %s' % (str(np.sort(np.around(self.mu, 3))), str(np.sort(np.around(np.sqrt(self.sigma), 3)))))

    def graph(self, X):
        n, bins, _ = plt.hist(X, 50, density=True, alpha=0.3)
        seq = np.arange(-15, 15, 0.02)
        for i in range(self.K):
            plt.plot(seq, self.__gaussian(self.mu[i], self.sigma[i])(seq), linewidth=2.0)
        plt.savefig('gmm_graph.png')
        plt.show()
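
Putting the two classes together (example values only):

model = mixed_normal_dice(K=3)
X, mu_star, sigma_star = model.generate_data(3000)

gmm = mixed_normal_distribution(K=3)
gmm.estimate(X)
gmm.result(mu_star, sigma_star)
gmm.graph(X)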

Chapter 10: Convex Clustering

Data Generation

Data is generated in the same way as in Chapter 9 (mixtures of normal distributions).

The Convex Clustering Method

A class that performs convex clustering on data. (It also includes functions for drawing graphs.)
I partially referred to Convex Clustering, which covers an implementation of and experiments with the convex clustering method and includes its code.


import scipy.stats as scs


class convex_clustering(object):
    def __init__(self, sigma):
        # Step1 Set the variance to an appropriate value.
        self.sigma = sigma
        self.pi = None
        self.n = None
        self.X = None

    def clustering(self, x):
        self.X = x
        # Step2 Compute the densities f (every sample is a candidate centroid)
        self.n = len(x)
        f = self.__calc_f(self.X, self.sigma, self.__gaussian)

        # Step3 Set the initial value of the prior probabilities pi (uniform)
        self.pi = np.repeat(1 / self.n, self.n)

        # termination condition
        epsilon = 1e-6
        log_p = -sys.float_info.max
        delta = None

        # Step4 Update parameters
        while delta is None or delta > epsilon:
            self.__update_pi(f)
            log_p_new = self.__log_likelihood(self.pi, f)
            delta = log_p_new - log_p
            log_p = log_p_new

    def __calc_f(self, x, sigma, gauss_func):
        """Compute f.

        Args:
            x: patterns (each pattern doubles as a candidate centroid)
            sigma: spread parameter (standard deviation in 1D, covariance matrix in the multivariate case)
            gauss_func: normal distribution density function

        Returns: an (n, n) matrix with f[i, j] = N(x_j; mu_i = x_i, sigma)

        """
        f = [gauss_func(x, mu, sigma) for mu in x]
        return np.reshape(f, (self.n, self.n))

    def __update_pi(self, f):
        numer = self.pi.reshape(-1, 1) * f
        denom = numer.sum(axis=0)
        self.pi = np.sum(numer / denom, axis=1) / self.n
        # To improve the convergence efficiency, set the pi near zero to zero
        for i, pi_i in enumerate(self.pi):
            if pi_i < 1e-7:
                self.pi[i] = 0

    @staticmethod
    def __log_likelihood(pi, f):
        return np.log((pi * f.T).sum(axis=1)).sum(axis=0)

    def __gaussian(self, x, mu, sigma):
        if self.sigma.ndim >= 2:
            # In the case of multidimensional normal distribution
            result = scs.multivariate_normal.pdf(x, mu, sigma[0])
        else:
            # In the case of 1D normal distribution
            result = scs.norm.pdf(x, np.tile(mu, len(x)), np.tile(sigma, len(x) // len(sigma)))

        return result

    def graph(self):
        # Centroids whose prior probability pi falls below this threshold are discarded.
        threshold = 0.01
        mu_valid, pi_valid = self.__select_valid_centroid(self.X, self.pi, threshold)
        K = len(mu_valid)
        print(f"The number of used centroids is ({K}/{self.n}) using sigma:({np.max(self.sigma)}).")
        const_sigma = np.array([self.sigma, ] * K)

        self.__plot_sample(self.X)
        self.__plot_gauss(mu_valid, const_sigma, self.X, pi_valid)
        plt.show()

    def __plot_sample(self, X):
        if self.sigma.ndim >= 2:
            x, y = X.T
            plt.plot(x, y, "bo")
        else:
            bins = int(np.ceil(max(X) - min(X))) * 4
            plt.hist(X, bins, fc="b")

    def __plot_gauss(self, mu, sigma, X, pi):
        if self.sigma.ndim >= 2:
            # In the case of multidimensional normal distribution
            delta = 0.025  # Sampling rate of contour.
            x_min, y_min = np.floor(X.min(axis=0))
            x_max, y_max = np.ceil(X.max(axis=0))
            X, Y = np.meshgrid(np.arange(x_min, x_max, delta),
                               np.arange(y_min, y_max, delta))
            grid = np.array([X.T, Y.T]).T

            circle_rate = 3  # Adjust size of a drawn circle.
            Z = 0
            for i in np.arange(0, len(mu)):
                Z += self.__gaussian(grid, mu[i], sigma[i] * circle_rate) * pi[i]
            plt.contour(X, Y, Z, 2, linewidths=2, colors="g")
        else:
            # In the case of 1D normal distribution
            grid = np.linspace(np.floor(X.min()), np.ceil(X.max()), self.n)
            weight = self.n / np.sqrt(2 * np.pi)
            dist = np.zeros(self.n)
            for i in range(len(mu)):
                dist += self.__gaussian(grid, mu[i], sigma[i]) * weight * pi[i]
            plt.plot(grid, dist, "g", linewidth=2.0)

    @staticmethod
    def __select_valid_centroid(x, pi, threshold):
        """
        1 Sort the indices in descending order of pi
        2 Add the centroid x and the prior probability pi of each index to the results
        3 Stop as soon as pi falls below the threshold

        Args:
            x: data (candidate centroids)
            pi: prior probabilities
            threshold: the threshold for judging a prior probability pi as valid

        Returns: the prior probabilities pi judged to be valid and their centroids

        """
        mu_valid = np.array([])
        pi_valid = np.array([])
        indices = pi.argsort()[::-1]
        for index in indices:
            if pi[index] < threshold:
                break
            mu_valid = np.append(mu_valid, x[index])
            pi_valid = np.append(pi_valid, pi[index])
        # reshape so that each row is one centroid (this also covers the case
        # where every pi exceeds the threshold)
        if pi_valid.size > 0:
            mu_valid = np.reshape(mu_valid, (pi_valid.size, mu_valid.size // pi_valid.size))
        return mu_valid, pi_valid
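
A 1D usage sketch, reusing the data generator from Chapter 9 (the sigma value is my own example):

X, _, _ = mixed_normal_dice(K=3).generate_data(1000)

cc = convex_clustering(sigma=np.array([1.0]))
cc.clustering(X)
cc.graph()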

Closing Remarks

I am still studying Chapters 11 to 13.
If you find any mistakes, please point them out in the comments.
