LoginSignup
47
47

More than 5 years have passed since last update.

Spark SQLサンプルアプリの実行

Posted at

Spark SQLではDataFrameと呼ばれる抽象的なデータ構造(RDBのテーブルのように行と名前とデータ型が付与された列の概念を持つデータ構造)を用いる。DataFrameはRDD、HIVEテーブル、他のデータソース(ファイルなど)から生成できる。
-DataFrameによる操作
-テーブル形式のデータセットに対してクエリを発行

http://spark.apache.org/docs/latest/sql-programming-guide.html
https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html

SparkSQLサンプルアプリの実行

Spark入門の6章に記載されているプログラムをScalaではなくPython3を使って書き直す。
csv形式のデザートメニュー(メニューID、メニューの名前、値段、カロリー)を使用して、ソートや集計などの処理を行う。

SparkSQLの準備

SparkSQLではメタデータをメタストアで管理し、メタストアはHiveのメタストアを利用するので、hive-site.xmlでメタストアの設定を行う。
以下の例ではテーブルの実体となるファイルを配置するディレクトリにhdfsを使用。

/opt/spark/conf/hive-site.xml
<configuration>
   <property>
      <name>hive.metastore.warehouse.dir</name>
      <value>hdfs:///user/y_tadayasu/data/metastore</value>
   </property>
</configuration>

データをHDFS上に配置

$ hdfs dfs -put Chapter6/data/dessert-* /user/y_tadayasu/data/

Spark SQLの初期化処理

Spark SQLを利用するためには、SparkContextに加えてSQLContextが必要。SQLContextはDataFrameの作成やテーブルとしてDataFrameを登録、テーブルを超えたSQLの実行、キャッシュテーブル、そしてperquetファイルの読み込みに利用される。
SQLContextは以下のようにして初期化できる。

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)

RDDからDataFrameを生成

RDDからDataFrameを生成するには、RDDが持つデータにスキーマ情報を付与する必要がある。Spark SQLはデータタイプを推測することにより、RowオブジェクトのRDDをDataFrameに変換することが可能である。Rowはkey/valueペアのリストを経由して構成される。keyはテーブルのカラム名として定義され、最初の行を見ることでタイプが推測される。そのため最初の行はとても重要で、データに間違いがあると推測も間違ってしまう。将来のバージョンではより複雑な推測方法を計画しており、より多くのデータを使って推測できるようになる。

テキストファイルをロードし、DataFrameで扱えるようにするために、それぞれの行をRowに変換する。

lines = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-menu.csv").map(lambda l: l.split(","))

dessertMenu = lines.map(lambda p: Row(menuId=p[0], name=p[1],price=int(p[2]),kcal=int(p[3])))

テキストをロードし、","でスプリットしたRDDのデータ

['D-0', 'チョコレートパフェ', '490', '420']                                     
['D-1', 'プリンパフェ', '530', '380']
['D-2', 'いちごパフェ', '520', '320']
・・・{省略}・・・

Rowに変換したデータ

Row(calorie=420, menuId='D-0', menuName='チョコレートパフェ', price=490)        
Row(calorie=380, menuId='D-1', menuName='プリンパフェ', price=530)
Row(calorie=320, menuId='D-2', menuName='いちごパフェ', price=520)
・・・{省略}・・・

スキーマを推測し、DataFrameに対して直接クエリを発行することができないため、クエリを発行できるようにするためにDataFrameを一時テーブルに登録する。

# RDDからDataFrameを生成
df = sqlContext.createDataFrame(dessertMenu)
# DataFrameをテンポラリなテーブルとして登録
df.registerTempTable("dessert_table")
# 推測したスキーマ情報を表示
print(df.printSchema())

printSchemaのアウトプットは以下のとおり。nullableという属性は該当の列にnullを許すかどうか。trueの場合はnullが設定されることを許容する。

root
 |-- kcal: long (nullable = true)
 |-- menuId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: long (nullable = true)

DataFrameにクエリを発行

先ほど登録した一時テーブルに対して、クエリを発行する。
一時テーブルにクエリを発行した時点では実際のデータ処理は行われない。RDDと同じようにアクションを通じて実際にデータ処理が行われる。ここではshowメソッドアクションを通じてDataFrameが表すデータ・セットの内容を表示する。

以下の例では、カロリーが300キロ以上のデザートがいくつあるかカウント。

numOver300KcalDF = sqlContext.sql("SELECT count(*) AS num_of_over_300Kcal FROM dessert_table WHERE kcal >= 300")

print(numOver300KcalDF.show())
+-------------------+                                                           
|num_of_over_300Kcal|
+-------------------+
|                  9|
+-------------------+

DataFrame APIでDataFrameを操作

DataFrame APIを利用すると、列の選択や行のフィルタリングなどSQL/HiveQLよりも細かい単位でDataFrameを操作できる。

DataFameの列を選択 selectメソッド

name列とprice列のみ選択

name_price = df.select(df['name'],df['price'])
print(name_price.show())
+----------+-----+
|      name|price|
+----------+-----+
| チョコレートパフェ|  490|
|    プリンパフェ|  530|
|    いちごパフェ|  520|
・・・省略・・・
+----------+-----+
only showing top 20 rows

全ての列を選択するには、selectメソッドに*を指定する。

DataFrame APIでは、列が指定できる箇所には、演算子や関数を用いて式を記述できる。
例として価格をドル表示する。表示は別名としてdollar priceを付ける。Scalaだと普通にasを使えるが、pythonだとaliasメソッドを使うっぽい。表示する際は行数が多いとうっとおしいので今回は4行だけ表示するようにした。

name_price = df.select(df['name'],(df['price']/120.0).alias('dollar price'))

print(name_price.show(4))
+---------+-----------------+
|     name|     dollar price|
+---------+-----------------+
|チョコレートパフェ|4.083333333333333|
|   プリンパフェ|4.416666666666667|
|   いちごパフェ|4.333333333333333|
|   パンナコッタ|              3.5|
+---------+-----------------+
only showing top 4 rows

フィルタリング where

SQLのwhere句に相当するもの。

over520YenDF = df.where(df["price"] >= 520)
+----+------+-------+-----+
|kcal|menuId|   name|price|
+----+------+-------+-----+
| 380|   D-1| プリンパフェ|  530|
| 320|   D-2| いちごパフェ|  520|
| 288|   D-4| チーズムース|  580|
| 251|   D-6|  ティラミス|  600|
| 650|  D-11|紫いものパフェ|  650|
+----+------+-------+-----+

selectと一緒に使用する場合は、whereの後にselectを指定する。先にselectで指定するとnameしかない状態になるので、priceでフィルタリングできなくなる。

name_price_over520YenDF = df.where(df["price"] >= 520).select(df['name'])
+-------+
|   name|
+-------+
| プリンパフェ|
| いちごパフェ|
| チーズムース|
|  ティラミス|
|紫いものパフェ|
+-------+

ソート orderBy

値段を昇順、カロリーを降順でソートする。

sortedDessertDF = df.orderBy(df['price'].asc(),df['kcal'].desc())
+----+------+----------+-----+
|kcal|menuId|      name|price|
+----+------+----------+-----+
| 120|  D-15|  カスタードプリン|  200|
| 160|  D-17|    チーズスフレ|  220|
| 248|   D-5|    アフォガート|  300|
| 220|  D-16|    チョコトルテ|  330|
・・・{省略}・・・
+----+------+----------+-----+
only showing top 20 rows

集約処理 aggメソッド

平均のカロリーを求める。
agg(*exprs)
利用可能なaggregate機能はavg, max, min, sum, count.
exprs – a dict mapping from column name (string) to aggregate functions (string), or a list of Column.

avgKcalDF = df.agg({"kcal":"avg"})
+-----------------+                                                             
|        avg(kcal)|
+-----------------+
|291.7826086956522|
+-----------------+

グループ毎に集約処理を行う groupBy(*cols) + agg(*exprs)

同じ価格帯にグループ化し、価格帯毎のメニューの数を数える。
python3では/で除算すると小数点が含まれるようになったので、小数点を切り捨てるには//を使用する必要がある。

numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})

//を使用すると以下のようにエラーが発生。//以外の演算子だとエラーがでないので、どうやら//サポートされてないようである。
#時間を見つけて後で調べる。

$ spark-submit --master yarn-client chap6-1.py 
Traceback (most recent call last):
  File "/home/y_tadayasu/spark-sample-program/chap6-1.py", line 40, in <module>
    numPerPriceRangeDF = df.groupBy((df['price'] // 100).alias("price_range")).agg({"price":"count"})
TypeError: unsupported operand type(s) for //: 'Column' and 'int'

//の代わりに/を使うと以下のように集約処理を行うことができる。

+-----------+------------+                                                      
|price_range|count(price)|
+-----------+------------+
|        2.2|           1|
|        3.4|           1|
・・・{省略}・・・

DataFrame同士の結合 join(other, on=None, how=None)

2つ以上のDataFrameをキーで結合する。

Joins with another DataFrame, using the given join expression.

lines_order = sc.textFile("hdfs:///user/y_tadayasu/data/dessert-order.csv").map(lambda l: l.split(","))

dessertOrder = lines_order.map(lambda p: Row(sId=p[0], menuId=p[1],num=int(p[2])))

df2 = sqlContext.createDataFrame(dessertOrder)

amntPerMenuPerSlipDF = df.join(df2, df.menuId == df2.menuId,'inner').select(df2.sId,df.name,(df2.num*df.price).alia
s('amount_per_menu_per_slip'))
+-----+---------+------------------------+                                      
|  sId|     name|amount_per_menu_per_slip|
+-----+---------+------------------------+
|SID-1| クリームあんみつ|                    2000|
|SID-2|   チーズケーキ|                     400|
|SID-0|チョコレートパフェ|                     980|
|SID-0|   パンナコッタ|                     420|
|SID-2|   アフォガート|                     300|
|SID-2| バニラジェラート|                     360|
+-----+---------+------------------------+

UDF(User Defined Functions)の利用 registerFunctionメソッド ※Scalaだとregister

UDFはSparkSQLに機能を追加するシンプルな方法。
registerFunction(name, f, returnType=StringType)[source]

Parameters:
name – name of the UDF
samplingRatio – lambda function
returnType – a DataType object

使用する際はregisterFunctionの最初のパラメータで指定した名前を使う。

# -*- coding:utf-8 -*-
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="Example")
sqlContext = SQLContext(sc)
sqlContext.udf.register("strlen",lambda x:len(x),IntegerType())
print(sqlContext.sql("SELECT strlen('Hello')").show())
+---+
|_c0|
+---+
|  5|
+---+

Spark1.5系からUDAFが使えるようになったらしいが、pythonではunsupportedとのこと。

HiveのUTFも利用できるみたいであるが、そもそもHiveをよく知らないので今回はパス。

構造化データの処理

DataFrameの内容を書き出す

DataFrameWriterを利用することにより、DataFrameの内容を書き出すことができる。DataFrameWriterはDataFrameのwriteメソッドで取得できる。

Interface for saving the content of the DataFrame out into external storage.

format(source)
Specifies the underlying output data source.

Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.

save(path=None, format=None, mode=None, partitionBy=None, **options)
Saves the contents of the DataFrame to a data source.
saveメソッドでファイルに書き出すことができる。

df.write.format('json').save("result.json")

ファイル名だけ指定するとhdfs上のホーム直下に出力されるようである。

$ hdfs dfs -ls /user/y_tadayasu
Found 3 items
drwxr-xr-x   - y_tadayasu y_tadayasu          0 2015-12-15 02:13 /user/y_tadayasu/.sparkStaging
drwxr-xr-x   - y_tadayasu y_tadayasu          0 2015-12-10 03:03 /user/y_tadayasu/data
drwxr-xr-x   - y_tadayasu y_tadayasu          0 2015-12-15 02:13 /user/y_tadayasu/result.json
$ hdfs dfs -cat /user/y_tadayasu/result.json/part*
{"kcal":420,"menuId":"D-0","name":"チョコレートパフェ","price":490}
{"kcal":380,"menuId":"D-1","name":"プリンパフェ","price":530}
{"kcal":320,"menuId":"D-2","name":"いちごパフェ","price":520}
・・・{省略}・・・

書きだしたファイルから別のDataFrameを生成

DataFrameReaderを利用してデータセットからDataFrameを生成する。
DataFarmeReaderのloadメソッドにより、ファイルに対応付けられたDataFrameが返される。
注)loadメソッドを呼び出した時点ではスキーマ情報の生成に関するデータは読みこむがデータ全体の読み込みは行われていない。

df3 = sqlContext.read.format('json').load('result.json')
print(df3.show(3))
+----+------+---------+-----+                                                   
|kcal|menuId|     name|price|
+----+------+---------+-----+
| 420|   D-0|チョコレートパフェ|  490|
| 380|   D-1|   プリンパフェ|  530|
| 320|   D-2|   いちごパフェ|  520|
+----+------+---------+-----+
only showing top 3 rows

Spark SQLの独自テーブルに書き出す。

df.write.format('parquet').saveAsTable("result_tbl")

以下のようなエラーが発生。SQLContextだとテンポラリなテーブルが作成されるので、代わりにHiveContextを使用しろとのこと。pysparkなどでインタラクティブに実行したときは表示されない。

・・・{省略}・・・
py4j.protocol.Py4JJavaError: An error occurred while calling o115.saveAsTable.
: java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
・・・{省略}・・・

以下のエラー?Messageが出力される。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

どうやって作成されたテーブルを確認するかは今度調べる。

47
47
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
47
47