Spark: The Definitive Guide http://shop.oreilly.com/product/0636920034957.do という本を読みながら、Python を使った Spark の基本概念をメモする。
DataFrame https://docs.databricks.com/spark/latest/dataframes-datasets/index.html
Spark の DataFrame とは、Pandas の DataFrame のようにカラム名に名前が付いた表だ。Pandas の DataFrame のように Python のオブジェクトから直接作ったり、csv や JSON から作る事が出来る。例えば Databrics 標準サンプルの flight-data/csv/2015-summary.csv
から作成してみる。まずファイルの内容を確認する。
%sh head /dbfs/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Command took 0.55 seconds -- by tyamamiya@xevo.com at 10/8/2020, 11:42:09 AM on tyamamiya_grubhub
このファイルを読むには、spark.read
からアクセス出来る DataFrameReader https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader を使う。
flightData2015 = spark.read.load(
"/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv",
format = "csv",
inferSchema = "true",
header = "true",
)
display(flightData2015)
Transformation
ここで、出来た flightData2015 の中から DEST_COUNTRY_NAME = 'United States'
の物だけを抜き出してみる。
flightData2015.where("DEST_COUNTRY_NAME = 'United States'")
Out[2]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]
ただ、この操作では具体的な答えが返らない。Spark では、本当に絶対に必要になるまで実行を遅延する事によって操作を最適化する戦略を取っている。where のようにその場では実行されない操作を Transformation と呼ぶ。Transformation には次の二種類がある。
- Narrow transformation: 一つの Partition で終わる操作。
- Wide transformation: 複数の Partition をまたがる操作。
Spark では、一つの DataFrame のカラムを複数の Partition に保存して同時並行で操作を行うのでこういう区別がある。Narrow transformation は、フィルタのように他の行を参照する必要の無い操作の事で、同時並行で操作を行える。Wide transformation とは、ソートのように他の行を参照する操作で、単純に同時並行で実行する事は出来ない。
Action
遅延された Transformation は Action を行うと実行される。例えば count() は検索結果の件数を数える Action だ。
flightData2015.where("DEST_COUNTRY_NAME = 'United States'").count()
Out[3]: 125
他にも、検索結果を先頭から n 件取り出す take() などがある。
explain()
Transformation が実際にどのように実行されるのかを見るには explain を使う。
flightData2015.sort("count").explain()
== Physical Plan ==
Sort [count#145 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#145 ASC NULLS FIRST, 200), true, [id=#453]
+- FileScan csv [DEST_COUNTRY_NAME#143,ORIGIN_COUNTRY_NAME#144,count#145] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
慣れると書いてある意味が分かるらしいが、最後に行われる操作が先頭に書いてある。つまり、時間の順序は下から上だ。一番下の csv を読み込む所から始まり、最後の一番上で Sort を行っている。
SQL
Python の中に Spark を操作する SQL を書くことが出来る。SQL で操作するには createOrReplaceTempView
で SQL から見えるようにする。
flightData2015.createOrReplaceTempView("flight_data_2015")
explain で調べると Python で書いたのと同じ操作が行われる事が分かる。
sql_way = spark.sql("""SELECT DEST_COUNTRY_NAME, count(*) FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME""")
sql_way.explain()
python_way = flightData2015.groupBy('DEST_COUNTRY_NAME').count()
python_way.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#143], functions=[finalmerge_count(merge count#198L) AS count(1)#193L])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#143, 200), true, [id=#598]
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#143], functions=[partial_count(1) AS count#198L])
+- FileScan csv [DEST_COUNTRY_NAME#143] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#143], functions=[finalmerge_count(merge count#210L) AS count(1)#205L])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#143, 200), true, [id=#649]
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#143], functions=[partial_count(1) AS count#210L])
+- FileScan csv [DEST_COUNTRY_NAME#143] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
おっと、今気づいたのだが、この count() (GroupData のメソッド) は flightData2015.where("DEST_COUNTRY_NAME = 'United States'").count()
の時の count() (DataFrame のメソッド) と違って Action では無いな。Dataframe の場合はカウント値が返るので explain() 出来ない。
Partition 数の変更
デフォルトでは count() などに 200 個の Partition が使われる。これを調節する事が出来る。
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").explain()
== Physical Plan ==
Sort [count#145 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#145 ASC NULLS FIRST, 5), true, [id=#858]
+- FileScan csv [DEST_COUNTRY_NAME#143,ORIGIN_COUNTRY_NAME#144,count#145] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
Exchange rangepartitioning(count#145 ASC NULLS FIRST, 5)
のようになっているので、Partition が減った事が分かる。
その他、Python を使った Spark の使い方は https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html にも沢山の例がある。