はじめに
機械学習のリサーチペーパーを読み、アルゴリズムをコンピューター言語で実装する力がほしいと思いこのシリーズを始めました。高度の数学は必要ありませんが、理解できていれば、データ構造、動作理解、更にはトラブルシューティングに役に立つはずです。
今回はK-Means法(K-Means Clustering)の実装に関する要点を見ていきます。重要だと思った数式とそのアルゴリズムを実装し、今後の忘備録とします。
K-Means法を実装するためには、数学の基本的な概念—例えば、クラスタ内平方和、ユークリッド距離の計算、平均(重心)の求め方、そして反復的な更新と収束の考え方—が非常に役立ちます。
ここで取り上げたいのは以下になります。
- 初期化
- 距離計算
- 割り当て
- 更新
- 収束判定
- クラスタ内平方和(Within-Cluster Sum of Squares)
- ロイド K-means アルゴリズム ( Lloyd's K-means Algorithm)
K-Means法とは
K-Means法は、与えられたデータセットをあらかじめ定めた個数(K個)のクラスタに分割するための代表的な教師なし学習 のアルゴリズムです。各クラスタ内のデータ点同士ができるだけ似ており、クラスタ間の差異が大きくなるようにグループ分けを行います。
K-meansアルゴリズムは、K個のセントロイドを初期設定し、各データ点を最も近いセントロイドに割り当てた後、各クラスタに属するデータ点の平均を計算してセントロイドを更新します。このプロセスを、データ点の割り当てが変わらなくなるまで繰り返し、最終的なクラスタ分割を実現します。
概要と計算については、YouTubeで「K-means clustering」などで検索すると多数の解説が見つかります。例:
- 目で見てわかる k-meansクラスタリングの基本から限界まで
- 教師なし学習(クラスタリング)のk-meansをわかりやすく説明【機械学習入門26】
- k-means clustering - explained
数学的基礎
クラスタ内平方和(Within-Cluster Sum of Squares, WCSS)
K-Means法の数式 WCSS は以下になります。
\text{WCSS} = \sum_{i=1}^{n} \min_{j \in \{0, \dots, K-1\}} \| x_i - \mu_j \|^2
- $x_i$:データ点(例:(1, 2))。全部で $ n $ 個
- $\mu_j $:クラスタ $ j $ のセントロイド(例:$ (1, 1) $)クラスタ数は $ K $
- $ | x_i - \mu_j |^2 $:データ点とセントロイドの距離の2乗
- $ \min_{j \in {0, \dots, K-1}} $:各データ点について、すべてのセントロイドとの距離から一番小さいものを選ぶ。
- $ \sum_{i=1}^{n} $:すべてのデータ点の最小距離を合計
この式は、クラスタ内平方和(Within-Cluster Sum of Squares, WCSS)と呼ばれます。各データ点に対して、すべてのクラスタのセントロイドとの二乗距離を計算し、その中で最も小さい値、すなわちデータ点が最も近いセントロイドとの二乗距離を得たものを全データ点で合計することで算出されます。
WCSSによって全体のクラスタリングの良さ(凝集度)が定量的に評価されるので、K-meansの目標はWCSSを最小にするクラスタを探すことになります。[参考] https://scikit-learn.org/stable/modules/clustering.html#k-means
定量的に評価するため、WCSS数式のアウトプットは1つのスカラー値になります。
ユークリッド距離の二乗
上記の数式にある $ || x_i - \mu_j ||^2 $ は、ユークリッド距離の2乗、すなわちデータ点 $ x_i $ とセントロイド $ \mu_j $ との間の距離の2乗を表します。これは L2 ノルム (L2 Norm) とも呼ばれ、次の数式で計算されます。
\|x_i - \mu_j\|^2 = \sum_{m=1}^{d} (x_{im} - \mu_{jm})^2
まず、 データ点 $ x_i $ とセントロイド $ \mu_j $ の各特徴(X軸やY軸の次元)を比較します。そして、各次元 において、$ x_{im} $(データ点の値)と $ \mu_{jm} $(セントロイドの値)の差を2乗します。つまり、
(x_{im} - \mu_{jm})^2
更に、すべての次元で計算した2乗差を合計して、1つの距離の2乗を得ます。すなわち、
\sum_{m=1}^{d} (x_{im} - \mu_{jm})^2
この計算を、すべてのデータ点 $ x_i $ ($i = 1, 2, \dots, n$) とすべてのセントロイド $ \mu_j $ ($j = 0, 1, \dots, K-1$) の各ペアに対して行い、結果を距離行列 ( S ) に格納します。つまり、
S[i,j] = \| x_i - \mu_j \|^2
もしデータ点が $ m $ 個($ i = 1, 2, \dots, m $)とセントロイドが $ K $ 個($ j = 0, 1, \dots, K-1 $)ある場合、 全てのペアは $ m \times K $ 組となります。
参考記事は https://qiita.com/kz-halfmoon/items/db337fa2077fe3b5d53b が良いと思います。
実装
k-means法によって得られるクラスタリング結果は、初期化に大きく依存する局所解 (local minimum) となります。 なお、グローバル最適解(全データ点に対して最も良いクラスタリング結果)の算出はNP-hardであり、実用的なアルゴリズムとしてはロイドアルゴリズムなどを利用して局所最適解に収束させるのが一般的です。
今回の実装範囲は、目次にもありますが、以下を含みます。順番に実装するとK-Means法をほとんどカバーしています。
1. 初期化(データセット取得と初期セントロイド値の設定)
ループ(収束するまで)
2. 距離計算
3. 割り当て
4. 更新
5. 収束判定
6. WCSS計算
それではよろしくお願いします。
1. 初期化
初期のセントロイドについては、基本的な K-means の場合、特定の「数式」で初期値を決めるというよりは、単純にデータセットからランダムに選ぶ方法が一般的です。初期値は
\mu_i^{(0)} = x_{r_i}, \quad r_i \in \{1, 2, \dots, m\},
のように、データセットの中からランダムに選んだ 𝑥 の値(観測値)と考えることができます。
データセットからランダムに選ぶコードをPythonで実装してみましょう。
最初のデータセットはどのようなもになるでしょう?
ここでは、5個のデータ点を含むデータセットで、2次元のデータ(X軸とY軸)を考えてみましょう。データセットXは以下になります。
X = np.array([
[1, 2],
[1, 4],
[1, 0],
[10, 2],
[10, 4]
])
あと、クラスタの数 K も必要になります。
K=2
この2つのインプットを利用し、Xの行列からK個の"ランダム"に選ばれたデータ点を取り出します。アウトプットのイメージは以下のようになり、ランダムに0と3の行を取り出しています。
# X[0,3]
[[1, 2],
[10, 2]]
以下実装例です。
import numpy as np
def init_centers(X: np.ndarray, k: int) -> np.ndarray:
"""
Randomly samples k observations from X as centers.
Parameters:
-----------
X : np.ndarray
A 2-D numpy array with shape (n, d), where n is the number of observations and d is the number of features.
k : int
The number of centers (clusters) to select.
Returns:
--------
np.ndarray
A numpy array containing k rows selected from X, with shape (k, d).
"""
# Determine the number of observations (n) from X.
n = X.shape[0]
# Randomly select k unique indices from the n observations.
indices = np.random.choice(n, size=k, replace=False)
# Return the selected observations as centers.
return X[indices]
いかがでしょうか?案外簡単に実装できました。
2. 距離計算
つぎに距離計算を実装してみたいと思います。
数式はこんな形でした。
\|x_i - \mu_j\|^2 = \sum_{m=1}^d (x_{im} - \mu_{jm})^2
やりたいことは、Xのデータセットから、各データポイントと現在のセントロイドのポイントの要素をそれぞれ取り出し、差異を計算し、二乗して、総和を取ることでした。各データポイントの距離が計算されます。
実装する前に手動で計算し、内容をより深く理解しましょう
データセット X とセントロイドのcentersには以下のデータが格納されていると仮定します。
# X
[[1, 2],
[1, 4],
[1, 0],
[10, 2],
[10, 4]]
# centers
[[1, 2],
[10, 2]]
では、計算してみましょう。
$ i $が0の場合、X[0] = [1,2]になります。その時の距離は
- center[0] = [1,2] との距離:$ (1 - 1)^2 + (2 - 2)^2 = 0$
- center[1] = [10,2]との距離:$ (1 - 10)^2 + (2 -2)^2 = 81$
$ i $が1の場合、X[1] = [1,4]になります。その時の距離は
- center[0] = [1,2]との距離:$ (1 - 1)^2 + (4 -2)^2 = 4$
- center[1] = [10,2]との距離:$ (1 - 10)^2 + (4 -2)^2 = 85$
等々、最終的には以下になるはずです。
S = [
[0 81]
[4 85]
[4 85]
[81 0]
[85 4]
]
ユークリッド距離の2乗を計算するコードをPythonで実装してみましょう。
簡単に実装するには、iとjの2重ループで、Xの行列とcenterのベクトルの要素が取れます。その要素に対して、引き算し、二乗したものの和を取ればいいはずです。S[i, j] を定義し、新し計算された距離がセントロイド分格納されます。center[0]とで計算した距離はS[i,0] に入り、center[1]とで計算した距離はS[i,1]に入ります。np.sum関数でデータ点の各要素と各セントロイドの要素を引き算、2乗、そして総和を取っています。
import numpy as np
def compute_d2(X: np.ndarray, centers: np.ndarray) -> np.ndarray:
"""
Computes the squared Euclidean distance between each observation in X and each cluster center.
Parameters
----------
X : np.ndarray
A 2-D numpy array of shape (m, d) where each row is an observation and d is the number of features.
centers : np.ndarray
A 2-D numpy array of shape (k, d) where each row represents a cluster center.
Returns
-------
np.ndarray
A 2-D numpy array S of shape (m, k), where S[i, j] is the squared Euclidean distance between observation X[i] and center centers[j].
"""
# Determine the number of observations (m) and the number of clusters (k)
m = X.shape[0]
k = centers.shape[0]
# Initialize the distance matrix S with zeros, shape (m, k)
S = np.zeros((m, k))
# Loop through each observation and each center to compute the squared distance
for i in range(m):
for j in range(k):
# Compute the squared Euclidean distance between observation i and center j
# by summing the squared differences of each corresponding feature.
S[i, j] = np.sum((X[i] - centers[j]) ** 2)
return S
2重ループというのはあまり演算的には美しくないですが、アルゴリズムを理解する場合には、許容できる実装ではないでしょうか。2重ループを利用しない実装は追記にあるので、そちらを参考にしてください。
3. 割り当て
各データ点に、すでに分かってる距離の中で一番小さいクラスタを割り当てます。
まず数式は、以下になります。
y_i = \arg\min_{j \in \{0,\dots,k-1\}} s_{ij}
各データ点 $x_i$ に対し、すでに計算済みの距離 $ s_{ij} $(クラスタ $ j = 0, \dots, K-1 $)の中で最小のものを選び、そのクラスタ番号を$ y_i $ を割り当てます。
ユークリッド距離の2乗の計算済みデータが以下と仮定すると
S = [
[0 81]
[4 85]
[4 85]
[81 0]
[85 4]
]
- 行:データ点 $x_i, i=1,…,5$
- 列:クラスタ $j=0,1$
上記では、各データ点の距離を比べて、一番小さいクラスタの番号を付けます。例えば、データ点が$x_1$の場合、$S_1$は[0 81]であり、比較(0 <81)すると、クラスタ0の方が小さいので、$y_i$に0を割り当てます。
従って、各$S_i$のクラスタ割り当て$ y_i $は以下になります。
y_i =[0,0,0,1,1]
割り当てを計算するコードをPythonで実装してみましょう。
def assign_cluster_labels(S: np.ndarray) -> np.ndarray:
# For each observation (each row of S), find the index of the smallest distance.
# np.argmin with axis=1 returns a 1-D array with the index of the minimum value in each row.
return np.argmin(S, axis=1)
ここは、np.argmin関数にたどり着ければ、簡単に実装できます。
4. 更新
データ点がクラスタに分かれたら、そのクラスタのデータを使って、セントロイドの値を新しくします。
数式を示します。
\mu_j \equiv \frac{1}{|C_j|} \sum_{x \in C_j} x
- $ \frac{1}{|C_j|} $ : クラスタサイズの逆数(平均を計算する為)
- $ \sum_{x \in C_j} x $ : クラスタ $ C_j $ に属する全データ点の総和
各クラスタに割り当てられたデータ点の総和の平均を取っています。
ではまずは手動で計算してみましょう。データは以下を利用します。
# データ点
# X
[[1, 2],
[1, 4],
[1, 0],
[10, 2],
[10, 4]]
# 新しいクラスタの割り振り
y_i =[0,0,0,1,1]
- クラスタ0の場合
- ラベル0のデータポイント:
$ X[0] = [1.2] $
$ X[1] = [1,4] $
$ X[2] = [1,0] $
- ラベル0のデータポイント:
\text{center[0]} = \left[ \frac{1+1+1}{3},\ \frac{2+4+0}{3} \right] = [1,2]
- クラスタ1の場合
- -ラベル1のデータポイント:
$ X[3] = [10,2] $
$ X[4] = [10,4] $
- -ラベル1のデータポイント:
\text{center[1]} = \left[ \frac{10+10}{2},\ \frac{2+4}{2} \right] = [10,3]
従って、centersは
\text{centers} = \begin{pmatrix} 1 & 2 \\ 10 & 3 \end{pmatrix}
となります。
セントロイド値の更新を計算するコードをPythonで実装してみましょう。
各クラスタ $ j $ について、X[y == j, :]
を使ってクラスタ $ j $ に属するデータ点を全て抽出し、各特徴(次元)の平均を縦方向(axis=0
)で計算して、セントロイド $\mu_j $を更新します。
import numpy as np
def update_centers(X: np.ndarray, y: np.ndarray) -> np.ndarray:
"""
Updates cluster centers by computing the mean of all observations assigned to each cluster.
Parameters
----------
X : np.ndarray
A 2-D numpy array of shape (m, d) where each row is an observation and d is the number of features.
y : np.ndarray
A 1-D numpy array of shape (m,) containing the cluster labels for each observation.
The labels are assumed to be non-negative integers.
Returns
-------
np.ndarray
A 2-D numpy array of shape (k, d) where each row is the mean (centroid) of the observations
in that cluster. Here, k is determined as max(y) + 1.
"""
# Get the number of observations (m) and features (d)
m, d = X.shape # For example, m=5, d=2
# Ensure that the length of y equals the number of observations and that labels are non-negative
assert m == len(y), "Number of labels must equal the number of observations."
assert np.min(y) >= 0, "Cluster labels must be non-negative."
# Determine the number of clusters, k, as the maximum label plus one.
k = int(np.max(y)) + 1 # For example, if max(y)==3, then k will be 4.
# Initialize the centers array with shape (k, d)
centers = np.empty((k, d))
# Loop over each cluster index j from 0 to k-1
for j in range(k):
# Calculate the mean of all observations in cluster j.
# X[y == j, :] selects all rows in X where the label is j.
centers[j, :] = np.mean(X[y == j, :], axis=0)
# Return the updated centers
return centers
ここの、ループ実装も美しくないかもしれません。ループを利用しない、うまい実装方法があれば教えてください。
5. 収束判定
前回のセントロイドのデータ $ \mu_j $と今回のセントロイドのデータ $ \mu_j $を比較して、もし同じなら収束したと判定します。
収束判定を計算するコードをPythonで実装してみましょう。
前回と今回のセントロイドのデータを取り出して、setで比較して、同じかどうかの判定結果を返します。
def has_converged(prev_centers: list, curr_centers: list) -> bool:
"""
Check if the centroids have converged by comparing previous and current centroids.
Args:
prev_centers: List of previous centroids, each represented as a tuple of floats.
curr_centers: List of current centroids, each represented as a tuple of floats.
Returns:
bool: True if centroids have not changed (converged), False otherwise.
"""
# Convert centroids to sets of tuples to ignore order and compare equality
return set([tuple(x) for x in prev_centers]) == set([tuple(x) for x in curr_centers])
以外と簡単な実装で出来ました。
6. クラスタ内平方和(Within-Cluster Sum of Squares, WCSS)
上記で説明しましたが、K-meansの結果を定量的に評価するWCSSの数式は、以下になります。
\text{WCSS} = \sum_{i=1}^{n} \min_{j \in \{0, \dots, K-1\}} \| x_i - \mu_j \|^2
各データ点から一番近いセントロイドまでの距離の2乗を全部足したものです。
実装する前に、まずは手動で計算してみましょう。データはユークリッド距離の2乗を既に計算した$S$を利用します。
# データ点
S = [
[0 81]
[4 85]
[4 85]
[81 0]
[85 4]
]
# 新しいクラスタへ割り振ったラベル
y_i =[0,0,0,1,1]
ラベルに0と1が存在するので、ラベル別に計算します。
-
クラスタ0の場合($y_i=0$のデータ点)
- データ点1(0行目):S[0,0]=0
- データ点2(1行目):S[1,0]=4
- データ点3(2行目):S[2,0]=4
- 合計= $0 + 4 + 4 = 8$
-
クラスタ1の場合($yi=1$のデータ点)
- データ点4(3行目):S[3,1]=0
- データ点5(4行目):S[4,1]=4
- 合計:$0+4=4$
クラスタ0の合計は 8、クラスタ1の合計は 4。
従って:
$$ \text{WCSS} = 8 + 4 = 12 $$
となります。
WCSSを計算するコードをPythonで実装してみましょう。
import numpy as np
from numpy.typing import NDArray
def WCSS(S: np.ndarray) -> float:
"""
Computes the Within-Cluster Sum of Squares (WCSS) from a distance matrix S.
Each row in S corresponds to an observation, and each column to a cluster. The function
computes the minimum squared distance (i.e., the squared distance to the nearest cluster center)
for each observation and then sums these minimum distances.
Parameters
----------
S : np.ndarray
A 2-D numpy array of shape (m, k), where S[i, j] is the squared distance between
observation i and cluster center j.
Returns
-------
float
The total within-cluster sum of squares.
"""
# Compute the minimum squared distance for each observation (for each row in S)
min_distances: NDArray = np.min(S, axis=1)
# Sum the minimum distances across all observations to obtain a single scalar value
total_wcss: float = np.sum(min_distances)
return total_wcss
K-Means アルゴリズム実装
最後に今までの実装を組み合わせてみます。
ef kmeans(X: np.ndarray, k: int,
starting_centers: np.ndarray = None,
max_steps: float = np.inf) -> np.ndarray:
"""
Performs Lloyd's k-means clustering algorithm on the data X.
This function iteratively assigns each observation in X to the nearest cluster center
and then updates the centers based on the mean of the assigned points. The process
repeats until the centers converge or the maximum number of steps is reached.
Parameters
----------
X : np.ndarray
Data matrix of shape (m, d) where each row is an observation and d is the number of features.
k : int
The number of clusters.
starting_centers : np.ndarray, optional
A (k, d) numpy array representing initial cluster centers. If None,
centers will be initialized using a helper function `init_centers(X, k)`.
max_steps : float, optional
The maximum number of iterations allowed.
Returns
-------
np.ndarray
A 1-D numpy array of cluster labels of length m, where each element is the assigned cluster index for that observation.
"""
# Initialize centers: use provided starting centers or randomly select k observations as centers.
if starting_centers is None:
centers = init_centers(X, k)
else:
centers = starting_centers.copy() # Use a copy to avoid modifying the original centers.
# Number of observations.
m = X.shape[0]
# Initialize labels array to hold the cluster assignment for each observation.
labels = np.zeros(m, dtype=int)
# Convergence flag and iteration counter.
converged = False
i = 1
while (not converged) and (i <= max_steps):
# Save the current centers to check for convergence later.
old_centers = centers.copy()
### ----- YOUR CODE HERE -----
# Step 1: Compute the distance matrix S between each observation and each current center.
# S will have a shape of (m, k) where S[i, j] is the squared Euclidean distance between X[i] and centers[j].
S = compute_d2(X, old_centers)
# Step 2: Assign each observation to its nearest cluster center.
# This is done by selecting, for each row in S, the index (cluster label) of the smallest distance.
labels[:] = assign_cluster_labels(S)
# Step 3: Update the cluster centers.
# For each cluster, compute the mean of the observations in X that were assigned to that cluster.
centers = update_centers(X, labels)
### ---------------------------
# Check for convergence: if the centers haven't changed significantly, then the algorithm has converged.
converged = has_converged(old_centers, centers)
# Optionally, compute and print the Within-Cluster Sum of Squares (WCSS) as a measure of clustering quality.
print("iteration", i, "WCSS =", WCSS(S))
# Increment the iteration counter.
i += 1
# Return the final cluster labels.
return labels
# Example usage:
if __name__ == "__main__":
# Suppose we have a dataset 'points' (an np.ndarray) and we wish to cluster it into k clusters.
# For instance, here is a small synthetic dataset:
points = np.array([
[1, 2],
[1, 4],
[1, 0],
[10, 2],
[10, 4]
], dtype=np.float64)
# Let k be the number of clusters. For example, k = 2:
k = 2
# Optionally, you could provide starting centers; here we select point 0 and 3 as initial centers.
starting_centers = points[[0, 3], :]
# Run k-means to obtain clustering labels.
clustering = kmeans(points, k, starting_centers=starting_centers, max_steps=20)
print("Final cluster labels:", clustering)
終わりに
いかがでしたでしょうか?もっと簡単に出来るはずですが、各ステップと対応する数式を考えるとこれぐらいの実装内容でますはいいと思います。
もっと簡素化にしたければ、関数を統合するのもいいかもしれません。
もっと高速にしたければ、ループをやめる方向もありです。
そもそも成熟したライブラリも豊富なのでそちらを利用するのもありです。
コメントやご指摘あればお願いします。
Happy Hacking!
追記
幾つかのLLMで検索すると、ユークリッド距離の二乗の実装がループ無しでもできるので掲載しておきます。
まず、ユークリッド距離の二乗の式
\|x_i - \mu_j\|^2 = \sum_{m=1}^d (x_{im} - \mu_{jm})^2
を展開すると、
\|x_i - \mu_j\|^2 = \sum_{m=1}^d x_{im}^2 - 2 \sum_{m=1}^d x_{im}\mu_{jm} + \sum_{m=1}^d \mu_{jm}^2
になり、
また、ベクトル形式では、次のように書けます。
\|x_i - \mu_j\|^2 = \|x_i\|^2 - 2\, x_i\cdot\mu_j + \|\mu_j\|^2
ここで、
- $|x_i|^2 = \sum_{m=1}^d x_{im}^2$ はデータポイント $x_i$ のノルムの2乗
- $ x_i \cdot \mu_j = \sum_{m=1}^d x_{im}\mu_{jm}$ は $x_i$ と $\mu_j$ の内積
- $ |\mu_j|^2 = \sum_{m=1}^d \mu_{jm}^2 $ はセントロイド $ \mu_j $ のノルムの2乗
になります。
この式を、全データ点 $i = 1,2,\dots, n $ と全セントロイド $j = 1,2,\dots, k$ について計算し、行列 (S) に格納するのが目標です。
def compute_d2(X: NDArray[np.float64], centers: NDArray[np.float64]) -> NDArray[np.float64]:
# Compute the squared norm of each data point: ||x_i||^2 = Σ_m x_{im}^2
# Shape: (n, 1), where n is the number of data points
# np.square(X) computes x_{im}^2 for each component, and sum along axis=1 gives the
# sum of squared components for each point. keepdims=True ensures broadcasting compatibility.
X_sq: NDArray[np.float64] = np.sum(np.square(X), axis=1, keepdims=True)
# Compute the squared norm of each centroid: ||μ_j||^2 = Σ_m μ_{jm}^2
# Shape: (k,), where k is the number of centroids
# Sum along axis=1 gives the sum of squared components for each centroid.
centers_sq: NDArray[np.float64] = np.sum(np.square(centers), axis=1)
# Compute the dot product between each data point and centroid: x_i · μ_j
# Shape: (n, k), where X is (n, d) and centers.T is (d, k)
# This gives the cross term needed for the distance expansion.
cross_term: NDArray[np.float64] = np.dot(X, centers.T)
# Compute the squared distance matrix: S[i,j] = ||x_i - μ_j||^2
# Using the expansion: ||x_i - μ_j||^2 = ||x_i||^2 + ||μ_j||^2 - 2 * (x_i · μ_j)
# X_sq (n, 1) broadcasts to (n, k), centers_sq (k,) broadcasts to (1, k) -> (n, k)
# cross_term is already (n, k), so the result S is (n, k)
S: NDArray[np.float64] = X_sq + centers_sq - 2 * cross_term
# Ensure non-negative distances (handle potential floating-point errors)
# In theory, ||x_i - μ_j||^2 >= 0, but small numerical errors might produce tiny negative values
S = np.maximum(S, 0)
return S
という実装になります。