Edited at

Sparkを使って分析アプリを実現(pyspark/sparksql)

More than 1 year has passed since last update.


実行環境

macOS 10.13.1 (High Sierra)

Spark 2.2.0

Python 2.7.12


Spark インストール

mac os で brew install apache-spark と撃ちこむ。

windows / linux でもモジュールをダウンロードして展開するだけ。

# データの準備

今回は BAY AREA BikeShare のオープンデータを利用

curl コマンドで取得し、unzip

curl -O https://s3.amazonaws.com/babs-open-data/babs_open_data_year_2.zip

unzip babs_open_data_year_2.zip

ホームページ

http://www.bayareabikeshare.com/open-data/


sparkシェルの実行

pyspark で起動する。

~/dev/spark $ pyspark 

Python 2.7.12 |Continuum Analytics, Inc.| (default, Jul 2 2016, 17:43:17)
[GCC 4.2.1 (Based on Apple Inc. build 5658) (LLVM build 2336.11.00)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/17 21:49:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/17 21:49:22 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.2.0
/_/

Using Python version 2.7.12 (default, Jul 2 2016 17:43:17)
SparkSession available as 'spark'.
>>>

CSVファイルを読み込ませる。

>>> from pyspark.sql.types import *

>>> df = spark.read.csv('201508_station_data.csv')

このまま df.show(5) と実行すると5行表示する。簡単に取得できる。

>>> df.show(5)

+----------+--------------------+---------+-----------+---------+--------+------------+
| _c0| _c1| _c2| _c3| _c4| _c5| _c6|
+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id| name| lat| long|dockcount|landmark|installation|
| 2|San Jose Diridon ...|37.329732|-121.901782| 27|San Jose| 8/6/2013|
| 3|San Jose Civic Ce...|37.330698|-121.888979| 15|San Jose| 8/5/2013|
| 4|Santa Clara at Al...|37.333988|-121.894902| 11|San Jose| 8/6/2013|
| 5| Adobe on Almaden|37.331415| -121.8932| 19|San Jose| 8/5/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

カラム名が _c0 などとなってしまうので、カラム定義を行って再度読み込ませてみる。

from pyspark.sql.types import *

struct = StructType([
StructField('station_id', IntegerType(), False),
StructField('name', StringType(), False),
StructField('lat', DoubleType(), False),
StructField('long', DoubleType(), False),
StructField('dockcount', IntegerType(), False),
StructField('landmar', StringType(), False),
StructField('installation', StringType(), False)
])

df = spark.read.csv('201508_station_data.csv', header='true', schema=struct)

CSV取込みは pyspark.sql.DataFrameReader を利用している。header 行のオプションは下記を参照した。[https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader]

実行結果

>>> df.show(5)

+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id| name| lat| long|dockcount| landmar|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
| 2|San Jose Diridon ...|37.329732|-121.901782| 27|San Jose| 8/6/2013|
| 3|San Jose Civic Ce...|37.330698|-121.888979| 15|San Jose| 8/5/2013|
| 4|Santa Clara at Al...|37.333988|-121.894902| 11|San Jose| 8/6/2013|
| 5| Adobe on Almaden|37.331415| -121.8932| 19|San Jose| 8/5/2013|
| 6| San Pedro Square|37.336721|-121.894074| 15|San Jose| 8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows

つぎは sqlの記述で、項目の選択・集約関数・結合をやってみて記事にする。