今回は、Sparkの機械学習ライブラリであるMLlibを使って、Alibaba CloudのE-MapReduceでどのように実装して、機械学習を行うのかを見てみましょう。
前提
EMR-3.16.0
クラスタータイプは Hadoop
ハードウェア構成(Header)はecs.sn1ne.2xlargeを1台
ハードウェア構成(Worker)はecs.sn1ne.2xlargeを3台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
# uname -r
3.10.0-693.2.2.el7.x86_64
# spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.2
/_/
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_201
Branch v2.3.2
Compiled by user root on 2018-12-04T02:09:51Z
Revision cff86aa7412f70083780075f2cf2669b8ad23741
Url git@gitlab.alibaba-inc.com:soe/emr-spark.git
Type --help for more information.
# hadoop version
Hadoop 2.7.2
Subversion http://gitlab.alibaba-inc.com/soe/emr-hadoop.git -r d2cd70f951304b8ca3d12991262e7e0d321abefc
Compiled by root on 2018-11-30T09:31Z
Compiled with protoc 2.5.0
From source with checksum 4447ed9f24dcd981df7daaadd5bafc0
This command was run using /opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/hadoop-common-2.7.2.jar
実装について
今回機械学習の手法の中で最も幅広く活用されている手法の一つである線形回帰を例として、身長と体重の関係を考えてみましょう。一般的に身長が増えると、体重も増えます。このように、とある体重の値が与えられた時に、それと相関関係のある値(身長)を予測するのをpysparkで実装しました。(MLlibを使ってどのようなことができるかにフォーカスさせていただきますので、特徴抽出や評価手法などの機械学習に関する詳細はこちらで割愛します。)
from __future__ import print_function
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
if __name__ == "__main__":
spark = SparkSession.builder.appName("LinearRegression").getOrCreate()
inputLines = spark.sparkContext.textFile("oss://spark-mllib/input/regression.txt")
data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))
colNames = ["label", "features"]
df = data.toDF(colNames)
trainTest = df.randomSplit([0.7, 0.3])
trainingDF = trainTest[0]
testDF = trainTest[1]
lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lir.fit(trainingDF)
fullPredictions = model.transform(testDF).cache()
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
labels = fullPredictions.select("label").rdd.map(lambda x: x[0])
predictionAndLabel = predictions.zip(labels).collect()
for prediction in predictionAndLabel:
print(prediction)
spark.stop()
実行する
まず、ソースコード(spark-linear-regression.py)及びデータセット(regression.txt)をOSSへアップロードします。
次は、下記のコマンドでソースコードを実行します。左側の予測値と右側のラベル値も表示されるようになりました。
spark-submit oss://spark-mllib/input/spark-linear-regression.py
(1.1248119166750934, 1.5)
(1.0962357553383204, 1.52)
(1.260548683024765, 1.53)
(1.110523836006707, 1.53)
(1.1533880780118664, 1.54)
(1.2176844410196055, 1.8)
(1.3248450460325043, 1.82)
(1.3462771670350842, 1.86)
(1.3462771670350842, 1.86)
(1.231972521687992, 1.93)
(1.5105900947215287, 1.95)
(1.4820139333847557, 1.98)
(1.3319890863666977, 2.0)
(1.4963020140531422, 2.09)
(1.8034957484234517, 2.67)
最後
いかがでしたでしょうか。MapReduceで機械学習を実装する雰囲気をつかんでいただけましたでしょうか。今後とも、Spark MLlibの実際の利用例を皆さんに紹介していきたいと思います。