LoginSignup
7
8

More than 5 years have passed since last update.

sparkを使って郵便番号情報を抽出

Posted at

            sparkを使うと処理が速い
と小耳に挟んだので一度動かしてみた。

初期設定

Homebrew を使って下記の通り導入しました。

brew install apache-spark

データ取得

下記のサイトからCSV形式の郵便番号データを取得した。
zipcloud

起動方法

Homebrewでインストールしたので、下記のapache-sparkのフォルダ配下へ移動する。

cd /usr/local/Cellar/apache-spark/1.5.2/bin/

どうやらsparkはscala,java,python,Rに対応しているようですが、自分はpythonを使いたかったため

pyspark

で起動する。
"spark"のマークが見えたらOK。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Python version 2.7.11 (default, Dec 26 2015 17:47:53)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

実装内容

全国の郵便番号のデータは全部で約12万件ある。
実装する前に、大量にデータを処理することを想定してデータを88回コピペし、約1,200万件に倍加してCSVファイルを新たに作成した。
そして、
- 郵便番号に"7"が含んでいるもの
- 市町村名に動物の名前が含んでいるもの
をAND条件で検索するように実装してみた。

sparkSample.py
# -*- coding: utf-8 -*-
import time
from pyspark import SparkContext

def main():
    #検索したい漢字
    queryList = ["鹿","鳥","熊","猿","犬"]

    #時間計測スタート
    start = time.time()

    #データセット
    sc = SparkContext('local', 'Simple App')
    logData = sc.textFile('KEN_ALL_OVER_TEN_MILLION.CSV')

    #各クエリ毎に情報を抽出
    for item in queryList:
        #splitでリスト化
        lines = logData.map(lambda x: x.split(','))
        #郵便番号が7を含むものを抽出
        numberPicks = lines.filter(lambda s: unicode('7', 'utf-8') in s[2])
        #市町村名に対象の漢字が含んでいるものを抽出
        namePicks = lines.filter(lambda s: unicode(item, 'utf-8') in s[7])
        #リストに格納
        desList = namePicks.collect()

        #ログ出力
        for line in desList:
            s = u""
            for i, unit in enumerate(line):
                if i != 0:
                    s = s + u', '
                s = s + unit
            print s.encode('utf-8')

        #ヒット数出力
        outlog = "query:" + item.decode('utf-8') + u", count:" + \
            unicode(str(len(desList)), 'utf-8') + ", Time:{0}".format(time.time() - start) + u"[sec]"
        print outlog.encode('utf-8')    

    #時間計測ストップ
    finish_time = time.time() - start
    print u"Time[total]:{0}".format(finish_time) + u"[sec]"

    #終了処理
    sc.stop()

if __name__ == '__main__':
    main()

評価をするにあたり、sparkを使わない場合のコードも書いてみた。

plain.py
# -*- coding: utf-8 -*-
import time

def pickAnimal(recordList, qList, start):
    #各クエリ毎に情報を抽出
    for q in qList:
        count = 0
        for record in recordList:
            sepRecord = record.split(",")
            if len(sepRecord) == 15:
                #郵便番号が7を含むものを抽出
                #市町村名に対象の漢字が含んでいるものを抽出
                if -1 < sepRecord[2].find("7") and -1 < sepRecord[7].find(q):
                    count = count + 1
                    #ログ出力
                    print record
                    #ヒット数出力
        print "query:" + q + ", count:" + str(count) + ", Time:{0}".format(time.time() - start) + "[sec]"

def main():

    sepRecordList = []
    #検索したい漢字
    queryList = ["鹿","鳥","熊","猿","犬"]

    #データセット
    srcpath = "KEN_ALL_OVER_TEN_MILLION.CSV"
    srcIN = open(srcpath, 'r')

    #時間計測スタート
    start = time.time()
    for line in srcIN:
        sepRecordList.append(line)

    pickAnimal(sepRecordList, queryList, start)

    #時間計測ストップ
    finish_time = time.time() - start
    print "Time:{0}".format(finish_time) + "[sec]"

    #終了処理
    srcIN.close()

if __name__ == '__main__':
    main()

計測結果

$pyspark sparkSample.py
~(中略)~
Time[total]:645.52906394[sec]
$python plain.py
~(中略)~
Time:112.966698885[sec]

ふ、普通に実装した方が約6倍速い。。。

編集後記

速さを実感するためには、分散処理の環境を整えるか機械学習で大量のデータを試行を繰り返さないとどうやらダメみたいです。
sparkの威力を肌で体験できるまでが次の目標な気がしました。

参考

7
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
8