0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PySparkでK-meansクラスタリングを実行する

Posted at

はじめに

Spark(分散処理基盤)に関連する話題がココ最近多くなってきたので、自宅PCでSparkを使ってみました。
分散処理と言っても、自宅環境で何台もPCがあるわけもないので、、、本稿ではSpark入りコンテナを使った例をご紹介。

本稿で紹介すること

  • 環境の各種情報
  • Spark入りDockerコンテナの起動
  • K-meansクラスタリングの実行

紹介するPythonコードはリファクタリング未済のため、PEP8には則っていません!

本稿で紹介しないこと

  • WSL2のインストール
  • DockerCEのインストール
  • JupyterHubのインストール&環境設定
  • シルエット係数(シルエット分析)

環境の各種情報

筆者は、Windows 11のホスト上でWSL2/Ubuntuを起動しています。

  • Ubuntu 20.04.6 LTS
  • Docker CE 23.0.1

image.png

更に筆者は、WSL2/Ubuntuのホスト上でSpark入りDockerコンテナを起動しています。

  • Ubuntu 22.04.1 LTS
  • Python 3.8.13
  • pip 22.2.2
  • pyspark 3.3.0
  • OpenJDK 17.0.4
  • Spark 3.3.0

コンテナイメージはコチラ。

image.png

Spark入りDockerコンテナの起動

JupyterHubからSpark入りDockerコンテナを起動しました。
が、以下コマンドを実行してDockerコンテナを起動することも当然可能です。

$ docker run -d -it --name spark -p 8888:8888 jupyter/pyspark-notebook:python-3.8

image.png

K-meansクラスタリングの実行

Spark公式で公開されている例1を利用して実行しました。
ざっくり言うと、クラスタリング結果を可視化する部分を追加しました。

以下、Spark公式で公開されている例です。
ポイントは、pyspark.ml.clustering.KMeansを利用することです。

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Inputデータの準備

お馴染み、Iris(アヤメ)の品種分類データ2を使いました。

ワインや乳がんなど、分類(判別)問題に使えるデータならOKです。

# アヤメの品種データ(Iris plants dataset)の読み込み
df_iris = load_iris(as_frame=True)

# 説明変数を取得
X = pd.DataFrame(df_iris.data, columns=df_iris.feature_names)
# 目的変数を取得
y = pd.Series(df_iris.target, name='target')

# 説明変数と目的変数を結合して、1つのDataFrameにする
pd_df_iris = pd.merge(X, y, how="left", left_index=True, right_index=True)

spark = SparkSession.builder \
    .appName("kmeans-spark") \
    .getOrCreate()

# pysparkDataFrameに変換
spark_df_iris = spark.createDataFrame(pd_df_iris)

assemble = VectorAssembler(inputCols=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'], outputCol='iris_features')
assembled_data = assemble.transform(spark_df_iris)

シルエット係数の確認

Iris(アヤメ)は3つに分類されることは自明ですが、、、
クラスタ数を2~10と変化させて、シルエット係数3がどう変化するか、特にクラスタ数が3のときいかほどかを確認しました。
(シルエット係数で何を測るか、どんな値域をとるのが良いのか、などは他有志の記事に委ねます。)

silhouette_scores = []
evaluator = ClusteringEvaluator(featuresCol='iris_features', metricName='silhouette', distanceMeasure='squaredEuclidean')

for K in range(2, 11):
    # Trains a k-means model.
    kmeans = KMeans(featuresCol='iris_features', k=K, seed=1)
    model = kmeans.fit(assembled_data)
    # Make predictions
    predictions = model.transform(assembled_data)
    # Evaluate clustering by computing Silhouette score
    evaluation_score = evaluator.evaluate(predictions)
    silhouette_scores.append(evaluation_score)

fig, ax = plt.subplots(1, 1, figsize=(10,8))
ax.plot(range(2,11), silhouette_scores)
ax.set_xlabel('Number of Clusters')
ax.set_ylabel('Silhouette Score')

K-meansクラスタリングの実行

本題、クラスタ数3(K=3)として、K-meansクラスタリングを実行しました。

# Trains a k-means model.
kmeans = KMeans(featuresCol='iris_features', k=3, seed=1)
model = kmeans.fit(assembled_data)
# Make predictions
predictions = model.transform(assembled_data)

実行結果の確認

Iris(アヤメ)の品種分類データは各クラスに50サンプルが分類されるデータです。
各クラスのサンプル数、改めてのシルエット係数、そして重心(セントロイド)を確認しました。
(乱数Seedを変えて実行すれば結果はまた違うかもしれませんが、まぁこんなものかなと思いました。)

# 各クラスタのサンプル数
predictions.groupBy("prediction").count().show()

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator(featuresCol='iris_features', metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = evaluator.evaluate(predictions)
print("K={}".format(3))
print("シルエット係数:{}" .format(str(silhouette)))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

更に、PCAをかけて2次元空間に描画しました。
正解と比べると、緑のクラスタが勢力を伸ばしているようでした。

正解(target)

target_scatter.png

予測(prediction)

predictions_scatter.png

Notebook(Pythonコード)

GitHubで公開しています。このタイミングですが、参考サイトの方々に感謝申し上げます。

まとめ

完全に手探りでしたが、Sparkを使ってK-meansクラスタリングを実行する方法を紹介しました。
小規模データならばSparkである必要性やありがたみを感じれませんが、大規模データならばその限りではないと思いました。
今後、PySparkでword2vecの実行他に挑戦しようかと思います。

  1. Clustering | https://spark.apache.org/docs/3.3.0/ml-clustering.html#k-means

  2. sklearn.datasets.load_iris | https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_iris.html

  3. シルエット分析とは | https://hkawabata.github.io/technical-note/note/ML/Evaluation/silhouette-analysis.html

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?