はじめに
constrained k-means[1]はクラスタサイズ1に最小(最大)の制約を加えた手法です。クラスタ数が多いときやデータの次元が大きいときにしばしば発生するクラスタサイズの不均衡を解消することができます。
次の図と表はA1データセット[2]をk-means、k-means++およびconstrained k-meansによってクラスタリングした結果です2。k-meansではデータ数が極端に少ないクラスタが存在するのに対し、constrained k-meansではデータが均等に割り当てられています。このデータセットのようにクラスタサイズの最小・最大が既知の場合、constrained k-meansは他の手法よりも精度が高い傾向があります。
Method | Inertia |
---|---|
k-means | $2.74 \times 10^{10}$ |
k-means++ | $1.49 \times 10^{10}$ |
constrained k-means | $1.22 \times 10^{10}$ |
本記事ではconstrained k-meansを解説するとともにPythonによる実装をご紹介します。
K-Means
まずはk-meansのおさらいです。
多くのクラスタリング手法に共通する目的は、$m$個のデータ$X_i$を$k$個のクラスタ$P_h$に割り当てる際、クラスタの中心$C_h$とのMSE(平均二乗誤差)を最小化することです。
$$
{\rm MSE} = \sum_{h=1}^{k} \sum_{X_i \in P_h}
\frac {||X_i-C_h||^2} {m}
$$
つまり、クラスタ内のばらつきを小さくしたいのですね。k-meansは以下のアルゴリズムに従いこの目的の達成を目指します。
初期化ステップ
k個のクラスタの初期中心をデータから一様な確率で選びます。
割り当てステップ
データを最も距離が近いクラスタに割り当てます。
$$
P_j^{(t)} = \{
X_i : ||X_i-C_j^{(t)}|| \leq ||X_i-C_{j^*}^{(t)}||
\}
$$
$$
(1 \leq j^* \leq K)
$$
更新ステップ
クラスタの中心を更新します。
$$
C_j^{(t+1)} = \frac {1} {|P_j^{(t)}|}
\sum_{X_i \in P_j^{(t)}} X_i
$$
クラスタの中心が変動しなくなるまで割り当てステップと更新ステップを繰り返します。
k-meansの問題点
k-meansはクラスタの初期中心を一様な確率で選ぶため精度がまちまちです。良い精度を得るにはクラスタリングを何度も試行しなければなりません。
K-Means++
k-means++はk-meansの初期化ステップを以下のように改良した手法です。
k-means++の初期化ステップ
- 一様な確率でデータから中心を1つ選びます。
- データを最も距離が近いクラスタに割り当てます。
- 重み${{||X_i-C_j^{(0)}||^2}}$に比例する確率分布に従ってデータから次の初期中心をランダムに選びます。
- K個の中心が揃うまでステップ2と3を繰り返します。
最も近いクラスタとの距離の二乗に比例する確率分布を導入することで、初期中心が互いに距離を保ちやすくなります。k-meansよりも早く収束し、かつ精度が高いのはこのためです。
k-meansとk-means++の欠点
k-meansとk-means++はどちらもMSEを最小化することを主たる目的としており、クラスタサイズのバランスを考慮しません。そのため、クラスタ数が多いときやデータの次元が大きいときにクラスタサイズの不均衡がしばしば発生します。
Constrained K-Means
constrained k-meansは上記のクラスタサイズの不均衡を解決します。まずはk-meansの割り当てステップを一般化しましょう。
一般化したk-meansの割り当てステップ
\begin{align}
\underset{T}{\operatorname{minimize}} & \quad
\sum_{i=1}^m \sum_{h=1}^k T_{i,h} \cdot (\frac{1}{2}||X_i - C_h||^2)\\
\operatorname{subject to} & \quad \sum_{h=1}^k T_{i,h} = 1, \quad i=1,...,m \\
& \quad T_{i,h} \geq 0, \quad i=1,...,m, \quad h=1,...,k
\end{align}
目的関数の係数こそ変われどクラスタ内のばらつきを小さくしたいという気持ちは同じです。新たに登場した$T_{i,h}$は、データ$X_i$をクラスタ$P_h$に割り当てたら1、そうでなければ0になる変数です。制約条件は、各データをそれぞれ1つのクラスタにのみ割り当てることを表します。
constrained k-meansはその名の通り、k-meansの割り当てステップに制約条件を加えた手法です。論文ではクラスタサイズの最小値を制約条件として与えていますが、この記事では実用性を加味して最大値も条件に加えています。式は次の通りです。
constrained k-meansの割り当てステップ
\begin{align}
\underset{T}{\operatorname{minimize}} & \quad
\sum_{i=1}^m \sum_{h=1}^k T_{i,h} \cdot (\frac{1}{2}||X_i - C_h||^2)\\
\operatorname{subject to} & \quad \sum_{h=1}^k T_{i,h} = 1, \quad i=1,...,m \\
& \quad \tau_{min} \leq \sum_{i=1}^m T_{i,h} \leq \tau_{max}, \quad h=1,...,k\\
& \quad T_{i,h} \geq 0, \quad i=1,...,m, \quad h=1,...,k
\end{align}
$\tau_{min}$と$\tau_{max}$はクラスタサイズの最小値と最大値を表します。
constrained k-meansの鮮やかな点はこの割り当てステップを最小コストフロー(MCF)問題に落とし込んだことにあります。MCF問題とは、供給、需要、そして容量が条件を満たしかつコストが最小となるフローネットワークの経路を見つける最適化問題です。ネットワークシンプレックスアルゴリズムによって解くことができます。
実際にconstrained k-meansの割り当てステップをフローネットワークで表現したのが次の図です。ここでは需要を「マイナスの供給」とみなしています。また、終点に人工的なノードを配置し、フローネットワークの需要と供給の総和を0に調整しています。
各データは1つのクラスタにのみ割り当てる必要があるので供給と容量がそれぞれ1になります。コストは中心点との距離の二乗です。一方、各クラスタには$\tau_{min}$個以上のデータを割り当てなければならないので需要が$\tau_{min}$になります。また、容量を$\tau_{max}-\tau_{min}$にすることでクラスタサイズを$\tau_{max}$以下に制限します。
最後に一般化した更新ステップの式を記します。
一般化した更新ステップ
C_j^{(t+1)} = \left\{
\begin{array}{ll}
\frac {\sum_{i=1}^m T_{i,h}^{(t)} X_i}{\sum_{i=1}^m T_{i,h}^{(t)}} & \operatorname{if} \quad \sum_{i=1}^m T_{i,h}^{(t)} > 0 \\
C_h^{(t)} & \operatorname{otherwise}
\end{array}
\right.
PythonによるConstrained K-Meansの実装
最後にPythonによるconstrained k-meansの実装をご紹介します。MCFの最適化にはortoolsを用いました。
Constrained K-Means
"""Constrained k-means clustering"""
# constrained k-means clustering implementation of
# Bradley, Paul S., Kristin P. Bennett, and Ayhan Demiriz. "Constrained k-means clustering." Microsoft Research, Redmond 20.0 (2000): 0.
from functools import partial
import numpy as np
from ortools.graph.python import min_cost_flow
from scipy.spatial.distance import cdist
# 初期化ステップ
def random(X, n_clusters, random_state, **kwargs):
n_samples, n_features = X.shape
indices = random_state.choice(n_samples, size=n_clusters, replace=True)
centers = X[indices]
return centers, indices
class ConstrainedKMeans:
def __init__(
self,
n_clusters, # クラスタ数
min_membership, # クラスタあたりの最小データ数
max_membership=None, # クラスタあたりの最大データ数
random_state=0, # シード値
init="random", # 初期化ステップ
n_init=10, # 初期化ステップを異なるシード値で試行する回数
max_iter=300, # 更新ステップの最大イテレーション数
tol=1e-4, # 中心点の更新に伴う変動がこの値よりも小さければクラスタリングを終了する
**kwargs,
):
self.valid(
n_clusters,
min_membership,
max_membership,
random_state,
init,
n_init,
max_iter,
tol,
)
self.n_clusters = n_clusters
self.min_membership = min_membership
self.max_membership = max_membership
self.random_state = np.random.RandomState(random_state)
if init == "random":
self.init = partial(
random,
n_clusters = self.n_clusters,
random_state = self.random_state,
**kwargs,
)
self.n_init = n_init
self.max_iter = max_iter
self.tol = tol
self.kwargs = kwargs
self.centers = None
self.labels = None
# smcfのハイパラを定義する
def get_smcf_params(self, n_samples):
# データのノード番号
X_nodes = np.arange(n_samples)
# 中心点のノード番号
cluster_nodes = np.arange(n_samples, n_samples + self.n_clusters)
# 最終的な需要を記述するために人工のノードを用意する
artificial_demand_node = np.array([n_samples + self.n_clusters])
# エッジの始点
start_nodes = np.concatenate(
[np.repeat(X_nodes, self.n_clusters), cluster_nodes]
)
# エッジの終点
end_nodes = np.concatenate(
[
np.tile(cluster_nodes, n_samples),
np.repeat(artificial_demand_node, self.n_clusters),
]
)
# エッジの容量
# クラスタあたりの最大データ数 (max_membership) を指定可能
capacities = np.concatenate(
[
np.ones(
self.n_clusters * n_samples,
),
np.full(
self.n_clusters,
n_samples - self.n_clusters * self.min_membership
if self.max_membership is None
else self.max_membership - self.min_membership,
),
]
)
# ノードの供給 (マイナスは需要)
# データの供給は1
# 中心点の需要はクラスタあたりの最小データ数 (min_membership)
supplies = np.concatenate(
[
np.ones(
n_samples,
),
-1 * np.full(self.n_clusters, self.min_membership),
np.array([-n_samples + self.n_clusters * self.min_membership]),
]
).tolist()
return start_nodes, end_nodes, capacities, supplies
# エッジのコスト(中心点との二乗ユークリッド距離)を計算する
def calc_unit_costs(self, X, centers):
dist_sq = cdist(X, centers, "sqeuclidean") # (n_samples, n_clusters)
unit_costs = np.concatenate([dist_sq.flatten(), np.zeros(self.n_clusters)])
return unit_costs, dist_sq
# 最小コストフロー問題を解き、各データが割り当てられたクラスタをマスクの形で得る
def clustering(self, start_nodes, end_nodes, capacities, supplies, unit_costs):
smcf = min_cost_flow.SimpleMinCostFlow()
all_arcs = smcf.add_arcs_with_capacity_and_unit_cost(
start_nodes, end_nodes, capacities, unit_costs
)
smcf.set_nodes_supplies(np.arange(0, len(supplies)), supplies)
status = smcf.solve()
solution_flows = smcf.flows(all_arcs)
mask = solution_flows[: -self.n_clusters].reshape(
(-1, self.n_clusters)
) # (n_samples, n_clusters)
return mask
# クラスタリングする
def fit(self, X):
self.valid_tr(X)
# Xが1-Dの場合2-Dに変換する
if X.ndim == 1:
X = X[:, np.newaxis]
n_samples, _ = X.shape
# smcfのハイパラを得る
params = self.get_smcf_params(n_samples)
# クラスタの初期中心を選ぶ
best_inertia, centers, self.init_indices = None, None, None
for i in range(self.n_init):
centers_, init_indices_ = self.init(X, **self.kwargs)
# コストを計算する
unit_costs, dist_sq = self.calc_unit_costs(X, centers_)
# 各データが割り当てられたクラスタをマスクの形で得る
mask = self.clustering(*params, unit_costs) # (n_samples, n_clusters)
inertia = np.sum(mask * dist_sq) # クラスタ内二乗ユークリッド距離の総和
if best_inertia is None or best_inertia > inertia:
best_inertia = inertia
centers = centers_
self.init_indices = init_indices_
# エッジのコスト(中心点との二乗ユークリッド距離)を計算する
unit_costs, dist_sq = self.calc_unit_costs(X, centers)
for iter_ in range(0, self.max_iter):
# 各データが所属するクラスタをマスクの形で得る
mask = self.clustering(*params, unit_costs) # (n_samples, n_clusters)
# 中心点を計算する
centers_ = np.dot(mask.T, X) / np.sum(mask, axis=0)[:, np.newaxis]
# コストを更新する
unit_costs, dist_sq = self.calc_unit_costs(X, centers_)
# 中心点の更新前後の差の平方和を計算し、tol以下なら終了する
centers_squared_diff = np.sum((centers_ - centers) ** 2)
# 中心点を更新する
centers = centers_
if centers_squared_diff <= self.tol:
break
self.inertia = np.sum(mask * dist_sq) # クラスタ内二乗ユークリッド距離の総和
self.centers = centers # 中心点
self.labels = np.argmax(mask, axis=-1) # 各データの所属クラスタのラベル
self.iter_ = iter_ # クラスタリングに要したイテレーション数
return self
# クラスタリングし、ラベルを返す
def fit_predict(self, X):
return self.fit(X).labels
# 未知のデータを既知の中心点によってクラスタリングし、ラベルを得る
def predict(self, X):
self.valid_pr(X)
# Xが1-Dの場合2-Dに変換する
if X.ndim == 1:
X = X[:, np.newaxis]
n_samples, _ = X.shape
params = self.get_smcf_params(n_samples)
unit_costs = self.calc_unit_costs(X, self.centers)
mask = self.clustering(*params, unit_costs)
labels = np.argmax(mask, axis=1)
return labels
# 初期化の引数を検証する
def valid(
self,
n_clusters,
min_membership,
max_membership,
random_state,
init,
n_init,
max_iter,
tol,
):
if not isinstance(n_clusters, int):
raise TypeError("n_clusters must be an integer")
if not isinstance(min_membership, int):
raise TypeError("min_membership must be an integer")
if not isinstance(max_membership, int) and max_membership is not None:
raise TypeError("max_membership must be an integer or None")
if not isinstance(random_state, int) and random_state is not None:
raise TypeError("random_state must be an integer")
if init not in ["random"]:
raise TypeError("init must be 'random'")
if not isinstance(n_init, int):
raise TypeError("n_init must be an integer")
if not isinstance(max_iter, int):
raise TypeError("max_iter must be an integer")
if not isinstance(tol, int) and not isinstance(tol, float):
raise TypeError("tol must be an integer or float")
if n_clusters < 2:
raise ValueError("n_clusters must be 2 or more")
if min_membership < 0:
raise ValueError("min_membership must be 0 or more")
if isinstance(max_membership, int):
if max_membership < min_membership or max_membership < 0:
raise ValueError("max_membership must be at least min_membership")
if isinstance(random_state, int):
if random_state < 0:
raise ValueError("random_state must be 0 or more")
if n_init <= 0:
raise ValueError("n_ijnit must be 1 or more")
if max_iter < 1:
raise ValueError("max_iter must be 1 or more")
if tol < 0:
raise ValueError("tol must be 0 or more")
# fitの引数を検証する
def valid_tr(self, X):
if not isinstance(X, np.ndarray):
raise TypeError("X must be numpy.ndarray")
if X.ndim < 1 or X.ndim > 2:
raise ValueError("X must be 1-D or 2-D")
if (
X.shape[0] < self.n_clusters
or X.shape[0] < self.n_clusters * self.min_membership
):
raise ValueError("X must have at least n_clusters * min_membership rows")
if self.max_membership is not None:
if X.shape[0] > self.n_clusters * self.max_membership:
raise ValueError(
"X must have at most n_clusters * max_membership rows"
)
# predictの引数を検証する
def valid_pr(self, X):
if self.centers is None or self.labels is None:
raise RuntimeError("fit method hasn't been called")
if not isinstance(X, np.ndarray):
raise TypeError("X must be numpy.ndarray")
if X.ndim < 1 or X.ndim > 2:
raise ValueError("X must be 1-D or 2-D")
if (
X.shape[0] < self.n_clusters
or X.shape[0] < self.n_clusters * self.min_membership
):
raise ValueError("X must have at least n_clusters * min_membership rows")
if self.max_membership is not None:
if X.shape[0] > self.n_clusters * self.max_membership:
raise ValueError(
"X must have at most n_clusters * max_membership rows"
)
if X.ndim == 2:
if X.shape[-1] != self.centers.shape[-1]:
raise ValueError("input shape does not match the centers shape")
データセット
"""
P. Fänti and S. Sieranoja
K-means properties on six clustering benchmark datasets
Applied Intelligence, 48 (12), 4743-4759, December 2018
"""
import urllib
import io
import zipfile
url = "https://cs.uef.fi/sipu/datasets/"
a_sets = ["a1", "a2", "a3"]
dataset = {}
# A-setsのデータをダウンロードする
for name in a_sets:
values = urllib.request.urlopen(url + name + ".txt").read().decode()
values = values.splitlines()
values = [[float(v) for v in val.split()] for val in values]
dataset[name] = {"X": np.array(values)}
# A-setsのラベルをダウンロードする
files = urllib.request.urlopen(url + "a-gt-pa.zip").read()
files = zipfile.ZipFile(io.BytesIO(files))
for name in a_sets:
values = files.read(f"{name}-ga.pa").decode()
values = values.splitlines()[4:]
values = [int(v) for v in values]
dataset[name]["labels"] = np.array(values)
実験
import seaborn as sns
from matplotlib import pyplot as plt
name = "a1"
X = dataset[name]["X"]
labels = dataset[name]["labels"]
n_clusters = np.unique(labels).size
n_samples, dims = X.shape
membership = int(n_samples / n_clusters)
fig = plt.figure(figsize = (8,4), tight_layout=True)
fig.suptitle(f"Results on {name} dataset")
ax1 = fig.add_subplot(1, 2, 1)
ax2 = fig.add_subplot(1, 2, 2)
# 正解ラベル
sns.scatterplot(
x=X[:,0], y=X[:,1], hue=labels, legend=None, palette="Set1", ax=ax1
)
ax1.tick_params(
labelbottom=False, labelleft=False, labelright=False, labeltop=False,
bottom=False, left=False, right=False, top=False
)
ax1.set_title("Ground Truth")
# constrained k-means
cluster = ConstrainedKMeans(
n_clusters, membership, membership, init="random", n_init=10, random_state=10
)
cluster.fit(X)
sns.scatterplot(
x=X[:,0], y=X[:,1], hue=cluster.labels, legend=None, palette="Set1", ax=ax2
)
ax2.tick_params(
labelbottom=False, labelleft=False, labelright=False, labeltop=False,
bottom=False, left=False, right=False, top=False
)
ax2.set_title("Constrained K-Means")
plt.show()
参考文献
- Bradley, Paul S., Kristin P. Bennett, and Ayhan Demiriz. "Constrained k-means clustering." Microsoft Research, Redmond 20.0 (2000): 0.
- https://cs.uef.fi/sipu/datasets/