Posted at

Spark SQLサンプルアプリの実行

More than 3 years have passed since last update.

Spark SQLではDataFrameと呼ばれる抽象的なデータ構造(RDBのテーブルのように行と名前とデータ型が付与された列の概念を持つデータ構造)を用いる。DataFrameはRDD、HIVEテーブル、他のデータソース(ファイルなど)から生成できる。

-DataFrameによる操作

-テーブル形式のデータセットに対してクエリを発行

http://spark.apache.org/docs/latest/sql-programming-guide.html

https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html


SparkSQLサンプルアプリの実行

Spark入門の6章に記載されているプログラムをScalaではなくPython3を使って書き直す。

csv形式のデザートメニュー(メニューID、メニューの名前、値段、カロリー)を使用して、ソートや集計などの処理を行う。


SparkSQLの準備

SparkSQLではメタデータをメタストアで管理し、メタストアはHiveのメタストアを利用するので、hive-site.xmlでメタストアの設定を行う。

以下の例ではテーブルの実体となるファイルを配置するディレクトリにhdfsを使用。


/opt/spark/conf/hive-site.xml

<configuration>

<property>
<name>hive.metastore.warehouse.dir</name>
<value>hdfs:///user/y_tadayasu/data/metastore</value>
</property>
</configuration>

データをHDFS上に配置

$ hdfs dfs -put Chapter6/data/dessert-* /user/y_tadayasu/data/


Spark SQLの初期化処理

Spark SQLを利用するためには、SparkContextに加えてSQLContextが必要。SQLContextはDataFrameの作成やテーブルとしてDataFrameを登録、テーブルを超えたSQLの実行、キャッシュテーブル、そしてperquetファイルの読み込みに利用される。

SQLContextは以下のようにして初期化できる。

>>> from pyspark.sql import SQLContext

>>> sqlContext = SQLContext(sc)


RDDからDataFrameを生成

RDDからDataFrameを生成するには、RDDが持つデータにスキーマ情報を付与する必要がある。Spark SQLはデータタイプを推測することにより、RowオブジェクトのRDDをDataFrameに変換することが可能である。Rowはkey/valueペアのリストを経由して構成される。keyはテーブルのカラム名として定義され、最初の行を見ることでタイプが推測される。そのため最初の行はとても重要で、データに間違いがあると推測も間違ってしまう。将来のバージョンではより複雑な推測方法を計画しており、より多くのデータを使って推測できるようになる。

テキストファイルをロードし、DataFrameで扱えるようにするために、それぞれの行をRowに変換する。

lines = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-menu.csv").map(lambda l: l.split(","))

dessertMenu = lines.map(lambda p: Row(menuId=p[0], name=p[1],price=int(p[2]),kcal=int(p[3])))

テキストをロードし、","でスプリットしたRDDのデータ

['D-0', 'チョコレートパフェ', '490', '420']                                     

['D-1', 'プリンパフェ', '530', '380']
['D-2', 'いちごパフェ', '520', '320']
・・・{省略}・・・

Rowに変換したデータ

Row(calorie=420, menuId='D-0', menuName='チョコレートパフェ', price=490)        

Row(calorie=380, menuId='D-1', menuName='プリンパフェ', price=530)
Row(calorie=320, menuId='D-2', menuName='いちごパフェ', price=520)
・・・{省略}・・・

スキーマを推測し、DataFrameに対して直接クエリを発行することができないため、クエリを発行できるようにするためにDataFrameを一時テーブルに登録する。

# RDDからDataFrameを生成

df = sqlContext.createDataFrame(dessertMenu)
# DataFrameをテンポラリなテーブルとして登録
df.registerTempTable("dessert_table")
# 推測したスキーマ情報を表示
print(df.printSchema())

printSchemaのアウトプットは以下のとおり。nullableという属性は該当の列にnullを許すかどうか。trueの場合はnullが設定されることを許容する。

root

|-- kcal: long (nullable = true)
|-- menuId: string (nullable = true)
|-- name: string (nullable = true)
|-- price: long (nullable = true)


DataFrameにクエリを発行

先ほど登録した一時テーブルに対して、クエリを発行する。

一時テーブルにクエリを発行した時点では実際のデータ処理は行われない。RDDと同じようにアクションを通じて実際にデータ処理が行われる。ここではshowメソッドアクションを通じてDataFrameが表すデータ・セットの内容を表示する。

以下の例では、カロリーが300キロ以上のデザートがいくつあるかカウント。

numOver300KcalDF = sqlContext.sql("SELECT count(*) AS num_of_over_300Kcal FROM dessert_table WHERE kcal >= 300")

print(numOver300KcalDF.show())

+-------------------+                                                           

|num_of_over_300Kcal|
+-------------------+
| 9|
+-------------------+


DataFrame APIでDataFrameを操作

DataFrame APIを利用すると、列の選択や行のフィルタリングなどSQL/HiveQLよりも細かい単位でDataFrameを操作できる。


DataFameの列を選択 selectメソッド

name列とprice列のみ選択

name_price = df.select(df['name'],df['price'])

print(name_price.show())

+----------+-----+

| name|price|
+----------+-----+
| チョコレートパフェ| 490|
| プリンパフェ| 530|
| いちごパフェ| 520|
・・・省略・・・
+----------+-----+
only showing top 20 rows

全ての列を選択するには、selectメソッドに*を指定する。

DataFrame APIでは、列が指定できる箇所には、演算子や関数を用いて式を記述できる。

例として価格をドル表示する。表示は別名としてdollar priceを付ける。Scalaだと普通にasを使えるが、pythonだとaliasメソッドを使うっぽい。表示する際は行数が多いとうっとおしいので今回は4行だけ表示するようにした。

name_price = df.select(df['name'],(df['price']/120.0).alias('dollar price'))

print(name_price.show(4))

+---------+-----------------+

| name| dollar price|
+---------+-----------------+
|チョコレートパフェ|4.083333333333333|
| プリンパフェ|4.416666666666667|
| いちごパフェ|4.333333333333333|
| パンナコッタ| 3.5|
+---------+-----------------+
only showing top 4 rows


フィルタリング where

SQLのwhere句に相当するもの。

over520YenDF = df.where(df["price"] >= 520)

+----+------+-------+-----+

|kcal|menuId| name|price|
+----+------+-------+-----+
| 380| D-1| プリンパフェ| 530|
| 320| D-2| いちごパフェ| 520|
| 288| D-4| チーズムース| 580|
| 251| D-6| ティラミス| 600|
| 650| D-11|紫いものパフェ| 650|
+----+------+-------+-----+

selectと一緒に使用する場合は、whereの後にselectを指定する。先にselectで指定するとnameしかない状態になるので、priceでフィルタリングできなくなる。

name_price_over520YenDF = df.where(df["price"] >= 520).select(df['name'])

+-------+

| name|
+-------+
| プリンパフェ|
| いちごパフェ|
| チーズムース|
| ティラミス|
|紫いものパフェ|
+-------+


ソート orderBy

値段を昇順、カロリーを降順でソートする。

sortedDessertDF = df.orderBy(df['price'].asc(),df['kcal'].desc())

+----+------+----------+-----+

|kcal|menuId| name|price|
+----+------+----------+-----+
| 120| D-15| カスタードプリン| 200|
| 160| D-17| チーズスフレ| 220|
| 248| D-5| アフォガート| 300|
| 220| D-16| チョコトルテ| 330|
・・・{省略}・・・
+----+------+----------+-----+
only showing top 20 rows


集約処理 aggメソッド

平均のカロリーを求める。

agg(*exprs)

利用可能なaggregate機能はavg, max, min, sum, count.

exprs – a dict mapping from column name (string) to aggregate functions (string), or a list of Column.

avgKcalDF = df.agg({"kcal":"avg"})

+-----------------+                                                             

| avg(kcal)|
+-----------------+
|291.7826086956522|
+-----------------+


グループ毎に集約処理を行う groupBy(cols) + agg(exprs)

同じ価格帯にグループ化し、価格帯毎のメニューの数を数える。

python3では/で除算すると小数点が含まれるようになったので、小数点を切り捨てるには//を使用する必要がある。

numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})

//を使用すると以下のようにエラーが発生。//以外の演算子だとエラーがでないので、どうやら//サポートされてないようである。

#時間を見つけて後で調べる。

$ spark-submit --master yarn-client chap6-1.py 

Traceback (most recent call last):
File "/home/y_tadayasu/spark-sample-program/chap6-1.py", line 40, in <module>
numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})
TypeError: unsupported operand type(s) for //: 'Column' and 'int'

//の代わりに/を使うと以下のように集約処理を行うことができる。

+-----------+------------+                                                      

|price_range|count(price)|
+-----------+------------+
| 2.2| 1|
| 3.4| 1|
・・・{省略}・・・


DataFrame同士の結合 join(other, on=None, how=None)

2つ以上のDataFrameをキーで結合する。

Joins with another DataFrame, using the given join expression.

lines_order = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-order.csv").map(lambda l: l.split(","))

dessertOrder = lines_order.map(lambda p: Row(sId=p[0], menuId=p[1],num=int(p[2])))

df2 = sqlContext.createDataFrame(dessertOrder)

amntPerMenuPerSlipDF = df.join(df2, df.menuId == df2.menuId,'inner').select(df2.sId,df.name,(df2.num*df.price).alia
s('amount_per_menu_per_slip'))

+-----+---------+------------------------+                                      

| sId| name|amount_per_menu_per_slip|
+-----+---------+------------------------+
|SID-1| クリームあんみつ| 2000|
|SID-2| チーズケーキ| 400|
|SID-0|チョコレートパフェ| 980|
|SID-0| パンナコッタ| 420|
|SID-2| アフォガート| 300|
|SID-2| バニラジェラート| 360|
+-----+---------+------------------------+


UDF(User Defined Functions)の利用 registerFunctionメソッド ※Scalaだとregister

UDFはSparkSQLに機能を追加するシンプルな方法。

registerFunction(name, f, returnType=StringType)[source]

Parameters:

name – name of the UDF

samplingRatio – lambda function

returnType – a DataType object

使用する際はregisterFunctionの最初のパラメータで指定した名前を使う。

# -*- coding:utf-8 -*-

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="Example")
sqlContext = SQLContext(sc)
sqlContext.udf.register("strlen",lambda x:len(x),IntegerType())
print(sqlContext.sql("SELECT strlen('Hello')").show())

+---+

|_c0|
+---+
| 5|
+---+

Spark1.5系からUDAFが使えるようになったらしいが、pythonではunsupportedとのこと。

HiveのUTFも利用できるみたいであるが、そもそもHiveをよく知らないので今回はパス。


構造化データの処理


DataFrameの内容を書き出す

DataFrameWriterを利用することにより、DataFrameの内容を書き出すことができる。DataFrameWriterはDataFrameのwriteメソッドで取得できる。

Interface for saving the content of the DataFrame out into external storage.

format(source)

Specifies the underlying output data source.

Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.

save(path=None, format=None, mode=None, partitionBy=None, **options)

Saves the contents of the DataFrame to a data source.

saveメソッドでファイルに書き出すことができる。

df.write.format('json').save("result.json")

ファイル名だけ指定するとhdfs上のホーム直下に出力されるようである。

$ hdfs dfs -ls /user/y_tadayasu

Found 3 items
drwxr-xr-x - y_tadayasu y_tadayasu 0 2015-12-15 02:13 /user/y_tadayasu/.sparkStaging
drwxr-xr-x - y_tadayasu y_tadayasu 0 2015-12-10 03:03 /user/y_tadayasu/data
drwxr-xr-x - y_tadayasu y_tadayasu 0 2015-12-15 02:13 /user/y_tadayasu/result.json

$ hdfs dfs -cat /user/y_tadayasu/result.json/part*

{"kcal":420,"menuId":"D-0","name":"チョコレートパフェ","price":490}
{"kcal":380,"menuId":"D-1","name":"プリンパフェ","price":530}
{"kcal":320,"menuId":"D-2","name":"いちごパフェ","price":520}
・・・{省略}・・・


書きだしたファイルから別のDataFrameを生成

DataFrameReaderを利用してデータセットからDataFrameを生成する。

DataFarmeReaderのloadメソッドにより、ファイルに対応付けられたDataFrameが返される。

注)loadメソッドを呼び出した時点ではスキーマ情報の生成に関するデータは読みこむがデータ全体の読み込みは行われていない。

df3 = sqlContext.read.format('json').load('result.json')

print(df3.show(3))

+----+------+---------+-----+                                                   

|kcal|menuId| name|price|
+----+------+---------+-----+
| 420| D-0|チョコレートパフェ| 490|
| 380| D-1| プリンパフェ| 530|
| 320| D-2| いちごパフェ| 520|
+----+------+---------+-----+
only showing top 3 rows


Spark SQLの独自テーブルに書き出す。

df.write.format('parquet').saveAsTable("result_tbl")

以下のようなエラーが発生。SQLContextだとテンポラリなテーブルが作成されるので、代わりにHiveContextを使用しろとのこと。pysparkなどでインタラクティブに実行したときは表示されない。

・・・{省略}・・・

py4j.protocol.Py4JJavaError: An error occurred while calling o115.saveAsTable.
: java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
・・・{省略}・・・

以下のエラー?Messageが出力される。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

どうやって作成されたテーブルを確認するかは今度調べる。