0
2

More than 3 years have passed since last update.

ランダムよりはマシな精度で高速なKMEANS初期化処理を作りました

Last updated at Posted at 2020-02-13

KMEANSの初期化処理とは

KMEANSは最初に適当な初期位置を決めて推定と中心値の計算を収束するまで繰り返します、初期位置が優秀なほうが精度は上がり収束までの繰り返し回数も少なくて済みます。初期化手法にはRANDOMとk-means++の二通りがありRANDOMは名前の通りクラスタの数だけランダムにサンプルを選びそれらを初期位置とします、k-means++は最初の点はランダムに選びますが2つ目以降は距離が遠いほど確率が高くなるようにD(x)^2で確率分布を作りできるだけ遠い物をクラスタの数だけ選びます。ここの解説がとてもわかりやすいです → https://www.medi-08-data-06.work/entry/kmeans

動機

マイコンでKMEANSやる事情ができたのでKMEANS++同等で軽くてメモリを食わない初期化処理が必要

思いついた方法

① 中心値同士の全ての距離の平均値を算出
② サンプルの所属クラスタを推定
③ ②の距離が①の半分(距離を直径としその半径と考える)より大きかったらそれを新しい中心値とする
④ ③によりクラスタを新しいサンプルで更新した場合は①へ、そうでない場合は②へ
対象データ全てで①~④を繰り返す

あるサンプルのクラスタ推定時の距離が中心値同士の距離平均より大きいということはそのサンプルは新しいクラスタで分布は今の中心値よりももっと広いに違いないので中心値同士の距離平均が広がりきるまで広げてみようという発想です。わかりづらいですが自分も上手く説明できませんのでソース見てください。

iris、seedsによる1000回試験結果

iris

処理 イテレーション平均 イテレーション分散 正解平均 正解分散
random 6.418 2.174 0.763 0.1268
k-means++ 5.38 1.63 0.804 0.09
新手法 6.104 2.088 0.801 0.09

seeds

処理 イテレーション平均 イテレーション分散 正解平均 正解分散
random 9.109 3.237 0.921 0.0049
k-means++ 7.096 2.4284 0.921 0.0051
新手法 7.368 2.56 0.921 0.0046

k-means++には及びませんでしたがランダムよりは明確に良い結果が出ていると思います、イテレーション回数が少なく精度も出ていますし分散も少ないので結果のばらつきも少ないようです。もう少し量があって8クラスタくらいのサンプルがあると良いのですが都合良い物がなかったのでirisとseedsのみの試験としました。
ちなみに実際仕事で使った所当試験よりも高い精度とイテレーション回数の削減が得られたためサンプルによっての相性が激しい手法のようです。
以下ソースコード載せておきます。

初期化処理のソースコード

def Kmeans_Predict(means, x):
    distances = []
    for m in means:
        distances.append(np.linalg.norm(m - x))
    predict = np.argmin(distances)
    return distances[predict], predict

# 中心値同士の平均距離を計算
def KmeansInit_CalcRadiusAverageDistance(means):
    length = len(means)
    avrDistance = 0
    cnt = 0
    for i in range(length):
        for j in range(i):
            if j == i: continue
            avrDistance += np.linalg.norm(means[i] - means[j])
            cnt += 1
    return (avrDistance / cnt/ 2)

def KmeansInit_FarawayCentroids(n_clusters, x):
    means = np.zeros((n_clusters, x.shape[1]))
    distanceThreshold = 0
    for cnt in range(1):
        for ix in x:
            distance, predict = Kmeans_Predict(means, ix)
            if distance > distanceThreshold:
                # 中心値同士の平均距離より大きいのならそのサンプルは新しいクラスタということにする
                means[predict] = ix
                distanceThreshold = KmeansInit_CalcRadiusAverageDistance(means)
            else:
                # 中心値同士の平均距離と同じかそれ以下なら妥当な位置なので中心値の更新はしない
                pass
    return means

試験ソースコード全体

import math
import numpy as np
import sklearn.cluster
import sklearn.preprocessing
import sys

def Kmeans_Predict(means, x):
    distances = []
    for m in means:
        distances.append(np.linalg.norm(m - x))
    predict = np.argmin(distances)
    return distances[predict], predict

def KmeansInit_CalcRadiusAverageDistance(means):
    length = len(means)
    avrDistance = 0
    cnt = 0
    for i in range(length):
        for j in range(i):
            if j == i: continue
            avrDistance += np.linalg.norm(means[i] - means[j])
            cnt += 1
    return (avrDistance / cnt/ 2)

def KmeansInit_FarawayCentroids(n_clusters, x):
    means = np.zeros((n_clusters, x.shape[1]))
    distanceThreshold = 0
    for cnt in range(1):
        for ix in x:
            distance, predict = Kmeans_Predict(means, ix)
            if distance > distanceThreshold:
                means[predict] = ix
                distanceThreshold = KmeansInit_CalcRadiusAverageDistance(means)
    return means

def loadIris():
    data = np.loadtxt("./iris_dataset.txt", delimiter="\t", dtype=str)
    length = len(data)
    x = data[:,0:4].astype(np.float)
    names = data[:,4]
    nameList = np.unique(data[:,4])
    y = np.zeros(length)
    for i, name in enumerate(nameList):
        y[names == name] = i
    return x, y

def loadSeeds():
    data = np.loadtxt("./seeds_dataset.txt", delimiter="\t", dtype=str)
    length = len(data)
    x = data[:,0:7].astype(np.float)
    y = data[:,7].astype(float)
    return x, y


def KmeansModel(init, n_clusters):
    return sklearn.cluster.KMeans(
        n_clusters=n_clusters,
        init=init,
        n_init=1,
        max_iter=100,
        tol=1e-5,
        verbose=3
    )

def CalcAccuracy(y, predicts):
    answers = np.unique(y)
    clusters = np.sort(np.unique(predicts))
    accuracy = []
    ignoreCluster = []
    for ans in answers:
        pred = predicts[y == ans]
        total = []
        totalClusters = []
        for c in clusters:
            if c in ignoreCluster:
                continue
            total.append(np.sum(pred == c))
            totalClusters.append(c)
        maxIdx = np.argmax(total)
        ignoreCluster.append(totalClusters[maxIdx])
        acc = total[maxIdx] / len(pred)
        accuracy.append(acc)
    return accuracy

def KmeansTestSub(init, n_clusters, x, y):
    model = KmeansModel(init, n_clusters)
    model.fit(x)
    predicts = model.predict(x)
    accuracy = CalcAccuracy(y, predicts)
    return [model.n_iter_, np.mean(accuracy)] + list(accuracy)

def shuffleData(x, y):
    idxs = np.arange(len(x))
    np.random.shuffle(idxs)
    x, y = x[idxs], y[idxs]
    return x, y

def KmeansTest(dataset, n_clusters, prefix="", test_count=1000):
    x, y = dataset
    scaler = sklearn.preprocessing.StandardScaler()
    scaler.fit(x)
    x = scaler.transform(x)

    # farway
    report = []
    for i in range(test_count):
        x, y = shuffleData(x, y)
        init = KmeansInit_FarawayCentroids(n_clusters, x)
        rep = KmeansTestSub(init, n_clusters, x, y)
        report.append(rep)
    report = np.vstack([report, np.mean(report, axis=0)])
    report = np.vstack([report, np.std(report, axis=0)])
    np.savetxt("./{}report_farway.txt".format(prefix), report, delimiter="\t")

    # random
    report = []
    for i in range(test_count):
        rep = KmeansTestSub("random", n_clusters, x, y)
        report.append(rep)
    report = np.vstack([report, np.mean(report, axis=0)])
    report = np.vstack([report, np.std(report, axis=0)])
    np.savetxt("./{}report_random.txt".format(prefix), report, delimiter="\t")

    # k-means++
    report = []
    for i in range(test_count):
        rep = KmeansTestSub("k-means++", n_clusters, x, y)
        report.append(rep)
    report = np.vstack([report, np.mean(report, axis=0)])
    report = np.vstack([report, np.std(report, axis=0)])
    np.savetxt("./{}report_kmeansPP.txt".format(prefix), report, delimiter="\t")

def run():
    KmeansTest(loadIris(), prefix="iris_", n_clusters=3)
    KmeansTest(loadSeeds(), prefix="seeds_", n_clusters=3)

if __name__ == "__main__":
    run()
# python kmeans_test.py

以上です

以上です

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2