LoginSignup
0
2

More than 5 years have passed since last update.

Hadoop/Spark/SparkSQL/SparkStreaming/SparkMLlib入門

Last updated at Posted at 2018-02-12

Hadoop、Spark、Spark SQL、Spark Streaming、Spark MLlibを一通り試用した。
環境はCloudera Quickstart VM (VirutalBox)。

Hadoop

HadoopでWord Countする。

今回分析対象にするテキストファイルを拾ってくる。

bash
[cloudera@quickstart ~]$ wget http://stewartonbibleschool.org/bible/text/genesis.txt

HDFSにコピー。

bash
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal genesis.txt 
[cloudera@quickstart ~]$ hadoop fs -ls
Found 1 items
-rw-r--r--   1 cloudera cloudera     207327 2018-01-28 08:32 genesis.txt

MapReduceのアプリケーションの例やWordcountの使用法を確認。

bash
[cloudera@quickstart ~]$ hadoop jar /usr/jars/hadoop-examples.jar 
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that count the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.
[cloudera@quickstart ~]$ hadoop jar /usr/jars/hadoop-examples.jar wordcount
Usage: wordcount <in> [<in>...] <out>

Word Countを実行。

bash
[cloudera@quickstart ~]$ hadoop jar /usr/jars/hadoop-examples.jar wordcount genesis.txt out
…()…
18/01/28 08:35:52 INFO mapreduce.Job:  map 0% reduce 0%
18/01/28 08:35:58 INFO mapreduce.Job:  map 100% reduce 0%
18/01/28 08:36:05 INFO mapreduce.Job:  map 100% reduce 100%
…()

結果のファイルの確認。

bash
[cloudera@quickstart ~]$ hadoop fs -ls
Found 2 items
-rw-r--r--   1 cloudera cloudera     207327 2018-01-28 08:32 genesis.txt
drwxr-xr-x   - cloudera cloudera          0 2018-01-28 08:36 out
[cloudera@quickstart ~]$ hadoop fs -ls out/
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2018-01-28 08:36 out/_SUCCESS
-rw-r--r--   1 cloudera cloudera      54576 2018-01-28 08:36 out/part-r-00000

ローカルにコピー。

bash
[cloudera@quickstart ~]$ hadoop fs -copyToLocal out/part-r-00000 local.txt
[cloudera@quickstart ~]$ head local.txt
(for    2
(from   1
(is 1
(out    1
(the    1
10:10:  1
10:11:  1
10:12:  1
10:13:  1
10:14:  1

Spark

SparkでWord Countする。

Wordcountの対象ファイルを確認。さっき使ったのと同じやつ。

bash
[cloudera@quickstart ~]$ hadoop fs -ls
Found 1 items
-rw-r--r--   1 cloudera cloudera    5458199 2018-01-28 02:10 genesis.txt

PySpark/Jupyter notebookを立ち上げてHDFSからRDDに読み込む。各行がRDDにおける1要素になる。

PySpark
lines = sc.textFile("hdfs:/user/cloudera/genesis.txt")

正しく読み込めてるか確認する。

PySpark
lines.count()
# >>> 1538

各行を単語毎に区切って、各単語と1をペアにしたタプルにする。
ちなみにflatMapは、[line, line, ...]を[[word, word, ...], [word, ...], ...]ではなく[word, word, word, ...]にするためのマップ関数。mapは普通のマップ関数。

PySpark
words = lines.flatMap(lambda line: line.split(" "))
tuples = words.map(lambda word: (word, 1))

キー毎にreduceする。

PySpark
counts = tuples.reduceByKey(lambda a, b: (a+b))

RDDからHDFSに保存する。RDDのパーティションをまとめて、ひとつの出力ファイルにするためにcoalesce(1)を使う。

PySpark
counts.coalesce(1).saveAsTextFile("hdfs:/user/cloudera/wordcount/outputDir")

ローカルファイルにコピー。

bash
[cloudera@quickstart ~]$ hadoop fs -copyToLocal wordcount/outputDir/part-00000 count.txt
[cloudera@quickstart ~]$ head count.txt
('', 393)
('womb.', 3)
('21:23:', 1)
('27:15:', 1)
('subtilty,', 1)
('sea.', 1)
('23:15:', 1)
('bulls,', 1)
('backward.', 1)
('doest:', 1)

SparkSQL

SparkSQLでGROUPBYやJOINなど基本的な操作をしてみる。

PySparkでPostgreSQLのテーブルを読みこむ:

PySpark
from pyspark.sql import SQLContext
sqlsc = SQLContext(sc)
df = sqlsc.read.format("jdbc")\
  .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera")\
  .option("dbtable", "gameclicks")\
  .load()

df.printSchema()
# >>> root
# >>>  |-- timestamp: timestamp (nullable = false)
# >>>  |-- clickid: integer (nullable = false)
# >>>  |-- userid: integer (nullable = false)
# >>>  |-- usersessionid: integer (nullable = false)
# >>>  |-- ishit: integer (nullable = false)
# >>>  |-- teamid: integer (nullable = false)
# >>>  |-- teamlevel: integer (nullable = false)

df.count()
# >>> 755806

df.show(6)
# >>> +--------------------+-------+------+-------------+-----+------+---------+
# >>> |           timestamp|clickid|userid|usersessionid|ishit|teamid|teamlevel|
# >>> +--------------------+-------+------+-------------+-----+------+---------+
# >>> |2016-05-26 15:06:...|    105|  1038|         5916|    0|    25|        1|
# >>> |2016-05-26 15:07:...|    154|  1099|         5898|    0|    44|        1|
# >>> |2016-05-26 15:07:...|    229|   899|         5757|    0|    71|        1|
# >>> |2016-05-26 15:07:...|    322|  2197|         5854|    0|    99|        1|
# >>> |2016-05-26 15:07:...|     22|  1362|         5739|    0|    13|        1|
# >>> |2016-05-26 15:07:...|    107|  1071|         5939|    0|    27|        1|
# >>> +--------------------+-------+------+-------------+-----+------+---------+
# >>> only showing top 6 rows

SELECT、FILTER、GroupByなどを試す:

PySpark
df.select("userid", "teamid").show(6)
# >>> +------+------+
# >>> |userid|teamid|
# >>> +------+------+
# >>> |  1038|    25|
# >>> |  1099|    44|
# >>> |   899|    71|
# >>> |  2197|    99|
# >>> |  1362|    13|
# >>> |  1071|    27|
# >>> +------+------+
# >>> only showing top 6 rows

df.filter(df["teamid"] < 20).select("userid", "teamid").show(6)
# >>> +------+------+
# >>> |userid|teamid|
# >>> +------+------+
# >>> |  1362|    13|
# >>> |  1072|    13|
# >>> |   624|     2|
# >>> |   217|    18|
# >>> |  1072|    13|
# >>> |   937|    11|
# >>> +------+------+
# >>> only showing top 6 rows

df.groupBy("teamid").count().show(6)
# >>> +------+-----+
# >>> |teamid|count|
# >>> +------+-----+
# >>> |    32| 8734|
# >>> |    35| 8817|
# >>> |    36| 8755|
# >>> |    39| 9398|
# >>> |    44| 9045|
# >>> |    51| 8911|
# >>> +------+-----+
# >>> only showing top 6 rows

平均と合計の計算:

PySpark
from pyspark.sql.functions import *
df.select(mean('ishit'), sum('ishit')).show()
# >>> +------------------+----------+
# >>> |        avg(ishit)|sum(ishit)|
# >>> +------------------+----------+
# >>> |0.1103232840173272|     83383|
# >>> +------------------+----------+

JOINを試す:

PySpark
df2 = sqlsc.read.format("jdbc")\
  .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera")\
  .option("dbtable", "adclicks")\
  .load()

df2.printSchema()
# >>> root
# >>>  |-- timestamp: timestamp (nullable = false)
# >>>  |-- txid: integer (nullable = false)
# >>>  |-- usersessionid: integer (nullable = false)
# >>>  |-- teamid: integer (nullable = false)
# >>>  |-- userid: integer (nullable = false)
# >>>  |-- adid: integer (nullable = false)
# >>>  |-- adcategory: string (nullable = false)

merge = df.join(df2, "userid")
merge.printSchema()
# >>> root
# >>>  |-- userid: integer (nullable = false)
# >>>  |-- timestamp: timestamp (nullable = false)
# >>>  |-- clickid: integer (nullable = false)
# >>>  |-- usersessionid: integer (nullable = false)
# >>>  |-- ishit: integer (nullable = false)
# >>>  |-- teamid: integer (nullable = false)
# >>>  |-- teamlevel: integer (nullable = false)
# >>>  |-- timestamp: timestamp (nullable = false)
# >>>  |-- txid: integer (nullable = false)
# >>>  |-- usersessionid: integer (nullable = false)
# >>>  |-- teamid: integer (nullable = false)
# >>>  |-- adid: integer (nullable = false)
# >>>  |-- adcategory: string (nullable = false)

merge.show(6)
# >>> +------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
# >>> |userid|           timestamp|clickid|usersessionid|ishit|teamid|teamlevel|           timestamp| txid|usersessionid|teamid|adid|adcategory|
# >>> +------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
# >>> |   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 01:40:...|23669|        23626|   142|  27|     games|
# >>> |   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 09:24:...|24122|        23626|   142|   4|     games|
# >>> |   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 17:21:...|24659|        23626|   142|  22| computers|
# >>> |   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 23:34:...|25076|        23626|   142|  21|    movies|
# >>> |   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-09 16:32:...|26220|        23626|   142|  16|  clothing|
# >>> |   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-10 10:43:...|28180|        27925|   142|  13| computers|
# >>> +------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
# >>> only showing top 6 rows

Spark Streaming

Spark Steamingでバッチインターバルやウインドウ幅の設定などをする。

PySpark
import re
def parse(line):
    match = re.search("Sx=(\d+)", line)
    if match:
        val = match.group(1)
        return [int(val)]
    return []

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc,1)
lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12020)
vals = lines.flatMap(parse)
window = vals.window(10,5)

ウィンドウ内の各データに対して最大、最小、平均、標準偏差を求める

PySpark
def stats(rdd):
    print(rdd.collect())
    if rdd.count() > 0:
        print("max = {}, min = {}, mean = {}, stdev = {}".format(rdd.max(), rdd.min(), rdd.mean(), rdd.stdev()))
window.foreachRDD(lambda rdd: stats(rdd))
ssc.start()

# >>> [10, 10, 10]
# >>> max = 10, min = 10, mean = 10.0, stdev = 0.0
# >>> [10, 10, 10, 10, 10, 10, 10, 10]
# >>> max = 10, min = 10, mean = 10.0, stdev = 0.0
# >>> [10, 10, 10, 10, 10, 9, 8, 8, 8, 8]
# >>> max = 10, min = 8, mean = 9.1, stdev = 0.9433981132056604
# >>> [9, 8, 8, 8, 8, 8, 8, 8, 8, 8]
# >>> max = 9, min = 8, mean = 8.1, stdev = 0.29999999999999993
# >>> [8, 8, 8, 8, 8, 8, 8, 8, 8, 8]
# >>> max = 8, min = 8, mean = 8.0, stdev = 0.0

ストリーミングを停止する。

PySpark
ssc.stop()

# >>> [8, 8, 8, 8, 8, 8, 8, 8, 8, 8]
# >>> max = 8, min = 8, mean = 8.0, stdev = 0.0

Spark MLlib

JPEXのスポット市場取引データに対して決定木アルゴリズムを試用する。

データを読み込む。

PySpark
rom pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
sqlContext = SQLContext(sc)
df = sqlContext.read.load("file:///home/cloudera/tmp4/spot_2017.csv",
                          format="com.databricks.spark.csv",
                          header='true', inferSchema='true')
df.count()
# >>> 15312

df.columns
# >>> ['id',
# >>>  'date',
# >>>  'time',
# >>>  'sell_kWh',
# >>>  'buy_kWh',
# >>>  'contract_kWh',
# >>>  'sys_price',
# >>>  'price_hokkaido',
# >>>  'price_tohoku',
# >>>  'price_tokyo',
# >>>  'price_chubu',
# >>>  'price_hokuriku',
# >>>  'price_kansai',
# >>>  'price_chugoku',
# >>>  'price_shikoku',
# >>>  'price_kyushu']

システムプライスが9円/kWhを超えるかどうかを予測対象とし、
各地域のエリアプライスを特徴量とする。

PySpark
binarizer = Binarizer(threshold=9.0, inputCol="sys_price", outputCol="label")
binarizedDF = binarizer.transform(df)
binarizedDF.select("sys_price","label").show(6)
# >>> +---------+-----+
# >>> |sys_price|label|
# >>> +---------+-----+
# >>> |    10.23|  1.0|
# >>> |     9.62|  1.0|
# >>> |     9.17|  1.0|
# >>> |      8.8|  0.0|
# >>> |     8.89|  0.0|
# >>> |     8.69|  0.0|
# >>> +---------+-----+
# >>> only showing top 6 rows             

featureColumns = [ 'price_hokkaido',
                   'price_tohoku',
                   'price_tokyo',
                   'price_chubu',
                   'price_hokuriku',
                   'price_kansai',
                   'price_chugoku',
                   'price_shikoku',
                   'price_kyushu']
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)

訓練データとテストデータに分ける。

PySpark
(trainingData, testData) = assembled.randomSplit([0.8, 0.2], seed=12345)
(trainingData.count(), testData.count())
# >>> (12194, 3118)

学習と予測

PySpark
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "label").show(10)
# >>> +----------+-----+
# >>> |prediction|label|
# >>> +----------+-----+
# >>> |       0.0|  0.0|
# >>> |       0.0|  0.0|
# >>> |       0.0|  0.0|
# >>> |       1.0|  1.0|
# >>> |       0.0|  1.0|
# >>> |       1.0|  1.0|
# >>> |       1.0|  1.0|
# >>> |       1.0|  1.0|
# >>> |       1.0|  1.0|
# >>> |       0.0|  0.0|
# >>> +----------+-----+
# >>> only showing top 10 rows

# predictions.select("prediction", "label").write.save(path="file:///home/cloudera/tmp4/predictions.csv",
#                                                      format="com.databricks.spark.csv",
#                                                      header="true")

予測精度による評価

PySpark
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                              predictionCol="prediction",
                                              metricName="precision")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
# >>> Accuracy = 0.949968

混同行列(Confusion Matix)による評価

PySpark
predictions = predictions.select("prediction", "label")
# predictions.rdd.take(2)
# >>> [Row(prediction=0.0, label=0.0), Row(prediction=0.0, label=0.0)]

# predictions.rdd.map(tuple).take(2)
# >>> [(0.0, 0.0), (0.0, 0.0)]

metrics = MulticlassMetrics(predictions.rdd.map(tuple))
metrics.confusionMatrix().toArray().transpose()
# >>> array([[ 1755.,    84.],
# >>>        [   72.,  1207.]])

参考

Big Data Specialization, UCSanDiego, cousera

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2