KMEANSの初期化処理とは
KMEANSは最初に適当な初期位置を決めて推定と中心値の計算を収束するまで繰り返します、初期位置が優秀なほうが精度は上がり収束までの繰り返し回数も少なくて済みます。初期化手法にはRANDOMとk-means++の二通りがありRANDOMは名前の通りクラスタの数だけランダムにサンプルを選びそれらを初期位置とします、k-means++は最初の点はランダムに選びますが2つ目以降は距離が遠いほど確率が高くなるようにD(x)^2で確率分布を作りできるだけ遠い物をクラスタの数だけ選びます。ここの解説がとてもわかりやすいです → https://www.medi-08-data-06.work/entry/kmeans
動機
マイコンでKMEANSやる事情ができたのでKMEANS++同等で軽くてメモリを食わない初期化処理が必要
思いついた方法
① 中心値同士の全ての距離の平均値を算出
② サンプルの所属クラスタを推定
③ ②の距離が①の半分(距離を直径としその半径と考える)より大きかったらそれを新しい中心値とする
④ ③によりクラスタを新しいサンプルで更新した場合は①へ、そうでない場合は②へ
対象データ全てで①~④を繰り返す
あるサンプルのクラスタ推定時の距離が中心値同士の距離平均より大きいということはそのサンプルは新しいクラスタで分布は今の中心値よりももっと広いに違いないので中心値同士の距離平均が広がりきるまで広げてみようという発想です。わかりづらいですが自分も上手く説明できませんのでソース見てください。
iris、seedsによる1000回試験結果
iris
処理 | イテレーション平均 | イテレーション分散 | 正解平均 | 正解分散 |
---|---|---|---|---|
random | 6.418 | 2.174 | 0.763 | 0.1268 |
k-means++ | 5.38 | 1.63 | 0.804 | 0.09 |
新手法 | 6.104 | 2.088 | 0.801 | 0.09 |
seeds
処理 | イテレーション平均 | イテレーション分散 | 正解平均 | 正解分散 |
---|---|---|---|---|
random | 9.109 | 3.237 | 0.921 | 0.0049 |
k-means++ | 7.096 | 2.4284 | 0.921 | 0.0051 |
新手法 | 7.368 | 2.56 | 0.921 | 0.0046 |
k-means++には及びませんでしたがランダムよりは明確に良い結果が出ていると思います、イテレーション回数が少なく精度も出ていますし分散も少ないので結果のばらつきも少ないようです。もう少し量があって8クラスタくらいのサンプルがあると良いのですが都合良い物がなかったのでirisとseedsのみの試験としました。
ちなみに実際仕事で使った所当試験よりも高い精度とイテレーション回数の削減が得られたためサンプルによっての相性が激しい手法のようです。
以下ソースコード載せておきます。
初期化処理のソースコード
def Kmeans_Predict(means, x):
distances = []
for m in means:
distances.append(np.linalg.norm(m - x))
predict = np.argmin(distances)
return distances[predict], predict
# 中心値同士の平均距離を計算
def KmeansInit_CalcRadiusAverageDistance(means):
length = len(means)
avrDistance = 0
cnt = 0
for i in range(length):
for j in range(i):
if j == i: continue
avrDistance += np.linalg.norm(means[i] - means[j])
cnt += 1
return (avrDistance / cnt/ 2)
def KmeansInit_FarawayCentroids(n_clusters, x):
means = np.zeros((n_clusters, x.shape[1]))
distanceThreshold = 0
for cnt in range(1):
for ix in x:
distance, predict = Kmeans_Predict(means, ix)
if distance > distanceThreshold:
# 中心値同士の平均距離より大きいのならそのサンプルは新しいクラスタということにする
means[predict] = ix
distanceThreshold = KmeansInit_CalcRadiusAverageDistance(means)
else:
# 中心値同士の平均距離と同じかそれ以下なら妥当な位置なので中心値の更新はしない
pass
return means
試験ソースコード全体
import math
import numpy as np
import sklearn.cluster
import sklearn.preprocessing
import sys
def Kmeans_Predict(means, x):
distances = []
for m in means:
distances.append(np.linalg.norm(m - x))
predict = np.argmin(distances)
return distances[predict], predict
def KmeansInit_CalcRadiusAverageDistance(means):
length = len(means)
avrDistance = 0
cnt = 0
for i in range(length):
for j in range(i):
if j == i: continue
avrDistance += np.linalg.norm(means[i] - means[j])
cnt += 1
return (avrDistance / cnt/ 2)
def KmeansInit_FarawayCentroids(n_clusters, x):
means = np.zeros((n_clusters, x.shape[1]))
distanceThreshold = 0
for cnt in range(1):
for ix in x:
distance, predict = Kmeans_Predict(means, ix)
if distance > distanceThreshold:
means[predict] = ix
distanceThreshold = KmeansInit_CalcRadiusAverageDistance(means)
return means
def loadIris():
data = np.loadtxt("./iris_dataset.txt", delimiter="\t", dtype=str)
length = len(data)
x = data[:,0:4].astype(np.float)
names = data[:,4]
nameList = np.unique(data[:,4])
y = np.zeros(length)
for i, name in enumerate(nameList):
y[names == name] = i
return x, y
def loadSeeds():
data = np.loadtxt("./seeds_dataset.txt", delimiter="\t", dtype=str)
length = len(data)
x = data[:,0:7].astype(np.float)
y = data[:,7].astype(float)
return x, y
def KmeansModel(init, n_clusters):
return sklearn.cluster.KMeans(
n_clusters=n_clusters,
init=init,
n_init=1,
max_iter=100,
tol=1e-5,
verbose=3
)
def CalcAccuracy(y, predicts):
answers = np.unique(y)
clusters = np.sort(np.unique(predicts))
accuracy = []
ignoreCluster = []
for ans in answers:
pred = predicts[y == ans]
total = []
totalClusters = []
for c in clusters:
if c in ignoreCluster:
continue
total.append(np.sum(pred == c))
totalClusters.append(c)
maxIdx = np.argmax(total)
ignoreCluster.append(totalClusters[maxIdx])
acc = total[maxIdx] / len(pred)
accuracy.append(acc)
return accuracy
def KmeansTestSub(init, n_clusters, x, y):
model = KmeansModel(init, n_clusters)
model.fit(x)
predicts = model.predict(x)
accuracy = CalcAccuracy(y, predicts)
return [model.n_iter_, np.mean(accuracy)] + list(accuracy)
def shuffleData(x, y):
idxs = np.arange(len(x))
np.random.shuffle(idxs)
x, y = x[idxs], y[idxs]
return x, y
def KmeansTest(dataset, n_clusters, prefix="", test_count=1000):
x, y = dataset
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(x)
x = scaler.transform(x)
# farway
report = []
for i in range(test_count):
x, y = shuffleData(x, y)
init = KmeansInit_FarawayCentroids(n_clusters, x)
rep = KmeansTestSub(init, n_clusters, x, y)
report.append(rep)
report = np.vstack([report, np.mean(report, axis=0)])
report = np.vstack([report, np.std(report, axis=0)])
np.savetxt("./{}report_farway.txt".format(prefix), report, delimiter="\t")
# random
report = []
for i in range(test_count):
rep = KmeansTestSub("random", n_clusters, x, y)
report.append(rep)
report = np.vstack([report, np.mean(report, axis=0)])
report = np.vstack([report, np.std(report, axis=0)])
np.savetxt("./{}report_random.txt".format(prefix), report, delimiter="\t")
# k-means++
report = []
for i in range(test_count):
rep = KmeansTestSub("k-means++", n_clusters, x, y)
report.append(rep)
report = np.vstack([report, np.mean(report, axis=0)])
report = np.vstack([report, np.std(report, axis=0)])
np.savetxt("./{}report_kmeansPP.txt".format(prefix), report, delimiter="\t")
def run():
KmeansTest(loadIris(), prefix="iris_", n_clusters=3)
KmeansTest(loadSeeds(), prefix="seeds_", n_clusters=3)
if __name__ == "__main__":
run()
# python kmeans_test.py
以上です
以上です