LoginSignup
14
16

More than 1 year has passed since last update.

今さら聞けないPython - Sparkのご紹介

Last updated at Posted at 2023-03-31

こちらのウェビナーで説明した内容の抜粋です。最初のセクションの「pandasを用いたデータ分析」をカバーしています。今さら聞けないPython - scikit-learnを用いた機械学習の続きです。

ウェビナーで使用したノートブックはこちらにあります。

全体構成は以下の通りです。

本記事でカバーしているノートブックはこちらです。

Sparkのご紹介

PCで処理できるデータ量であればpandas/scikit-learnで十分ですが…

  • Apache Sparkは、多数のマシンで並列でコードを実行するための洗練された分散処理フレームワークです。これによって、大量データも効率的に処理することができます。
  • Sparkでもデータフレームでデータを取り扱うことが可能です。Python(PySpark)やRなどのAPIを提供しており、pandasとの相互運用も可能です。
pandas pandas API on Spark(旧Koalas) Apache Spark(PySpark)
データセットが小さい場合はpandasが正しい選択肢となります。 大量データを操作する必要があり、PySparkではなくpandas APIを活用したいと考える際には最適な選択肢となります。
※APIの対応状況など注意事項があります。
大量データに対する処理が必要な場合は、Apache Sparkのような並列データフレームを使用することで高速化が期待できます。
  • Sparkデータフレームとは何か?
  • どのように分散カウント処理を実行するのか?
  • トランスフォーメーション vs. アクション
  • Spark SQL
%fs ls databricks-datasets/COVID/covid-19-data/

Screenshot 2023-03-31 at 11.42.01.png

このデータはどのように表現されるのか?

Unified Engine

最初はRDDがありました...

  • Resilient: 耐障害性
  • Distributed: 複数ノードに分散
  • Dataset: パーティション分けされたデータのコレクション

RDDは作成されると不変となり、問題特定を可能にするために自分のリネージを追跡し続けます。

... そして、データフレームが生まれました。

  • 高レベルのAPI
  • ユーザーフレンドリー
  • 最適化、パフォーマンス改善

RDD vs DataFrames

NYT COVIDデータからデータフレームの作成

Python
covid_df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv")
covid_df.show()
+----------+-----------+----------+-----+-----+------+
|       _c0|        _c1|       _c2|  _c3|  _c4|   _c5|
+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California|06059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona|04013|    1|     0|

CSVリーダーにどのようなオプションを指定できるのかを確認するために、Sparkドキュメントを参照しましょう。

Python
covid_df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv", header=True, inferSchema=True)
covid_df.show()
+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California| 6059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|

レコード数はいくつでしょうか?

  • (一つづつ)M&Mを数えるのではなく、データフレームの行数をカウントしましょう。

ここでのSparkジョブはどのようなものになるのでしょうか?

  • ステージ数はいくつでしょうか?
Python
covid_df.count()
Out[3]: 1111930

Sparkコードを書いてみましょう!

  • 住んでいる郡(Los Angeles)の情報のみを参照したいです。
  • 最新の情報を一番上に表示したいです。
Python
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles")) 
Out[4]: DataFrame[date: date, county: string, state: string, fips: int, cases: int, deaths: int]

...何も起きません。なぜでしょうか?

トランスフォーメーション vs アクション

Sparkでは2つのタイプのオペレーションがあります: トランスフォーメーションとアクションです。

Apache Sparkの基本は以下のように説明されます

  • トランスフォーメーションは 怠惰(LAZY) です
  • アクションは 懸命(EAGER) です
Python
# 上と同じオペレーションです
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles")) 
Out[5]: DataFrame[date: date, county: string, state: string, fips: int, cases: int, deaths: int]

なぜ結果が表示されないのでしょうか?Sortfilterは、Sparkでは遅延評価されるトランスフォーメーションです。

遅延評価にはいくつかのメリットがあります。

  • 最初からすべてのデータをロードする必要がありません。
    • 本当に大きなデータセットでは技術的に不可能です。
  • オペレーションの並列化が容易です。
    • 単一マシン、単一スレッド、単一のデータ要素に対して、N個の異なるトランスフォーメーションを処理することが可能です。
  • 最も重要なことですが、これによってこのフレームワーク様々な最適化処理を自動で適用できるようになります。
    • これもまた我々がデータフレームを活用する理由なのです!

SparkのCatalystオプティマイザーができることは様々です。この状況にのみフォーカスしていきましょう。詳細はこちらのブログ記事をご覧ください!

Catalyst

Python
(covid_df
 .sort(covid_df["date"].desc()) 
 .filter(covid_df["county"] == "Los Angeles") 
 .show())  # これがアクションです!
+----------+-----------+----------+----+-------+------+
|      date|     county|     state|fips|  cases|deaths|
+----------+-----------+----------+----+-------+------+
|2021-03-11|Los Angeles|California|6037|1208672| 22304|
|2021-03-10|Los Angeles|California|6037|1207361| 22213|
|2021-03-09|Los Angeles|California|6037|1205924| 22099|
|2021-03-08|Los Angeles|California|6037|1204665| 22041|
|2021-03-07|Los Angeles|California|6037|1203799| 22029|
|2021-03-06|Los Angeles|California|6037|1202513| 22008|
|2021-03-05|Los Angeles|California|6037|1200764| 21910|
|2021-03-04|Los Angeles|California|6037|1198737| 21778|

実際に最適化処理を確認することができます!

  • Spark UIに移動します
  • Sparkジョブに関連づけられているSQLクエリーをクリックします
  • 論理的、物理的プランを確認します!
    • フィルタリングとソートが入れ替えられています。

Spark SQL

SparkではSQLを使用することもできます。データフレームをテーブルやビューに登録することで、SQLでアクセスが可能となります。

Python
covid_df.createOrReplaceTempView("covid")
SQL
%sql

SELECT * 
FROM covid

さらには、Databricksでは結果を簡単にグラフにすることができます。以下では、グラフの設定でkeys = date, grouping = county, values = casesの設定を行っています。
Screenshot 2023-03-31 at 11.54.17.png

SQL
%sql

SELECT * 
FROM covid 
WHERE county = "Los Angeles"

こちらは、keys = date, grouping = county, values = cases, deathsの設定です。
Screenshot 2023-03-31 at 11.55.48.png

グループに集計関数を適用することもできます。

SQL
%sql

SELECT max(cases) AS max_cases, max(deaths) AS max_deaths, county 
FROM covid 
GROUP BY county 
ORDER BY max_cases DESC
LIMIT 10

Screenshot 2023-03-31 at 11.56.27.png

自分で分析してみましょう!

  • トライしてみるアイデアがいくつかあります。
  • こちらに更なるサンプルがあります。

これはcensus.govから取得した国勢調査データです

  • NYTデータに対応するfipsコードカラムを構成するのに十分なデータが含まれています。
Shell
%sh wget https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv && cp co-est2019-alldata.csv /dbfs/tmp
%sh wget https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv && cp co-est2019-alldata.csv /dbfs/tmp
--2023-03-09 08:27:10--  https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv
Resolving www2.census.gov (www2.census.gov)... 104.86.249.71, 2600:1409:3000:39e::208c, 2600:1409:3000:383::208c
Connecting to www2.census.gov (www2.census.gov)|104.86.249.71|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘co-est2019-alldata.csv.1’

     0K .......... .......... .......... .......... .......... 2.59M
    50K .......... .......... .......... .......... .......... 1.91M
   100K .......... .......... .......... .......... .......... 2.85M
   150K .......... .......... .......... .......... .......... 5.47M
Python
census_df = spark.read.csv("dbfs:/tmp/co-est2019-alldata.csv", header=True, inferSchema=True)

# display()はDatabricksのみで使用できる関数です。show()のようにデータを表示しますが、上のSQLのセクションで見たようにビジュアライゼーションのオプションを提供しています。
display(census_df)

Screenshot 2023-03-31 at 11.57.46.png

NYTデータにマッチするfipsカラムを追加できるように上のデータフレームを調整します。こちらがユーザー定義関数(UDF)に関するドキュメントです。

Python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def make_fips(state_code, county_code):
  if len(str(county_code)) == 1:
    return str(state_code) + "00" + str(county_code)
  elif len(str(county_code)) == 2:
    return str(state_code) + "0" + str(county_code)
  else:
    return str(state_code) + str(county_code)

make_fips_udf = udf(make_fips, StringType())
  
census_df = census_df.withColumn("fips", make_fips_udf(census_df.STATE, census_df.COUNTY))

同じカラムを持つ国勢調査データとCOVIDのデータの準備ができたので、2つのデータフレームをjoinしましょう。

Python
covid_with_census = (covid_df
                     .na.drop(subset=["fips"])
                     .join(census_df.drop("COUNTY", "STATE"), on=['fips'], how='inner'))

最も人口の多い郡では感染者数はどのようになっているでしょうか?

Python
display(covid_with_census.filter("POPESTIMATE2019 > 2000000").select("county", "cases", "date"))

# keys = date, grouping = county, values = cases

Screenshot 2023-03-31 at 11.59.06.png

NYTデータセットは日毎に新しい行が追加されるので、感染者数は日毎に増加します。郡ごとの最新の数のみを取得しましょう。

  • 以下では、カラムを参照するためにcol関数を使用しています。これはdf["column_name"]と同じようなものです。
  • 郡ごとの最新の行を取得するために、ウィンドウ関数を活用しています。
Python
from pyspark.sql.functions import row_number, col
from pyspark.sql import Window

w = Window.partitionBy("fips").orderBy(col("date").desc())
current_covid_rates = (covid_with_census
                       .withColumn("row_num", row_number().over(w))
                       .filter(col("row_num") == 1)
                       .drop("row_num"))

感染者数を人口にスケールさせた場合、最も困難に直面した郡はどこでしょうか?

Python
current_covid_rates = (current_covid_rates
                       .withColumn("case_rates_percent", 100*(col("cases")/col("POPESTIMATE2019")))
                       .sort(col("case_rates_percent").desc()))

# トップ10の郡を参照します
display(current_covid_rates.select("county", "state", "cases", "POPESTIMATE2019", "case_rates_percent").limit(10))

Screenshot 2023-03-31 at 11.59.59.png

Pandas API on Spark

ここまではPySparkを用いてデータを処理してきましたが、PySparkの文法を覚えるのは大変...という意見もあるかと思います。そこで、Pandas APIを利用しつつもSparkを操作できるAPIとしてPandas API on Spark(旧Koalas)があります。

Python
import pyspark.pandas as ps
Python
# Pandas on Sparkデータフレームに変換
psdf = current_covid_rates.pandas_api()
Python
# pandasのお作法でカラムにアクセスします
psdf['state']
Out[20]: 0            Colorado
1             Georgia
2            Colorado
3            Arkansas
4        South Dakota
5           Tennessee
6              Kansas
7        South Dakota
8        South Dakota
9           Tennessee
Python
psdf.head(2)

Screenshot 2023-03-31 at 12.01.18.png

Python
psdf.describe()

Screenshot 2023-03-31 at 12.01.46.png

Python
psdf.sort_values(by='deaths', ascending=False).head(10)

Screenshot 2023-03-31 at 12.02.17.png

これで、ウェビナー「今さら聞けないPython」でカバーした内容は以上となります。

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

14
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
16