Tutorial: Work with PySpark DataFrames on Databricks | Databricks on AWS [2022/10/7時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、DatabricksでApache Spark PythonデータフレームAPI(PySpark)を用いてどの様にデータをロードし、変換するのかを説明します。
Apache Spark PySpark API referenceもご覧ください。
データフレームとは?
データフレームは、異なるタイプを持つことがあるカラムを持つ2次元のラベル付きデータ構造です。データフレームをスプレッドシート、SQLテーブル、シリーズオブジェクトのディクショナリーのようなものと考えることができます。Apache Sparkデータフレームは、一般的なデータ分析問題を効率的に解決できる様に、豊富な機能セット(カラムの選択、フィルター、join、集計)を提供します。
Apache Sparkデータフレームは、耐障害性分散データセット(RDD)の上に構築された抽象化レイヤーです。SparkデータフレームとSpark SQLは統合されたプランニングと最適化エンジンを使用するので、Databricksでサポートされているすべての言語(Python、SQL、Scala、R)でほぼ同じパフォーマンスを得ることができます。
Pythonでデータフレームを作成する
ほとんどのApache Sparkのクエリーはデータフレームを返します。これには、テーブルの読み込み、ファイルからのデータロード、データを変換するオペレーションが含まれます。
また、以下のサンプルの様にlistやpandasデータフレームからSparkデータフレームを作成することができます。
import pandas as pd
data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
pdf = pd.DataFrame(data, columns=["id", "name"])
df1 = spark.createDataFrame(pdf)
df2 = spark.createDataFrame(data, schema="id LONG, name STRING")
テーブルをデータフレームに読み込む
DatabricksではすべてのテーブルでデフォルトでDelta Lakeを使います。以下のサンプルの様に、容易にテーブルをデータフレームにロードすることができます。
spark.read.table("<catalog_name>.<schema_name>.<table_name>")
ファイルからデータフレームにロードする
サポートされている数多くのファイルフォーマットからデータをロードすることができます。以下のサンプルでは、多くのワークスペースでアクセスできる/databricks-datasets
ディレクトリにあるデータセットを使用しています。Databricksのサンプルデータをご覧ください。
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
変換ステップをデータフレームに割り当てる
Sparkの変換処理のほとんどの結果はデータフレームを返します。他のシステムでCTE(共通テーブル式)や一時ビュー、データフレームを使うのと同じ様に、データフレーム変数にこれらの結果を割り当てることができます。
joinやunionでデータフレームを結合する
データフレームのjoinオペレーションでは標準的なSQLセマンティクスを使用します。joinは、指定されたマッチ条件とjoinのタイプに基づいて2つのデータフレームの結果を結合したものを返します。以下のサンプルはデフォルトであるinner joinです。
joined_df = df1.join(df2, how="inner", on="id")
以下のサンプルの様に、unionオペレーションを用いてあるデータフレームを別のデータフレームに追加することができます。
unioned_df = df1.union(df2)
データフレームにおける行のフィルタリング
.filter()
や.where()
を用いてデータフレームの行をフィルタリングすることができます。以下のサンプルにある様にパフォーマンスや構文に違いはありません。
filtered_df = df.filter("id > 1")
filtered_df = df.where("id > 1")
データフレームで返却する行のサブセットを選択するか、編集するためにフィルタリングを使用します。
データフレームから列を選択する
以下のサンプルの様に.select()
に1つ以上の列名を指定することで列を選択することができます。
select_df = df.select("id", "name")
返却する行と列を限定するために、selectとフィルタリングのクエリーを組み合わせることができます。
subset_df = df.filter("id > 1").select("name")
データフレームを参照する
テーブル形式でこのデータを参照するには、以下のサンプルの様にDatabricksのdisplay()
コマンドを使うことができます。
display(df)
データスキーマを出力する
Sparkでは、データフレームのカラム名とカラムの型を参照するためにスキーマという用語を使います。
注意
また、Databricksではカタログに登録されたテーブルのコレクションを参照するためにスキーマという用語を使います。
以下のサンプルの様に、.printSchema()
メソッドを用いてスキーマを出力することができます。
df.printSchema()
テーブルにデータフレームを保存する
Databricksでは、デフォルトですべてのテーブルでDelta Lakeを使います。以下の様に、データフレームの中身をテーブルに保存することができます。
df.write.saveAsTable("<table_name>")
ファイルのコレクションにデータフレームを書き込む
ほとんどのSparkアプリケーションは大規模なデータセットを取り扱う様に設計されており、分散処理として動作しますので、Sparkは単一のファイルではなくファイルのディレクトリを書き出します。多くのデータシステムは、これらのファイルのディレクトリを読み込む様に設計されています。Databricksでは、多くのアプリケーションにおいてファイルパスよりもテーブルを用いることをお勧めしています。
df.write.format("json").save("/tmp/json_data")
PySparkでSQLクエリーを実行する
Sparkデータフレームは、PythonとSQLを組み合わせるための数多くのオプションを提供しています。
selectExpr()
メソッドを用いることで、以下の様にSQLクエリーとしてそれぞれのカラムを指定することができます。
display(df.selectExpr("id", "upper(name) as big_name"))
以下のサンプルの様に、カラムが指定される場所のどこでもSQL文法を使える様に、pyspark.sql.functions
からexpr()
関数をインポートすることができます。
from pyspark.sql.functions import expr
display(df.select("id", expr("lower(name) as little_name")))
また、以下の様にPythonカーネルで任意のSQLクエリーを実行するためにspark.sql()
を使うことができます。
query_df = spark.sql("SELECT * FROM <table_name>)
ロジックはPythonカーネルで実行され、すべてのSQLクエリーは文字列として渡されるので、以下の様にSQLクエリーをパラメーター化するためにPythonのフォーマッティングを使うことができます。
table_name = "my_table"
query_df = spark.sql(f"SELECT * FROM {table_name}")