analytics
Spark
bigdata
sparksql

Sparkを使って分析アプリを実現(pyspark/sparksql)

More than 1 year has passed since last update.

実行環境

macOS 10.13.1 (High Sierra)
Spark 2.2.0
Python 2.7.12

Spark インストール

mac os で brew install apache-spark と撃ちこむ。
windows / linux でもモジュールをダウンロードして展開するだけ。

# データの準備

今回は BAY AREA BikeShare のオープンデータを利用

curl コマンドで取得し、unzip

curl -O https://s3.amazonaws.com/babs-open-data/babs_open_data_year_2.zip
unzip babs_open_data_year_2.zip

ホームページ
http://www.bayareabikeshare.com/open-data/

sparkシェルの実行

pyspark で起動する。

~/dev/spark $ pyspark 
Python 2.7.12 |Continuum Analytics, Inc.| (default, Jul  2 2016, 17:43:17) 
[GCC 4.2.1 (Based on Apple Inc. build 5658) (LLVM build 2336.11.00)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/17 21:49:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/17 21:49:22 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:43:17)
SparkSession available as 'spark'.
>>> 

CSVファイルを読み込ませる。

>>> from pyspark.sql.types import *
>>> df = spark.read.csv('201508_station_data.csv')

このまま df.show(5) と実行すると5行表示する。簡単に取得できる。

>>> df.show(5)
+----------+--------------------+---------+-----------+---------+--------+------------+
|       _c0|                 _c1|      _c2|        _c3|      _c4|     _c5|         _c6|
+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount|landmark|installation|
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

カラム名が _c0 などとなってしまうので、カラム定義を行って再度読み込ませてみる。

from pyspark.sql.types import *

struct = StructType([
        StructField('station_id', IntegerType(), False),
        StructField('name', StringType(), False),
        StructField('lat', DoubleType(), False),
        StructField('long', DoubleType(), False),
        StructField('dockcount', IntegerType(), False),
        StructField('landmar', StringType(), False),
        StructField('installation', StringType(), False)
     ])

df = spark.read.csv('201508_station_data.csv', header='true', schema=struct)

CSV取込みは pyspark.sql.DataFrameReader を利用している。header 行のオプションは下記を参照した。[https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader]

実行結果

>>> df.show(5)
+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id|                name|      lat|       long|dockcount| landmar|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|San Jose|    8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

つぎは sqlの記述で、項目の選択・集約関数・結合をやってみて記事にする。