Spark SQLサンプルアプリの実行

  • 26
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

Spark SQLではDataFrameと呼ばれる抽象的なデータ構造(RDBのテーブルのように行と名前とデータ型が付与された列の概念を持つデータ構造)を用いる。DataFrameはRDD、HIVEテーブル、他のデータソース(ファイルなど)から生成できる。
-DataFrameによる操作
-テーブル形式のデータセットに対してクエリを発行

http://spark.apache.org/docs/latest/sql-programming-guide.html
https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html

SparkSQLサンプルアプリの実行

Spark入門の6章に記載されているプログラムをScalaではなくPython3を使って書き直す。
csv形式のデザートメニュー(メニューID、メニューの名前、値段、カロリー)を使用して、ソートや集計などの処理を行う。

SparkSQLの準備

SparkSQLではメタデータをメタストアで管理し、メタストアはHiveのメタストアを利用するので、hive-site.xmlでメタストアの設定を行う。
以下の例ではテーブルの実体となるファイルを配置するディレクトリにhdfsを使用。

/opt/spark/conf/hive-site.xml
<configuration>
   <property>
      <name>hive.metastore.warehouse.dir</name>
      <value>hdfs:///user/y_tadayasu/data/metastore</value>
   </property>
</configuration>

データをHDFS上に配置

$ hdfs dfs -put Chapter6/data/dessert-* /user/y_tadayasu/data/

Spark SQLの初期化処理

Spark SQLを利用するためには、SparkContextに加えてSQLContextが必要。SQLContextはDataFrameの作成やテーブルとしてDataFrameを登録、テーブルを超えたSQLの実行、キャッシュテーブル、そしてperquetファイルの読み込みに利用される。
SQLContextは以下のようにして初期化できる。

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)

RDDからDataFrameを生成

RDDからDataFrameを生成するには、RDDが持つデータにスキーマ情報を付与する必要がある。Spark SQLはデータタイプを推測することにより、RowオブジェクトのRDDをDataFrameに変換することが可能である。Rowはkey/valueペアのリストを経由して構成される。keyはテーブルのカラム名として定義され、最初の行を見ることでタイプが推測される。そのため最初の行はとても重要で、データに間違いがあると推測も間違ってしまう。将来のバージョンではより複雑な推測方法を計画しており、より多くのデータを使って推測できるようになる。

テキストファイルをロードし、DataFrameで扱えるようにするために、それぞれの行をRowに変換する。

lines = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-menu.csv").map(lambda l: l.split(","))

dessertMenu = lines.map(lambda p: Row(menuId=p[0], name=p[1],price=int(p[2]),kcal=int(p[3])))

テキストをロードし、","でスプリットしたRDDのデータ

['D-0', 'チョコレートパフェ', '490', '420']                                     
['D-1', 'プリンパフェ', '530', '380']
['D-2', 'いちごパフェ', '520', '320']
・・・{省略}・・・

Rowに変換したデータ

Row(calorie=420, menuId='D-0', menuName='チョコレートパフェ', price=490)        
Row(calorie=380, menuId='D-1', menuName='プリンパフェ', price=530)
Row(calorie=320, menuId='D-2', menuName='いちごパフェ', price=520)
・・・{省略}・・・

スキーマを推測し、DataFrameに対して直接クエリを発行することができないため、クエリを発行できるようにするためにDataFrameを一時テーブルに登録する。

# RDDからDataFrameを生成
df = sqlContext.createDataFrame(dessertMenu)
# DataFrameをテンポラリなテーブルとして登録
df.registerTempTable("dessert_table")
# 推測したスキーマ情報を表示
print(df.printSchema())

printSchemaのアウトプットは以下のとおり。nullableという属性は該当の列にnullを許すかどうか。trueの場合はnullが設定されることを許容する。

root
 |-- kcal: long (nullable = true)
 |-- menuId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: long (nullable = true)

DataFrameにクエリを発行

先ほど登録した一時テーブルに対して、クエリを発行する。
一時テーブルにクエリを発行した時点では実際のデータ処理は行われない。RDDと同じようにアクションを通じて実際にデータ処理が行われる。ここではshowメソッドアクションを通じてDataFrameが表すデータ・セットの内容を表示する。

以下の例では、カロリーが300キロ以上のデザートがいくつあるかカウント。

numOver300KcalDF = sqlContext.sql("SELECT count(*) AS num_of_over_300Kcal FROM dessert_table WHERE kcal >= 300")

print(numOver300KcalDF.show())
+-------------------+                                                           
|num_of_over_300Kcal|
+-------------------+
|                  9|
+-------------------+

DataFrame APIでDataFrameを操作

DataFrame APIを利用すると、列の選択や行のフィルタリングなどSQL/HiveQLよりも細かい単位でDataFrameを操作できる。

DataFameの列を選択 selectメソッド

name列とprice列のみ選択

name_price = df.select(df['name'],df['price'])
print(name_price.show())
+----------+-----+
|      name|price|
+----------+-----+
| チョコレートパフェ|  490|
|    プリンパフェ|  530|
|    いちごパフェ|  520|
・・・省略・・・
+----------+-----+
only showing top 20 rows

全ての列を選択するには、selectメソッドに*を指定する。

DataFrame APIでは、列が指定できる箇所には、演算子や関数を用いて式を記述できる。
例として価格をドル表示する。表示は別名としてdollar priceを付ける。Scalaだと普通にasを使えるが、pythonだとaliasメソッドを使うっぽい。表示する際は行数が多いとうっとおしいので今回は4行だけ表示するようにした。

name_price = df.select(df['name'],(df['price']/120.0).alias('dollar price'))

print(name_price.show(4))
+---------+-----------------+
|     name|     dollar price|
+---------+-----------------+
|チョコレートパフェ|4.083333333333333|
|   プリンパフェ|4.416666666666667|
|   いちごパフェ|4.333333333333333|
|   パンナコッタ|              3.5|
+---------+-----------------+
only showing top 4 rows

フィルタリング where

SQLのwhere句に相当するもの。

over520YenDF = df.where(df["price"] >= 520)
+----+------+-------+-----+
|kcal|menuId|   name|price|
+----+------+-------+-----+
| 380|   D-1| プリンパフェ|  530|
| 320|   D-2| いちごパフェ|  520|
| 288|   D-4| チーズムース|  580|
| 251|   D-6|  ティラミス|  600|
| 650|  D-11|紫いものパフェ|  650|
+----+------+-------+-----+

selectと一緒に使用する場合は、whereの後にselectを指定する。先にselectで指定するとnameしかない状態になるので、priceでフィルタリングできなくなる。

name_price_over520YenDF = df.where(df["price"] >= 520).select(df['name'])
+-------+
|   name|
+-------+
| プリンパフェ|
| いちごパフェ|
| チーズムース|
|  ティラミス|
|紫いものパフェ|
+-------+

ソート orderBy

値段を昇順、カロリーを降順でソートする。

sortedDessertDF = df.orderBy(df['price'].asc(),df['kcal'].desc())
+----+------+----------+-----+
|kcal|menuId|      name|price|
+----+------+----------+-----+
| 120|  D-15|  カスタードプリン|  200|
| 160|  D-17|    チーズスフレ|  220|
| 248|   D-5|    アフォガート|  300|
| 220|  D-16|    チョコトルテ|  330|
・・・{省略}・・・
+----+------+----------+-----+
only showing top 20 rows

集約処理 aggメソッド

平均のカロリーを求める。
agg(*exprs)
利用可能なaggregate機能はavg, max, min, sum, count.
exprs – a dict mapping from column name (string) to aggregate functions (string), or a list of Column.

avgKcalDF = df.agg({"kcal":"avg"})
+-----------------+                                                             
|        avg(kcal)|
+-----------------+
|291.7826086956522|
+-----------------+

グループ毎に集約処理を行う groupBy(cols) + agg(exprs)

同じ価格帯にグループ化し、価格帯毎のメニューの数を数える。
python3では/で除算すると小数点が含まれるようになったので、小数点を切り捨てるには//を使用する必要がある。

numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})

//を使用すると以下のようにエラーが発生。//以外の演算子だとエラーがでないので、どうやら//サポートされてないようである。
#時間を見つけて後で調べる。

$ spark-submit --master yarn-client chap6-1.py 
Traceback (most recent call last):
  File "/home/y_tadayasu/spark-sample-program/chap6-1.py", line 40, in <module>
    numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})
TypeError: unsupported operand type(s) for //: 'Column' and 'int'

//の代わりに/を使うと以下のように集約処理を行うことができる。

+-----------+------------+                                                      
|price_range|count(price)|
+-----------+------------+
|        2.2|           1|
|        3.4|           1|
・・・{省略}・・・

DataFrame同士の結合 join(other, on=None, how=None)

2つ以上のDataFrameをキーで結合する。

Joins with another DataFrame, using the given join expression.

lines_order = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-order.csv").map(lambda l: l.split(","))

dessertOrder = lines_order.map(lambda p: Row(sId=p[0], menuId=p[1],num=int(p[2])))

df2 = sqlContext.createDataFrame(dessertOrder)

amntPerMenuPerSlipDF = df.join(df2, df.menuId == df2.menuId,'inner').select(df2.sId,df.name,(df2.num*df.price).alia
s('amount_per_menu_per_slip'))
+-----+---------+------------------------+                                      
|  sId|     name|amount_per_menu_per_slip|
+-----+---------+------------------------+
|SID-1| クリームあんみつ|                    2000|
|SID-2|   チーズケーキ|                     400|
|SID-0|チョコレートパフェ|                     980|
|SID-0|   パンナコッタ|                     420|
|SID-2|   アフォガート|                     300|
|SID-2| バニラジェラート|                     360|
+-----+---------+------------------------+

UDF(User Defined Functions)の利用 registerFunctionメソッド ※Scalaだとregister

UDFはSparkSQLに機能を追加するシンプルな方法。
registerFunction(name, f, returnType=StringType)[source]

Parameters:
name – name of the UDF
samplingRatio – lambda function
returnType – a DataType object

使用する際はregisterFunctionの最初のパラメータで指定した名前を使う。

# -*- coding:utf-8 -*-
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="Example")
sqlContext = SQLContext(sc)
sqlContext.udf.register("strlen",lambda x:len(x),IntegerType())
print(sqlContext.sql("SELECT strlen('Hello')").show())
+---+
|_c0|
+---+
|  5|
+---+

Spark1.5系からUDAFが使えるようになったらしいが、pythonではunsupportedとのこと。

HiveのUTFも利用できるみたいであるが、そもそもHiveをよく知らないので今回はパス。

構造化データの処理

DataFrameの内容を書き出す

DataFrameWriterを利用することにより、DataFrameの内容を書き出すことができる。DataFrameWriterはDataFrameのwriteメソッドで取得できる。

Interface for saving the content of the DataFrame out into external storage.

format(source)
Specifies the underlying output data source.

Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.

save(path=None, format=None, mode=None, partitionBy=None, **options)
Saves the contents of the DataFrame to a data source.
saveメソッドでファイルに書き出すことができる。

df.write.format('json').save("result.json")

ファイル名だけ指定するとhdfs上のホーム直下に出力されるようである。

$ hdfs dfs -ls /user/y_tadayasu
Found 3 items
drwxr-xr-x   - y_tadayasu y_tadayasu          0 2015-12-15 02:13 /user/y_tadayasu/.sparkStaging
drwxr-xr-x   - y_tadayasu y_tadayasu          0 2015-12-10 03:03 /user/y_tadayasu/data
drwxr-xr-x   - y_tadayasu y_tadayasu          0 2015-12-15 02:13 /user/y_tadayasu/result.json
$ hdfs dfs -cat /user/y_tadayasu/result.json/part*
{"kcal":420,"menuId":"D-0","name":"チョコレートパフェ","price":490}
{"kcal":380,"menuId":"D-1","name":"プリンパフェ","price":530}
{"kcal":320,"menuId":"D-2","name":"いちごパフェ","price":520}
・・・{省略}・・・

書きだしたファイルから別のDataFrameを生成

DataFrameReaderを利用してデータセットからDataFrameを生成する。
DataFarmeReaderのloadメソッドにより、ファイルに対応付けられたDataFrameが返される。
注)loadメソッドを呼び出した時点ではスキーマ情報の生成に関するデータは読みこむがデータ全体の読み込みは行われていない。

df3 = sqlContext.read.format('json').load('result.json')
print(df3.show(3))
+----+------+---------+-----+                                                   
|kcal|menuId|     name|price|
+----+------+---------+-----+
| 420|   D-0|チョコレートパフェ|  490|
| 380|   D-1|   プリンパフェ|  530|
| 320|   D-2|   いちごパフェ|  520|
+----+------+---------+-----+
only showing top 3 rows

Spark SQLの独自テーブルに書き出す。

df.write.format('parquet').saveAsTable("result_tbl")

以下のようなエラーが発生。SQLContextだとテンポラリなテーブルが作成されるので、代わりにHiveContextを使用しろとのこと。pysparkなどでインタラクティブに実行したときは表示されない。

・・・{省略}・・・
py4j.protocol.Py4JJavaError: An error occurred while calling o115.saveAsTable.
: java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
・・・{省略}・・・

以下のエラー?Messageが出力される。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

どうやって作成されたテーブルを確認するかは今度調べる。