実行環境
macOS 10.13.1 (High Sierra)
Spark 2.2.0
Python 2.7.12
Spark インストール
mac os で brew install apache-spark
と撃ちこむ。
windows / linux でもモジュールをダウンロードして展開するだけ。
# データの準備
今回は BAY AREA BikeShare のオープンデータを利用
curl コマンドで取得し、unzip
curl -O https://s3.amazonaws.com/babs-open-data/babs_open_data_year_2.zip
unzip babs_open_data_year_2.zip
ホームページ
http://www.bayareabikeshare.com/open-data/
sparkシェルの実行
pyspark
で起動する。
~/dev/spark $ pyspark
Python 2.7.12 |Continuum Analytics, Inc.| (default, Jul 2 2016, 17:43:17)
[GCC 4.2.1 (Based on Apple Inc. build 5658) (LLVM build 2336.11.00)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/12/17 21:49:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/12/17 21:49:22 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.2.0
/_/
Using Python version 2.7.12 (default, Jul 2 2016 17:43:17)
SparkSession available as 'spark'.
>>>
CSVファイルを読み込ませる。
>>> from pyspark.sql.types import *
>>> df = spark.read.csv('201508_station_data.csv')
このまま df.show(5) と実行すると5行表示する。簡単に取得できる。
>>> df.show(5)
+----------+--------------------+---------+-----------+---------+--------+------------+
| _c0| _c1| _c2| _c3| _c4| _c5| _c6|
+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id| name| lat| long|dockcount|landmark|installation|
| 2|San Jose Diridon ...|37.329732|-121.901782| 27|San Jose| 8/6/2013|
| 3|San Jose Civic Ce...|37.330698|-121.888979| 15|San Jose| 8/5/2013|
| 4|Santa Clara at Al...|37.333988|-121.894902| 11|San Jose| 8/6/2013|
| 5| Adobe on Almaden|37.331415| -121.8932| 19|San Jose| 8/5/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows
カラム名が _c0 などとなってしまうので、カラム定義を行って再度読み込ませてみる。
from pyspark.sql.types import *
struct = StructType([
StructField('station_id', IntegerType(), False),
StructField('name', StringType(), False),
StructField('lat', DoubleType(), False),
StructField('long', DoubleType(), False),
StructField('dockcount', IntegerType(), False),
StructField('landmar', StringType(), False),
StructField('installation', StringType(), False)
])
df = spark.read.csv('201508_station_data.csv', header='true', schema=struct)
CSV取込みは pyspark.sql.DataFrameReader を利用している。header 行のオプションは下記を参照した。[https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader]
実行結果
>>> df.show(5)
+----------+--------------------+---------+-----------+---------+--------+------------+
|station_id| name| lat| long|dockcount| landmar|installation|
+----------+--------------------+---------+-----------+---------+--------+------------+
| 2|San Jose Diridon ...|37.329732|-121.901782| 27|San Jose| 8/6/2013|
| 3|San Jose Civic Ce...|37.330698|-121.888979| 15|San Jose| 8/5/2013|
| 4|Santa Clara at Al...|37.333988|-121.894902| 11|San Jose| 8/6/2013|
| 5| Adobe on Almaden|37.331415| -121.8932| 19|San Jose| 8/5/2013|
| 6| San Pedro Square|37.336721|-121.894074| 15|San Jose| 8/7/2013|
+----------+--------------------+---------+-----------+---------+--------+------------+
only showing top 5 rows
つぎは sqlの記述で、項目の選択・集約関数・結合をやってみて記事にする。