sparkを使うと処理が速い
と小耳に挟んだので一度動かしてみた。
#初期設定
Homebrew を使って下記の通り導入しました。
brew install apache-spark
#データ取得
下記のサイトからCSV形式の郵便番号データを取得した。
zipcloud
#起動方法
Homebrewでインストールしたので、下記のapache-sparkのフォルダ配下へ移動する。
cd /usr/local/Cellar/apache-spark/1.5.2/bin/
どうやらsparkはscala,java,python,Rに対応しているようですが、自分はpythonを使いたかったため
pyspark
で起動する。
"spark"のマークが見えたらOK。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.5.2
/_/
Using Python version 2.7.11 (default, Dec 26 2015 17:47:53)
SparkContext available as sc, HiveContext available as sqlContext.
>>>
#実装内容
全国の郵便番号のデータは全部で約12万件ある。
実装する前に、大量にデータを処理することを想定してデータを88回コピペし、約1,200万件に倍加してCSVファイルを新たに作成した。
そして、
- 郵便番号に"7"が含んでいるもの
-
市町村名に動物の名前が含んでいるもの
をAND条件で検索するように実装してみた。
# -*- coding: utf-8 -*-
import time
from pyspark import SparkContext
def main():
#検索したい漢字
queryList = ["鹿","鳥","熊","猿","犬"]
#時間計測スタート
start = time.time()
#データセット
sc = SparkContext('local', 'Simple App')
logData = sc.textFile('KEN_ALL_OVER_TEN_MILLION.CSV')
#各クエリ毎に情報を抽出
for item in queryList:
#splitでリスト化
lines = logData.map(lambda x: x.split(','))
#郵便番号が7を含むものを抽出
numberPicks = lines.filter(lambda s: unicode('7', 'utf-8') in s[2])
#市町村名に対象の漢字が含んでいるものを抽出
namePicks = lines.filter(lambda s: unicode(item, 'utf-8') in s[7])
#リストに格納
desList = namePicks.collect()
#ログ出力
for line in desList:
s = u""
for i, unit in enumerate(line):
if i != 0:
s = s + u', '
s = s + unit
print s.encode('utf-8')
#ヒット数出力
outlog = "query:" + item.decode('utf-8') + u", count:" + \
unicode(str(len(desList)), 'utf-8') + ", Time:{0}".format(time.time() - start) + u"[sec]"
print outlog.encode('utf-8')
#時間計測ストップ
finish_time = time.time() - start
print u"Time[total]:{0}".format(finish_time) + u"[sec]"
#終了処理
sc.stop()
if __name__ == '__main__':
main()
評価をするにあたり、sparkを使わない場合のコードも書いてみた。
# -*- coding: utf-8 -*-
import time
def pickAnimal(recordList, qList, start):
#各クエリ毎に情報を抽出
for q in qList:
count = 0
for record in recordList:
sepRecord = record.split(",")
if len(sepRecord) == 15:
#郵便番号が7を含むものを抽出
#市町村名に対象の漢字が含んでいるものを抽出
if -1 < sepRecord[2].find("7") and -1 < sepRecord[7].find(q):
count = count + 1
#ログ出力
print record
#ヒット数出力
print "query:" + q + ", count:" + str(count) + ", Time:{0}".format(time.time() - start) + "[sec]"
def main():
sepRecordList = []
#検索したい漢字
queryList = ["鹿","鳥","熊","猿","犬"]
#データセット
srcpath = "KEN_ALL_OVER_TEN_MILLION.CSV"
srcIN = open(srcpath, 'r')
#時間計測スタート
start = time.time()
for line in srcIN:
sepRecordList.append(line)
pickAnimal(sepRecordList, queryList, start)
#時間計測ストップ
finish_time = time.time() - start
print "Time:{0}".format(finish_time) + "[sec]"
#終了処理
srcIN.close()
if __name__ == '__main__':
main()
#計測結果
$pyspark sparkSample.py
~(中略)~
Time[total]:645.52906394[sec]
$python plain.py
~(中略)~
Time:112.966698885[sec]
ふ、普通に実装した方が約6倍速い。。。
#編集後記
速さを実感するためには、分散処理の環境を整えるか機械学習で大量のデータを試行を繰り返さないとどうやらダメみたいです。
sparkの威力を肌で体験できるまでが次の目標な気がしました。
#参考
-
Spark Programming Guid
(https://spark.apache.org/docs/1.2.0/programming-guide.html) -
SparkContextメモ
(http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/SparkContext.html) -
Apache Spark をPythonで操作する(pyspark)
(http://symfoware.blog68.fc2.com/blog-entry-1188.html)