LoginSignup
8
6

More than 5 years have passed since last update.

Apache Sparkとpythonでワードカウント(Mac OSX)

Posted at

概要

Apache Sparkの検証の第一歩として。
Hadoop経験者であればよくご存知かと思いますが、ファイル内の同一の語句をカウントするアレです。
環境はMac OSXですが、Linuxでもほぼ同じかと。
コード一式はこちら

インストール

$ brew install apache-spark

インストール確認

spark-shellが動いてscala>が表示されればOK

$ /usr/local/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:51 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:51 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:56 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:56 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala>

pythonでローカルファイルのワードカウントをしてみる

こちらは、公式サイトの記載を参考に書きました。

ディレクトリ構成

下記のように準備してください。

$ tree
.
├── input
│   └── data # 読み込むテキスト
└── wordcount.py # 実行スクリプト

1 directory, 4 files

コードを書く

ここではpythonを使います。
scalaやJavaでも書けるよう。得意なのでいきましょう。
こんな感じ。

wordcount.py
#!/usr/bin/env python
# coding:utf-8

from pyspark import SparkContext

def execute(sc, src, dest):
    '''
    ワードカウントを実行する
    '''
    # srcファイルを読み込み
    text_file = sc.textFile(src)
    counts = text_file.flatMap(lambda line: line.split(" ")) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)
    # 結果を書き出し
    counts.saveAsTextFile(dest)

if __name__ == '__main__':
    sc = SparkContext('local', 'WordCount')
    src  = './input'
    dest = './output'
    execute(sc, src, dest)

読み込みファイル準備

適当に。
例えばこんな感じ。

aaa
bbb
ccc
aaa
bbbb
ccc
aaa

実行

下記コマンド。

$ which pyspark
/usr/local/bin/pyspark

# 実行
$ pyspark ./wordcount.py

実行するとダーッとログが流れます。(Hadoop Streamingみたい)

確認

(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)

正しくカウントされました。

おまけ

出力先ディレクトリ(./output)が生成済の場合、次回の処理に失敗するので注意。
下記のようなシェルを同じディレクトリに添えておくと良いです。

exec.sh
#!/bin/bash

rm -fR ./output
/usr/local/bin/pyspark ./wordcount.py

echo ">>>>> result"
cat ./output/*
$ sh exec.sh
・・・
>>>>> result
(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)
8
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
6