Spark SQLではDataFrameと呼ばれる抽象的なデータ構造(RDBのテーブルのように行と名前とデータ型が付与された列の概念を持つデータ構造)を用いる。DataFrameはRDD、HIVEテーブル、他のデータソース(ファイルなど)から生成できる。
-DataFrameによる操作
-テーブル形式のデータセットに対してクエリを発行
http://spark.apache.org/docs/latest/sql-programming-guide.html
https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html
SparkSQLサンプルアプリの実行
Spark入門の6章に記載されているプログラムをScalaではなくPython3を使って書き直す。
csv形式のデザートメニュー(メニューID、メニューの名前、値段、カロリー)を使用して、ソートや集計などの処理を行う。
SparkSQLの準備
SparkSQLではメタデータをメタストアで管理し、メタストアはHiveのメタストアを利用するので、hive-site.xmlでメタストアの設定を行う。
以下の例ではテーブルの実体となるファイルを配置するディレクトリにhdfsを使用。
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>hdfs:///user/y_tadayasu/data/metastore</value>
</property>
</configuration>
データをHDFS上に配置
$ hdfs dfs -put Chapter6/data/dessert-* /user/y_tadayasu/data/
Spark SQLの初期化処理
Spark SQLを利用するためには、SparkContextに加えてSQLContextが必要。SQLContextはDataFrameの作成やテーブルとしてDataFrameを登録、テーブルを超えたSQLの実行、キャッシュテーブル、そしてperquetファイルの読み込みに利用される。
SQLContextは以下のようにして初期化できる。
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
##RDDからDataFrameを生成
RDDからDataFrameを生成するには、RDDが持つデータにスキーマ情報を付与する必要がある。Spark SQLはデータタイプを推測することにより、RowオブジェクトのRDDをDataFrameに変換することが可能である。Rowはkey/valueペアのリストを経由して構成される。keyはテーブルのカラム名として定義され、最初の行を見ることでタイプが推測される。そのため最初の行はとても重要で、データに間違いがあると推測も間違ってしまう。将来のバージョンではより複雑な推測方法を計画しており、より多くのデータを使って推測できるようになる。
テキストファイルをロードし、DataFrameで扱えるようにするために、それぞれの行をRowに変換する。
lines = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-menu.csv").map(lambda l: l.split(","))
dessertMenu = lines.map(lambda p: Row(menuId=p[0], name=p[1],price=int(p[2]),kcal=int(p[3])))
テキストをロードし、","でスプリットしたRDDのデータ
['D-0', 'チョコレートパフェ', '490', '420']
['D-1', 'プリンパフェ', '530', '380']
['D-2', 'いちごパフェ', '520', '320']
・・・{省略}・・・
Rowに変換したデータ
Row(calorie=420, menuId='D-0', menuName='チョコレートパフェ', price=490)
Row(calorie=380, menuId='D-1', menuName='プリンパフェ', price=530)
Row(calorie=320, menuId='D-2', menuName='いちごパフェ', price=520)
・・・{省略}・・・
スキーマを推測し、DataFrameに対して直接クエリを発行することができないため、クエリを発行できるようにするためにDataFrameを一時テーブルに登録する。
# RDDからDataFrameを生成
df = sqlContext.createDataFrame(dessertMenu)
# DataFrameをテンポラリなテーブルとして登録
df.registerTempTable("dessert_table")
# 推測したスキーマ情報を表示
print(df.printSchema())
printSchemaのアウトプットは以下のとおり。nullableという属性は該当の列にnullを許すかどうか。trueの場合はnullが設定されることを許容する。
root
|-- kcal: long (nullable = true)
|-- menuId: string (nullable = true)
|-- name: string (nullable = true)
|-- price: long (nullable = true)
DataFrameにクエリを発行
先ほど登録した一時テーブルに対して、クエリを発行する。
一時テーブルにクエリを発行した時点では実際のデータ処理は行われない。RDDと同じようにアクションを通じて実際にデータ処理が行われる。ここではshowメソッドアクションを通じてDataFrameが表すデータ・セットの内容を表示する。
以下の例では、カロリーが300キロ以上のデザートがいくつあるかカウント。
numOver300KcalDF = sqlContext.sql("SELECT count(*) AS num_of_over_300Kcal FROM dessert_table WHERE kcal >= 300")
print(numOver300KcalDF.show())
+-------------------+
|num_of_over_300Kcal|
+-------------------+
| 9|
+-------------------+
DataFrame APIでDataFrameを操作
DataFrame APIを利用すると、列の選択や行のフィルタリングなどSQL/HiveQLよりも細かい単位でDataFrameを操作できる。
DataFameの列を選択 selectメソッド
name列とprice列のみ選択
name_price = df.select(df['name'],df['price'])
print(name_price.show())
+----------+-----+
| name|price|
+----------+-----+
| チョコレートパフェ| 490|
| プリンパフェ| 530|
| いちごパフェ| 520|
・・・省略・・・
+----------+-----+
only showing top 20 rows
全ての列を選択するには、selectメソッドに*を指定する。
DataFrame APIでは、列が指定できる箇所には、演算子や関数を用いて式を記述できる。
例として価格をドル表示する。表示は別名としてdollar priceを付ける。Scalaだと普通にasを使えるが、pythonだとaliasメソッドを使うっぽい。表示する際は行数が多いとうっとおしいので今回は4行だけ表示するようにした。
name_price = df.select(df['name'],(df['price']/120.0).alias('dollar price'))
print(name_price.show(4))
+---------+-----------------+
| name| dollar price|
+---------+-----------------+
|チョコレートパフェ|4.083333333333333|
| プリンパフェ|4.416666666666667|
| いちごパフェ|4.333333333333333|
| パンナコッタ| 3.5|
+---------+-----------------+
only showing top 4 rows
フィルタリング where
SQLのwhere句に相当するもの。
over520YenDF = df.where(df["price"] >= 520)
+----+------+-------+-----+
|kcal|menuId| name|price|
+----+------+-------+-----+
| 380| D-1| プリンパフェ| 530|
| 320| D-2| いちごパフェ| 520|
| 288| D-4| チーズムース| 580|
| 251| D-6| ティラミス| 600|
| 650| D-11|紫いものパフェ| 650|
+----+------+-------+-----+
selectと一緒に使用する場合は、whereの後にselectを指定する。先にselectで指定するとnameしかない状態になるので、priceでフィルタリングできなくなる。
name_price_over520YenDF = df.where(df["price"] >= 520).select(df['name'])
+-------+
| name|
+-------+
| プリンパフェ|
| いちごパフェ|
| チーズムース|
| ティラミス|
|紫いものパフェ|
+-------+
ソート orderBy
値段を昇順、カロリーを降順でソートする。
sortedDessertDF = df.orderBy(df['price'].asc(),df['kcal'].desc())
+----+------+----------+-----+
|kcal|menuId| name|price|
+----+------+----------+-----+
| 120| D-15| カスタードプリン| 200|
| 160| D-17| チーズスフレ| 220|
| 248| D-5| アフォガート| 300|
| 220| D-16| チョコトルテ| 330|
・・・{省略}・・・
+----+------+----------+-----+
only showing top 20 rows
###集約処理 aggメソッド
平均のカロリーを求める。
agg(*exprs)
利用可能なaggregate機能はavg, max, min, sum, count.
exprs – a dict mapping from column name (string) to aggregate functions (string), or a list of Column.
avgKcalDF = df.agg({"kcal":"avg"})
+-----------------+
| avg(kcal)|
+-----------------+
|291.7826086956522|
+-----------------+
グループ毎に集約処理を行う groupBy(*cols) + agg(*exprs)
同じ価格帯にグループ化し、価格帯毎のメニューの数を数える。
python3では/で除算すると小数点が含まれるようになったので、小数点を切り捨てるには//を使用する必要がある。
numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})
//を使用すると以下のようにエラーが発生。//以外の演算子だとエラーがでないので、どうやら//サポートされてないようである。
#時間を見つけて後で調べる。
$ spark-submit --master yarn-client chap6-1.py
Traceback (most recent call last):
File "/home/y_tadayasu/spark-sample-program/chap6-1.py", line 40, in <module>
numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})
TypeError: unsupported operand type(s) for //: 'Column' and 'int'
//の代わりに/を使うと以下のように集約処理を行うことができる。
+-----------+------------+
|price_range|count(price)|
+-----------+------------+
| 2.2| 1|
| 3.4| 1|
・・・{省略}・・・
DataFrame同士の結合 join(other, on=None, how=None)
2つ以上のDataFrameをキーで結合する。
Joins with another DataFrame, using the given join expression.
lines_order = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-order.csv").map(lambda l: l.split(","))
dessertOrder = lines_order.map(lambda p: Row(sId=p[0], menuId=p[1],num=int(p[2])))
df2 = sqlContext.createDataFrame(dessertOrder)
amntPerMenuPerSlipDF = df.join(df2, df.menuId == df2.menuId,'inner').select(df2.sId,df.name,(df2.num*df.price).alia
s('amount_per_menu_per_slip'))
+-----+---------+------------------------+
| sId| name|amount_per_menu_per_slip|
+-----+---------+------------------------+
|SID-1| クリームあんみつ| 2000|
|SID-2| チーズケーキ| 400|
|SID-0|チョコレートパフェ| 980|
|SID-0| パンナコッタ| 420|
|SID-2| アフォガート| 300|
|SID-2| バニラジェラート| 360|
+-----+---------+------------------------+
UDF(User Defined Functions)の利用 registerFunctionメソッド ※Scalaだとregister
UDFはSparkSQLに機能を追加するシンプルな方法。
registerFunction(name, f, returnType=StringType)[source]
Parameters:
name – name of the UDF
samplingRatio – lambda function
returnType – a DataType object
使用する際はregisterFunctionの最初のパラメータで指定した名前を使う。
# -*- coding:utf-8 -*-
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="Example")
sqlContext = SQLContext(sc)
sqlContext.udf.register("strlen",lambda x:len(x),IntegerType())
print(sqlContext.sql("SELECT strlen('Hello')").show())
+---+
|_c0|
+---+
| 5|
+---+
Spark1.5系からUDAFが使えるようになったらしいが、pythonではunsupportedとのこと。
HiveのUTFも利用できるみたいであるが、そもそもHiveをよく知らないので今回はパス。
構造化データの処理
DataFrameの内容を書き出す
DataFrameWriterを利用することにより、DataFrameの内容を書き出すことができる。DataFrameWriterはDataFrameのwriteメソッドで取得できる。
Interface for saving the content of the DataFrame out into external storage.
format(source)
Specifies the underlying output data source.
Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.
save(path=None, format=None, mode=None, partitionBy=None, **options)
Saves the contents of the DataFrame to a data source.
saveメソッドでファイルに書き出すことができる。
df.write.format('json').save("result.json")
ファイル名だけ指定するとhdfs上のホーム直下に出力されるようである。
$ hdfs dfs -ls /user/y_tadayasu
Found 3 items
drwxr-xr-x - y_tadayasu y_tadayasu 0 2015-12-15 02:13 /user/y_tadayasu/.sparkStaging
drwxr-xr-x - y_tadayasu y_tadayasu 0 2015-12-10 03:03 /user/y_tadayasu/data
drwxr-xr-x - y_tadayasu y_tadayasu 0 2015-12-15 02:13 /user/y_tadayasu/result.json
$ hdfs dfs -cat /user/y_tadayasu/result.json/part*
{"kcal":420,"menuId":"D-0","name":"チョコレートパフェ","price":490}
{"kcal":380,"menuId":"D-1","name":"プリンパフェ","price":530}
{"kcal":320,"menuId":"D-2","name":"いちごパフェ","price":520}
・・・{省略}・・・
書きだしたファイルから別のDataFrameを生成
DataFrameReaderを利用してデータセットからDataFrameを生成する。
DataFarmeReaderのloadメソッドにより、ファイルに対応付けられたDataFrameが返される。
注)loadメソッドを呼び出した時点ではスキーマ情報の生成に関するデータは読みこむがデータ全体の読み込みは行われていない。
df3 = sqlContext.read.format('json').load('result.json')
print(df3.show(3))
+----+------+---------+-----+
|kcal|menuId| name|price|
+----+------+---------+-----+
| 420| D-0|チョコレートパフェ| 490|
| 380| D-1| プリンパフェ| 530|
| 320| D-2| いちごパフェ| 520|
+----+------+---------+-----+
only showing top 3 rows
Spark SQLの独自テーブルに書き出す。
df.write.format('parquet').saveAsTable("result_tbl")
以下のようなエラーが発生。SQLContextだとテンポラリなテーブルが作成されるので、代わりにHiveContextを使用しろとのこと。pysparkなどでインタラクティブに実行したときは表示されない。
・・・{省略}・・・
py4j.protocol.Py4JJavaError: An error occurred while calling o115.saveAsTable.
: java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
・・・{省略}・・・
以下のエラー?Messageが出力される。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
どうやって作成されたテーブルを確認するかは今度調べる。