Posted at

Apache Sparkとpythonでワードカウント(Mac OSX)

More than 3 years have passed since last update.


概要

Apache Sparkの検証の第一歩として。

Hadoop経験者であればよくご存知かと思いますが、ファイル内の同一の語句をカウントするアレです。

環境はMac OSXですが、Linuxでもほぼ同じかと。

コード一式はこちら


インストール

$ brew install apache-spark


インストール確認

spark-shellが動いてscala>が表示されればOK

$ /usr/local/bin/spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.1
/_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:51 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:51 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:56 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:56 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala>


pythonでローカルファイルのワードカウントをしてみる

こちらは、公式サイトの記載を参考に書きました。


ディレクトリ構成

下記のように準備してください。

$ tree

.
├── input
│   └── data # 読み込むテキスト
└── wordcount.py # 実行スクリプト

1 directory, 4 files


コードを書く

ここではpythonを使います。

scalaやJavaでも書けるよう。得意なのでいきましょう。

こんな感じ。


wordcount.py

#!/usr/bin/env python

# coding:utf-8

from pyspark import SparkContext

def execute(sc, src, dest):
'''
ワードカウントを実行する
'''

# srcファイルを読み込み
text_file = sc.textFile(src)
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
# 結果を書き出し
counts.saveAsTextFile(dest)

if __name__ == '__main__':
sc = SparkContext('local', 'WordCount')
src = './input'
dest = './output'
execute(sc, src, dest)



読み込みファイル準備

適当に。

例えばこんな感じ。

aaa

bbb
ccc
aaa
bbbb
ccc
aaa


実行

下記コマンド。

$ which pyspark

/usr/local/bin/pyspark

# 実行
$ pyspark ./wordcount.py

実行するとダーッとログが流れます。(Hadoop Streamingみたい)


確認

(u'aaa', 3)

(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)

正しくカウントされました。


おまけ

出力先ディレクトリ(./output)が生成済の場合、次回の処理に失敗するので注意。

下記のようなシェルを同じディレクトリに添えておくと良いです。


exec.sh

#!/bin/bash

rm -fR ./output
/usr/local/bin/pyspark ./wordcount.py

echo ">>>>> result"
cat ./output/*


$ sh exec.sh

・・・
>>>>> result
(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)