はじめに
Spark(分散処理基盤)に関連する話題がココ最近多くなってきたので、自宅PCでSparkを使ってみました。
分散処理と言っても、自宅環境で何台もPCがあるわけもないので、、、本稿ではSpark入りコンテナを使った例をご紹介。
本稿で紹介すること
- 環境の各種情報
- Spark入りDockerコンテナの起動
- K-meansクラスタリングの実行
紹介するPythonコードはリファクタリング未済のため、PEP8には則っていません!
本稿で紹介しないこと
- WSL2のインストール
- DockerCEのインストール
- JupyterHubのインストール&環境設定
- シルエット係数(シルエット分析)
環境の各種情報
筆者は、Windows 11のホスト上でWSL2/Ubuntuを起動しています。
- Ubuntu 20.04.6 LTS
- Docker CE 23.0.1
更に筆者は、WSL2/Ubuntuのホスト上でSpark入りDockerコンテナを起動しています。
- Ubuntu 22.04.1 LTS
- Python 3.8.13
- pip 22.2.2
- pyspark 3.3.0
- OpenJDK 17.0.4
- Spark 3.3.0
コンテナイメージはコチラ。
Spark入りDockerコンテナの起動
JupyterHubからSpark入りDockerコンテナを起動しました。
が、以下コマンドを実行してDockerコンテナを起動することも当然可能です。
$ docker run -d -it --name spark -p 8888:8888 jupyter/pyspark-notebook:python-3.8
K-meansクラスタリングの実行
Spark公式で公開されている例1を利用して実行しました。
ざっくり言うと、クラスタリング結果を可視化する部分を追加しました。
以下、Spark公式で公開されている例です。
ポイントは、pyspark.ml.clustering.KMeansを利用することです。
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
Inputデータの準備
お馴染み、Iris(アヤメ)の品種分類データ2を使いました。
ワインや乳がんなど、分類(判別)問題に使えるデータならOKです。
# アヤメの品種データ(Iris plants dataset)の読み込み
df_iris = load_iris(as_frame=True)
# 説明変数を取得
X = pd.DataFrame(df_iris.data, columns=df_iris.feature_names)
# 目的変数を取得
y = pd.Series(df_iris.target, name='target')
# 説明変数と目的変数を結合して、1つのDataFrameにする
pd_df_iris = pd.merge(X, y, how="left", left_index=True, right_index=True)
spark = SparkSession.builder \
.appName("kmeans-spark") \
.getOrCreate()
# pysparkDataFrameに変換
spark_df_iris = spark.createDataFrame(pd_df_iris)
assemble = VectorAssembler(inputCols=['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'], outputCol='iris_features')
assembled_data = assemble.transform(spark_df_iris)
シルエット係数の確認
Iris(アヤメ)は3つに分類されることは自明ですが、、、
クラスタ数を2~10と変化させて、シルエット係数3がどう変化するか、特にクラスタ数が3のときいかほどかを確認しました。
(シルエット係数で何を測るか、どんな値域をとるのが良いのか、などは他有志の記事に委ねます。)
silhouette_scores = []
evaluator = ClusteringEvaluator(featuresCol='iris_features', metricName='silhouette', distanceMeasure='squaredEuclidean')
for K in range(2, 11):
# Trains a k-means model.
kmeans = KMeans(featuresCol='iris_features', k=K, seed=1)
model = kmeans.fit(assembled_data)
# Make predictions
predictions = model.transform(assembled_data)
# Evaluate clustering by computing Silhouette score
evaluation_score = evaluator.evaluate(predictions)
silhouette_scores.append(evaluation_score)
fig, ax = plt.subplots(1, 1, figsize=(10,8))
ax.plot(range(2,11), silhouette_scores)
ax.set_xlabel('Number of Clusters')
ax.set_ylabel('Silhouette Score')
K-meansクラスタリングの実行
本題、クラスタ数3(K=3)として、K-meansクラスタリングを実行しました。
# Trains a k-means model.
kmeans = KMeans(featuresCol='iris_features', k=3, seed=1)
model = kmeans.fit(assembled_data)
# Make predictions
predictions = model.transform(assembled_data)
実行結果の確認
Iris(アヤメ)の品種分類データは各クラスに50サンプルが分類されるデータです。
各クラスのサンプル数、改めてのシルエット係数、そして重心(セントロイド)を確認しました。
(乱数Seedを変えて実行すれば結果はまた違うかもしれませんが、まぁこんなものかなと思いました。)
# 各クラスタのサンプル数
predictions.groupBy("prediction").count().show()
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator(featuresCol='iris_features', metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = evaluator.evaluate(predictions)
print("K={}".format(3))
print("シルエット係数:{}" .format(str(silhouette)))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
更に、PCAをかけて2次元空間に描画しました。
正解と比べると、緑のクラスタが勢力を伸ばしているようでした。
正解(target)
予測(prediction)
Notebook(Pythonコード)
GitHubで公開しています。このタイミングですが、参考サイトの方々に感謝申し上げます。
まとめ
完全に手探りでしたが、Sparkを使ってK-meansクラスタリングを実行する方法を紹介しました。
小規模データならばSparkである必要性やありがたみを感じれませんが、大規模データならばその限りではないと思いました。
今後、PySparkでword2vecの実行他に挑戦しようかと思います。
-
Clustering | https://spark.apache.org/docs/3.3.0/ml-clustering.html#k-means ↩
-
sklearn.datasets.load_iris | https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_iris.html ↩
-
シルエット分析とは | https://hkawabata.github.io/technical-note/note/ML/Evaluation/silhouette-analysis.html ↩