こちらのウェビナーで説明した内容の抜粋です。最初のセクションの「pandasを用いたデータ分析」をカバーしています。今さら聞けないPython - scikit-learnを用いた機械学習の続きです。
ウェビナーで使用したノートブックはこちらにあります。
全体構成は以下の通りです。
- 今さら聞けないPython - Pythonの基礎
- 今さら聞けないPython - pandasを用いたデータ分析
- 今さら聞けないPython - scikit-learnを用いた機械学習
- 今さら聞けないPython - Sparkのご紹介
本記事でカバーしているノートブックはこちらです。
Sparkのご紹介
PCで処理できるデータ量であればpandas/scikit-learnで十分ですが…
- Apache Sparkは、多数のマシンで並列でコードを実行するための洗練された分散処理フレームワークです。これによって、大量データも効率的に処理することができます。
- Sparkでもデータフレームでデータを取り扱うことが可能です。Python(PySpark)やRなどのAPIを提供しており、pandasとの相互運用も可能です。
pandas | pandas API on Spark(旧Koalas) | Apache Spark(PySpark) |
---|---|---|
データセットが小さい場合はpandasが正しい選択肢となります。 | 大量データを操作する必要があり、PySparkではなくpandas APIを活用したいと考える際には最適な選択肢となります。 ※APIの対応状況など注意事項があります。 |
大量データに対する処理が必要な場合は、Apache Sparkのような並列データフレームを使用することで高速化が期待できます。 |
- Sparkデータフレームとは何か?
- NYTデータセットの読み込み
- どのように分散カウント処理を実行するのか?
- トランスフォーメーション vs. アクション
- Spark SQL
参考資料
Sparkドキュメント
%fs ls databricks-datasets/COVID/covid-19-data/
このデータはどのように表現されるのか?
最初はRDDがありました...
- Resilient: 耐障害性
- Distributed: 複数ノードに分散
- Dataset: パーティション分けされたデータのコレクション
RDDは作成されると不変となり、問題特定を可能にするために自分のリネージを追跡し続けます。
... そして、データフレームが生まれました。
- 高レベルのAPI
- ユーザーフレンドリー
- 最適化、パフォーマンス改善
NYT COVIDデータからデータフレームの作成
covid_df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv")
covid_df.show()
+----------+-----------+----------+-----+-----+------+
| _c0| _c1| _c2| _c3| _c4| _c5|
+----------+-----------+----------+-----+-----+------+
| date| county| state| fips|cases|deaths|
|2020-01-21| Snohomish|Washington|53061| 1| 0|
|2020-01-22| Snohomish|Washington|53061| 1| 0|
|2020-01-23| Snohomish|Washington|53061| 1| 0|
|2020-01-24| Cook| Illinois|17031| 1| 0|
|2020-01-24| Snohomish|Washington|53061| 1| 0|
|2020-01-25| Orange|California|06059| 1| 0|
|2020-01-25| Cook| Illinois|17031| 1| 0|
|2020-01-25| Snohomish|Washington|53061| 1| 0|
|2020-01-26| Maricopa| Arizona|04013| 1| 0|
CSVリーダーにどのようなオプションを指定できるのかを確認するために、Sparkドキュメントを参照しましょう。
covid_df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-counties.csv", header=True, inferSchema=True)
covid_df.show()
+----------+-----------+----------+-----+-----+------+
| date| county| state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21| Snohomish|Washington|53061| 1| 0|
|2020-01-22| Snohomish|Washington|53061| 1| 0|
|2020-01-23| Snohomish|Washington|53061| 1| 0|
|2020-01-24| Cook| Illinois|17031| 1| 0|
|2020-01-24| Snohomish|Washington|53061| 1| 0|
|2020-01-25| Orange|California| 6059| 1| 0|
|2020-01-25| Cook| Illinois|17031| 1| 0|
レコード数はいくつでしょうか?
- (一つづつ)M&Mを数えるのではなく、データフレームの行数をカウントしましょう。
ここでのSparkジョブはどのようなものになるのでしょうか?
- ステージ数はいくつでしょうか?
covid_df.count()
Out[3]: 1111930
Sparkコードを書いてみましょう!
- 住んでいる郡(Los Angeles)の情報のみを参照したいです。
- 最新の情報を一番上に表示したいです。
(covid_df
.sort(covid_df["date"].desc())
.filter(covid_df["county"] == "Los Angeles"))
Out[4]: DataFrame[date: date, county: string, state: string, fips: int, cases: int, deaths: int]
...何も起きません。なぜでしょうか?
トランスフォーメーション vs アクション
Sparkでは2つのタイプのオペレーションがあります: トランスフォーメーションとアクションです。
Apache Sparkの基本は以下のように説明されます
- トランスフォーメーションは 怠惰(LAZY) です
- アクションは 懸命(EAGER) です
# 上と同じオペレーションです
(covid_df
.sort(covid_df["date"].desc())
.filter(covid_df["county"] == "Los Angeles"))
Out[5]: DataFrame[date: date, county: string, state: string, fips: int, cases: int, deaths: int]
なぜ結果が表示されないのでしょうか?Sortとfilterは、Sparkでは遅延評価されるトランスフォーメーション
です。
遅延評価にはいくつかのメリットがあります。
- 最初からすべてのデータをロードする必要がありません。
- 本当に大きなデータセットでは技術的に不可能です。
- オペレーションの並列化が容易です。
- 単一マシン、単一スレッド、単一のデータ要素に対して、N個の異なるトランスフォーメーションを処理することが可能です。
- 最も重要なことですが、これによってこのフレームワーク様々な最適化処理を自動で適用できるようになります。
- これもまた我々がデータフレームを活用する理由なのです!
SparkのCatalystオプティマイザーができることは様々です。この状況にのみフォーカスしていきましょう。詳細はこちらのブログ記事をご覧ください!
(covid_df
.sort(covid_df["date"].desc())
.filter(covid_df["county"] == "Los Angeles")
.show()) # これがアクションです!
+----------+-----------+----------+----+-------+------+
| date| county| state|fips| cases|deaths|
+----------+-----------+----------+----+-------+------+
|2021-03-11|Los Angeles|California|6037|1208672| 22304|
|2021-03-10|Los Angeles|California|6037|1207361| 22213|
|2021-03-09|Los Angeles|California|6037|1205924| 22099|
|2021-03-08|Los Angeles|California|6037|1204665| 22041|
|2021-03-07|Los Angeles|California|6037|1203799| 22029|
|2021-03-06|Los Angeles|California|6037|1202513| 22008|
|2021-03-05|Los Angeles|California|6037|1200764| 21910|
|2021-03-04|Los Angeles|California|6037|1198737| 21778|
実際に最適化処理を確認することができます!
- Spark UIに移動します
- Sparkジョブに関連づけられているSQLクエリーをクリックします
- 論理的、物理的プランを確認します!
- フィルタリングとソートが入れ替えられています。
Spark SQL
SparkではSQLを使用することもできます。データフレームをテーブルやビューに登録することで、SQLでアクセスが可能となります。
covid_df.createOrReplaceTempView("covid")
%sql
SELECT *
FROM covid
さらには、Databricksでは結果を簡単にグラフにすることができます。以下では、グラフの設定でkeys = date, grouping = county, values = cases
の設定を行っています。
%sql
SELECT *
FROM covid
WHERE county = "Los Angeles"
こちらは、keys = date, grouping = county, values = cases, deaths
の設定です。
グループに集計関数を適用することもできます。
%sql
SELECT max(cases) AS max_cases, max(deaths) AS max_deaths, county
FROM covid
GROUP BY county
ORDER BY max_cases DESC
LIMIT 10
自分で分析してみましょう!
- トライしてみるアイデアがいくつかあります。
- こちらに更なるサンプルがあります。
これはcensus.govから取得した国勢調査データです
- NYTデータに対応するfipsコードカラムを構成するのに十分なデータが含まれています。
%sh wget https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv && cp co-est2019-alldata.csv /dbfs/tmp
%sh wget https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv && cp co-est2019-alldata.csv /dbfs/tmp
--2023-03-09 08:27:10-- https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/counties/totals/co-est2019-alldata.csv
Resolving www2.census.gov (www2.census.gov)... 104.86.249.71, 2600:1409:3000:39e::208c, 2600:1409:3000:383::208c
Connecting to www2.census.gov (www2.census.gov)|104.86.249.71|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘co-est2019-alldata.csv.1’
0K .......... .......... .......... .......... .......... 2.59M
50K .......... .......... .......... .......... .......... 1.91M
100K .......... .......... .......... .......... .......... 2.85M
150K .......... .......... .......... .......... .......... 5.47M
census_df = spark.read.csv("dbfs:/tmp/co-est2019-alldata.csv", header=True, inferSchema=True)
# display()はDatabricksのみで使用できる関数です。show()のようにデータを表示しますが、上のSQLのセクションで見たようにビジュアライゼーションのオプションを提供しています。
display(census_df)
NYTデータにマッチするfipsカラムを追加できるように上のデータフレームを調整します。こちらがユーザー定義関数(UDF)に関するドキュメントです。
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def make_fips(state_code, county_code):
if len(str(county_code)) == 1:
return str(state_code) + "00" + str(county_code)
elif len(str(county_code)) == 2:
return str(state_code) + "0" + str(county_code)
else:
return str(state_code) + str(county_code)
make_fips_udf = udf(make_fips, StringType())
census_df = census_df.withColumn("fips", make_fips_udf(census_df.STATE, census_df.COUNTY))
同じカラムを持つ国勢調査データとCOVIDのデータの準備ができたので、2つのデータフレームをjoinしましょう。
covid_with_census = (covid_df
.na.drop(subset=["fips"])
.join(census_df.drop("COUNTY", "STATE"), on=['fips'], how='inner'))
最も人口の多い郡では感染者数はどのようになっているでしょうか?
display(covid_with_census.filter("POPESTIMATE2019 > 2000000").select("county", "cases", "date"))
# keys = date, grouping = county, values = cases
NYTデータセットは日毎に新しい行が追加されるので、感染者数は日毎に増加します。郡ごとの最新の数のみを取得しましょう。
- 以下では、カラムを参照するために
col
関数を使用しています。これはdf["column_name"]
と同じようなものです。 - 郡ごとの最新の行を取得するために、ウィンドウ関数を活用しています。
from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
w = Window.partitionBy("fips").orderBy(col("date").desc())
current_covid_rates = (covid_with_census
.withColumn("row_num", row_number().over(w))
.filter(col("row_num") == 1)
.drop("row_num"))
感染者数を人口にスケールさせた場合、最も困難に直面した郡はどこでしょうか?
current_covid_rates = (current_covid_rates
.withColumn("case_rates_percent", 100*(col("cases")/col("POPESTIMATE2019")))
.sort(col("case_rates_percent").desc()))
# トップ10の郡を参照します
display(current_covid_rates.select("county", "state", "cases", "POPESTIMATE2019", "case_rates_percent").limit(10))
Pandas API on Spark
ここまではPySparkを用いてデータを処理してきましたが、PySparkの文法を覚えるのは大変...
という意見もあるかと思います。そこで、Pandas APIを利用しつつもSparkを操作できるAPIとしてPandas API on Spark(旧Koalas)があります。
- SparkにおけるPandas API
- Supported pandas API — PySpark 3.3.2 documentation
- pandasユーザーがPandas API on Sparkでつまづいたあれこれ - KAKEHASHI Tech Blog
import pyspark.pandas as ps
# Pandas on Sparkデータフレームに変換
psdf = current_covid_rates.pandas_api()
# pandasのお作法でカラムにアクセスします
psdf['state']
Out[20]: 0 Colorado
1 Georgia
2 Colorado
3 Arkansas
4 South Dakota
5 Tennessee
6 Kansas
7 South Dakota
8 South Dakota
9 Tennessee
psdf.head(2)
psdf.describe()
psdf.sort_values(by='deaths', ascending=False).head(10)
これで、ウェビナー「今さら聞けないPython」でカバーした内容は以上となります。