
Trying out Apache Spark, better late than never


It's long overdue, but I had never touched Apache Spark, so I decided to try it out a bit.
My goal is to understand the basics and then build something with the machine-learning libraries on top of it.
This article is a set of study notes from that process.

For the MLlib part, I also referred to this article :bow:


Setup

  • Java 1.8
  • Mac
  • Apache Spark 1.6.x
  1. Download Spark
    You can download Apache Spark from here.

  2. Extract it, set SPARK_HOME, and add the bin directory to your PATH.

> tar xfv spark-1.6.1-bin-hadoop2.6.tgz

> mv spark-1.6.1-bin-hadoop2.6 spark-1.6.1

~/.bashrc
export SPARK_HOME=/usr/local/project/apache-spark/spark-1.6.1
export PATH=$PATH:$SPARK_HOME/bin

source ~/.bashrc

  3. Check that spark-shell is on your PATH.
> spark-shell

Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_25)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/07/23 23:53:02 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:03 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/07/23 23:53:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/07/23 23:53:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:14 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:20 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/07/23 23:53:20 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala>

Once it starts up like this, the basic setup is done.

Trying it out briefly

A good way to start is to run through the contents of the Quick Start page here.

> pyspark

>>> textFile = sc.textFile("README.md")
>>> textFile.count()
>>> textFile.first()
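
The Quick Start also chains a transformation into an action, for example filtering the lines that mention Spark before counting them. A minimal sketch along those lines (the linesWithSpark name follows the guide):

>>> # transformation: lazily keep only the lines containing "Spark"
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
>>> # action: actually run the filter and count the matching lines
>>> linesWithSpark.count()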

A look inside the Quick Start

This is also taken straight from the Quick Start, but I tried it out myself.

>>> def max(a, b):
...   if a > b:
...     return a
...   else:
...     return b
...
>>> textFile.map(lambda line: len(line.split())).reduce(max)
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>>> wordCounts.collect()
16/07/25 12:58:36 INFO SparkContext: Starting job: collect at <stdin>:1
16/07/25 12:58:36 INFO DAGScheduler: Registering RDD 9 (reduceByKey at <stdin>:1)
16/07/25 12:58:36 INFO DAGScheduler: Got job 5 (collect at <stdin>:1) with 2 output partitions
16/07/25 12:58:36 INFO DAGScheduler: Final stage: ResultStage 6 (collect at <stdin>:1)
16/07/25 12:58:36 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
16/07/25 12:58:36 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 5)
16/07/25 12:58:36 INFO DAGScheduler: Submitting ShuffleMapStage 5 (PairwiseRDD[9] at reduceByKey at <stdin>:1), which has no missing parents
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 8.2 KB, free 195.2 KB)
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 5.2 KB, free 200.4 KB)
16/07/25 12:58:36 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:54829 (size: 5.2 KB, free: 511.1 MB)
16/07/25 12:58:36 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
16/07/25 12:58:36 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 5 (PairwiseRDD[9] at reduceByKey at <stdin>:1)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Adding task set 5.0 with 2 tasks
16/07/25 12:58:36 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 9, localhost, partition 0,PROCESS_LOCAL, 2149 bytes)
16/07/25 12:58:36 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 10, localhost, partition 1,PROCESS_LOCAL, 2149 bytes)
16/07/25 12:58:36 INFO Executor: Running task 0.0 in stage 5.0 (TID 9)
16/07/25 12:58:36 INFO Executor: Running task 1.0 in stage 5.0 (TID 10)
16/07/25 12:58:36 INFO HadoopRDD: Input split: file:/usr/local/project/apache-spark/spark-1.6.2/README.md:1679+1680
16/07/25 12:58:36 INFO HadoopRDD: Input split: file:/usr/local/project/apache-spark/spark-1.6.2/README.md:0+1679
/usr/local/project/apache-spark/spark-1.6.2/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
/usr/local/project/apache-spark/spark-1.6.2/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
16/07/25 12:58:36 INFO PythonRunner: Times: total = 70, boot = 12, init = 10, finish = 48
16/07/25 12:58:36 INFO PythonRunner: Times: total = 65, boot = 6, init = 10, finish = 49
16/07/25 12:58:36 INFO Executor: Finished task 0.0 in stage 5.0 (TID 9). 2318 bytes result sent to driver
16/07/25 12:58:36 INFO Executor: Finished task 1.0 in stage 5.0 (TID 10). 2318 bytes result sent to driver
16/07/25 12:58:36 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 9) in 150 ms on localhost (1/2)
16/07/25 12:58:36 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID 10) in 146 ms on localhost (2/2)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
16/07/25 12:58:36 INFO DAGScheduler: ShuffleMapStage 5 (reduceByKey at <stdin>:1) finished in 0.154 s
16/07/25 12:58:36 INFO DAGScheduler: looking for newly runnable stages
16/07/25 12:58:36 INFO DAGScheduler: running: Set()
16/07/25 12:58:36 INFO DAGScheduler: waiting: Set(ResultStage 6)
16/07/25 12:58:36 INFO DAGScheduler: failed: Set()
16/07/25 12:58:36 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[12] at collect at <stdin>:1), which has no missing parents
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 5.1 KB, free 205.4 KB)
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 3.2 KB, free 208.6 KB)
16/07/25 12:58:36 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:54829 (size: 3.2 KB, free: 511.1 MB)
16/07/25 12:58:36 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/07/25 12:58:36 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 6 (PythonRDD[12] at collect at <stdin>:1)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Adding task set 6.0 with 2 tasks
16/07/25 12:58:36 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 11, localhost, partition 0,NODE_LOCAL, 1894 bytes)
16/07/25 12:58:36 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 12, localhost, partition 1,NODE_LOCAL, 1894 bytes)
16/07/25 12:58:36 INFO Executor: Running task 0.0 in stage 6.0 (TID 11)
16/07/25 12:58:36 INFO Executor: Running task 1.0 in stage 6.0 (TID 12)
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
16/07/25 12:58:36 INFO PythonRunner: Times: total = 10, boot = -97, init = 106, finish = 1
16/07/25 12:58:36 INFO Executor: Finished task 1.0 in stage 6.0 (TID 12). 3633 bytes result sent to driver
16/07/25 12:58:36 INFO PythonRunner: Times: total = 15, boot = -101, init = 116, finish = 0
16/07/25 12:58:36 INFO Executor: Finished task 0.0 in stage 6.0 (TID 11). 3853 bytes result sent to driver
16/07/25 12:58:36 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 12) in 61 ms on localhost (1/2)
16/07/25 12:58:36 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 11) in 64 ms on localhost (2/2)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
16/07/25 12:58:36 INFO DAGScheduler: ResultStage 6 (collect at <stdin>:1) finished in 0.070 s
16/07/25 12:58:36 INFO DAGScheduler: Job 5 finished: collect at <stdin>:1, took 0.294915 s
[(u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), (u'using:', 1), (u'guidance', 2), (u'Scala,', 1), (u'environment', 1), (u'only', 1), (u'rich', 1), (u'Apache', 1), (u'sc.parallelize(range(1000)).count()', 1), (u'Building', 1), (u'guide,', 1), (u'return', 2), (u'Please', 3), (u'Try', 1), (u'not', 1), (u'Spark', 13), (u'scala>', 1), (u'Note', 1), (u'cluster.', 1), (u'./bin/pyspark', 1), (u'params', 1), (u'through', 1), (u'GraphX', 1), (u'[run', 1), (u'abbreviated', 1), (u'[project', 2), (u'##', 8), (u'library', 1), (u'see', 1), (u'"local"', 1), (u'[Apache', 1), (u'will', 1), (u'#', 1), (u'processing,', 1), (u'for', 11), (u'[building', 1), (u'provides', 1), (u'print', 1), (u'supports', 2), (u'built,', 1), (u'[params]`.', 1), (u'available', 1), (u'run', 7), (u'tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).', 1), (u'This', 2), (u'Hadoop,', 2), (u'Tests', 1), (u'example:', 1), (u'-DskipTests', 1), (u'Maven](http://maven.apache.org/).', 1), (u'programming', 1), (u'running', 1), (u'against', 1), (u'site,', 1), (u'comes', 1), (u'package.', 1), (u'and', 10), (u'package.)', 1), (u'prefer', 1), (u'documentation,', 1), (u'submit', 1), (u'tools', 1), (u'use', 3), (u'from', 1), (u'For', 2), (u'./bin/run-example', 2), (u'fast', 1), (u'systems.', 1), (u'<http://spark.apache.org/>', 1), (u'Hadoop-supported', 1), (u'way', 1), (u'README', 1), (u'MASTER', 1), (u'engine', 1), (u'building', 2), (u'usage', 1), (u'instance:', 1), (u'with', 3), (u'protocols', 1), (u'And', 1), (u'this', 1), (u'setup', 1), (u'shell:', 2), (u'project', 1), (u'following', 2), (u'distribution', 1), (u'detailed', 2), (u'have', 1), (u'stream', 1), (u'is', 6), (u'higher-level', 1), (u'tests', 2), (u'1000:', 2), (u'sample', 1), (u'["Specifying', 1), (u'Alternatively,', 1), (u'file', 1), (u'need', 1), (u'You', 3), (u'instructions.', 1), (u'different', 1), (u'programs,', 1), (u'storage', 1), (u'same', 1), (u'machine', 1), (u'Running', 1), (u'which', 2), (u'you', 4), (u'A', 1), (u'About', 1), (u'sc.parallelize(1', 1), (u'locally.', 1), (u'Hive', 2), (u'optimized', 1), (u'uses', 1), (u'Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', 1), (u'variable', 1), (u'The', 1), (u'data', 1), (u'a', 8), (u'"yarn"', 1), (u'Thriftserver', 1), (u'processing.', 1), (u'./bin/spark-shell', 1), (u'Python', 2), (u'Spark](#building-spark).', 1), (u'clean', 1), (u'the', 21), (u'requires', 1), (u'talk', 1), (u'help', 1), (u'Hadoop', 3), (u'high-level', 1), (u'find', 1), (u'web', 1), (u'Shell', 2), (u'how', 2), (u'graph', 1), (u'run:', 1), (u'should', 2), (u'to', 14), (u'module,', 1), (u'given.', 1), (u'directory.', 1), (u'must', 1), (u'SparkPi', 2), (u'do', 2), (u'Programs', 1), (u'Many', 1), (u'YARN,', 1), (u'using', 2), (u'Example', 1), (u'Once', 1), (u'HDFS', 1), (u'Because', 1), (u'name', 1), (u'Testing', 1), (u'refer', 2), (u'Streaming', 1), (u'SQL', 2), (u'them,', 1), (u'analysis.', 1), (u'set', 2), (u'Scala', 2), (u'thread,', 1), (u'individual', 1), (u'examples', 2), (u'changed', 1), (u'runs.', 1), (u'Pi', 1), (u'More', 1), (u'Python,', 2), (u'Versions', 1), (u'its', 1), (u'version', 1), (u'wiki](https://cwiki.apache.org/confluence/display/SPARK).', 1), (u'`./bin/run-example', 1), (u'Configuration', 1), (u'command,', 2), (u'can', 6), (u'core', 1), (u'Guide](http://spark.apache.org/docs/latest/configuration.html)', 1), (u'MASTER=spark://host:7077', 1), (u'Documentation', 1), (u'downloaded', 1), (u'distributions.', 1), (u'Spark.', 1), 
(u'Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1), (u'["Building', 1), (u'`examples`', 2), (u'on', 5), (u'package', 1), (u'of', 5), (u'APIs', 1), (u'pre-built', 1), (u'Big', 1), (u'or', 3), (u'learning,', 1), (u'locally', 2), (u'overview', 1), (u'one', 2), (u'(You', 1), (u'Online', 1), (u'versions', 1), (u'your', 1), (u'threads.', 1), (u'>>>', 1), (u'spark://', 1), (u'contains', 1), (u'system', 1), (u'start', 1), (u'build/mvn', 1), (u'basic', 1), (u'configure', 1), (u'that', 2), (u'N', 1), (u'"local[N]"', 1), (u'DataFrames,', 1), (u'particular', 2), (u'be', 2), (u'an', 3), (u'easiest', 1), (u'Interactive', 2), (u'cluster', 2), (u'page](http://spark.apache.org/documentation.html)', 1), (u'<class>', 1), (u'example', 3), (u'are', 1), (u'Data.', 1), (u'mesos://', 1), (u'computing', 1), (u'URL,', 1), (u'in', 5), (u'general', 2), (u'To', 2), (u'at', 2), (u'1000).count()', 1), (u'if', 4), (u'built', 1), (u'no', 1), (u'Java,', 1), (u'MLlib', 1), (u'also', 4), (u'other', 1), (u'build', 3), (u'online', 1), (u'several', 1), (u'[Configuration', 1), (u'class', 2), (u'programs', 2), (u'documentation', 3), (u'It', 2), (u'graphs', 1), (u'./dev/run-tests', 1), (u'first', 1), (u'latest', 1)]
>>>
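
collect() returns every (word, count) pair unsorted, which is hard to read. As a small extension of my own (not part of the Quick Start), the same RDD can be asked for just the most frequent words:

>>> # top 10 words by count; takeOrdered sorts by the key function and returns a plain Python list
>>> wordCounts.takeOrdered(10, key=lambda pair: -pair[1])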

Working through the Spark Programming Guide

Starting from Getting Started

# build an RDD of (int, string) key-value pairs and write it out as a Hadoop SequenceFile
rdd = sc.parallelize(range(1, 10)).map(lambda x: (x, 'a' * x))
rdd.saveAsSequenceFile('~/hoge.txt')
# read the SequenceFile back and sort the pairs by key
sorted(sc.sequenceFile('~/hoge.txt').collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa'), (4, u'aaaa'), (5, u'aaaaa'), (6, u'aaaaaa'), (7, u'aaaaaaa'), (8, u'aaaaaaaa'), (9, u'aaaaaaaaa')]
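
The same Getting Started section also shows building an RDD from an in-memory collection with parallelize. A minimal sketch of that pattern (the variable name is mine):

>>> # distribute a local list as an RDD (running on local threads here)
>>> nums = sc.parallelize([1, 2, 3, 4, 5])
>>> # map is a lazy transformation; reduce is an action that triggers the job
>>> nums.map(lambda x: x * x).reduce(lambda a, b: a + b)
55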

MLlib

I'll work through this while following this page.

Collaborative Filtering - RDD-based API comes up a lot in connection with recommendation engines, so I'll try that one.

The test data is [this](https://github.com/apache/spark/blob/master/data/mllib/als/test.data).

At the end, the predict method makes a prediction. The test data contains the entry 3,2,5.0, so if a value close to 5 comes out, I take that as a good result.

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# load the test data and parse each "user,product,rating" line into a Rating
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# train an ALS model with 10 latent factors over 10 iterations
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# predict the rating user 3 would give product 2
model.predict(3, 2)
... (log output omitted)
4.996948080474724

It comes out to about 4.99, which is almost 5, so I take the result to be reasonable.
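
A single predict call only spot-checks one pair. The official collaborative-filtering example goes a step further and scores the model against all of the ratings with mean squared error; roughly like this (a sketch following that example, so the exact value will vary from run to run):

# predict a rating for every (user, product) pair in the training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
# join the predictions with the true ratings and compute the mean squared error
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()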

I'll cover samples other than MLlib in a separate article.
That's it for this article for now.
