2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

チュートリアル:DatabricksでPySparkデータフレームを操作する

Posted at

Tutorial: Work with PySpark DataFrames on Databricks | Databricks on AWS [2022/10/7時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

本書では、DatabricksでApache Spark PythonデータフレームAPI(PySpark)を用いてどの様にデータをロードし、変換するのかを説明します。

Apache Spark PySpark API referenceもご覧ください。

データフレームとは?

データフレームは、異なるタイプを持つことがあるカラムを持つ2次元のラベル付きデータ構造です。データフレームをスプレッドシート、SQLテーブル、シリーズオブジェクトのディクショナリーのようなものと考えることができます。Apache Sparkデータフレームは、一般的なデータ分析問題を効率的に解決できる様に、豊富な機能セット(カラムの選択、フィルター、join、集計)を提供します。

Apache Sparkデータフレームは、耐障害性分散データセット(RDD)の上に構築された抽象化レイヤーです。SparkデータフレームとSpark SQLは統合されたプランニングと最適化エンジンを使用するので、Databricksでサポートされているすべての言語(Python、SQL、Scala、R)でほぼ同じパフォーマンスを得ることができます。

Pythonでデータフレームを作成する

ほとんどのApache Sparkのクエリーはデータフレームを返します。これには、テーブルの読み込み、ファイルからのデータロード、データを変換するオペレーションが含まれます。

また、以下のサンプルの様にlistやpandasデータフレームからSparkデータフレームを作成することができます。

Python
import pandas as pd

data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]

pdf = pd.DataFrame(data, columns=["id", "name"])

df1 = spark.createDataFrame(pdf)
df2 = spark.createDataFrame(data, schema="id LONG, name STRING")

テーブルをデータフレームに読み込む

DatabricksではすべてのテーブルでデフォルトでDelta Lakeを使います。以下のサンプルの様に、容易にテーブルをデータフレームにロードすることができます。

Python
spark.read.table("<catalog_name>.<schema_name>.<table_name>")

ファイルからデータフレームにロードする

サポートされている数多くのファイルフォーマットからデータをロードすることができます。以下のサンプルでは、多くのワークスペースでアクセスできる/databricks-datasetsディレクトリにあるデータセットを使用しています。Databricksのサンプルデータをご覧ください。

Python
df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

変換ステップをデータフレームに割り当てる

Sparkの変換処理のほとんどの結果はデータフレームを返します。他のシステムでCTE(共通テーブル式)や一時ビュー、データフレームを使うのと同じ様に、データフレーム変数にこれらの結果を割り当てることができます。

joinやunionでデータフレームを結合する

データフレームのjoinオペレーションでは標準的なSQLセマンティクスを使用します。joinは、指定されたマッチ条件とjoinのタイプに基づいて2つのデータフレームの結果を結合したものを返します。以下のサンプルはデフォルトであるinner joinです。

Python
joined_df = df1.join(df2, how="inner", on="id")

以下のサンプルの様に、unionオペレーションを用いてあるデータフレームを別のデータフレームに追加することができます。

Python
unioned_df = df1.union(df2)

データフレームにおける行のフィルタリング

.filter().where()を用いてデータフレームの行をフィルタリングすることができます。以下のサンプルにある様にパフォーマンスや構文に違いはありません。

Python
filtered_df = df.filter("id > 1")

filtered_df = df.where("id > 1")

データフレームで返却する行のサブセットを選択するか、編集するためにフィルタリングを使用します。

データフレームから列を選択する

以下のサンプルの様に.select()に1つ以上の列名を指定することで列を選択することができます。

Python
select_df = df.select("id", "name")

返却する行と列を限定するために、selectとフィルタリングのクエリーを組み合わせることができます。

Python
subset_df = df.filter("id > 1").select("name")

データフレームを参照する

テーブル形式でこのデータを参照するには、以下のサンプルの様にDatabricksのdisplay()コマンドを使うことができます。

Python
display(df)

データスキーマを出力する

Sparkでは、データフレームのカラム名とカラムの型を参照するためにスキーマという用語を使います。

注意
また、Databricksではカタログに登録されたテーブルのコレクションを参照するためにスキーマという用語を使います。

以下のサンプルの様に、.printSchema()メソッドを用いてスキーマを出力することができます。

Python
df.printSchema()

テーブルにデータフレームを保存する

Databricksでは、デフォルトですべてのテーブルでDelta Lakeを使います。以下の様に、データフレームの中身をテーブルに保存することができます。

Python
df.write.saveAsTable("<table_name>")

ファイルのコレクションにデータフレームを書き込む

ほとんどのSparkアプリケーションは大規模なデータセットを取り扱う様に設計されており、分散処理として動作しますので、Sparkは単一のファイルではなくファイルのディレクトリを書き出します。多くのデータシステムは、これらのファイルのディレクトリを読み込む様に設計されています。Databricksでは、多くのアプリケーションにおいてファイルパスよりもテーブルを用いることをお勧めしています。

Python
df.write.format("json").save("/tmp/json_data")

PySparkでSQLクエリーを実行する

Sparkデータフレームは、PythonとSQLを組み合わせるための数多くのオプションを提供しています。

selectExpr()メソッドを用いることで、以下の様にSQLクエリーとしてそれぞれのカラムを指定することができます。

Python
display(df.selectExpr("id", "upper(name) as big_name"))

以下のサンプルの様に、カラムが指定される場所のどこでもSQL文法を使える様に、pyspark.sql.functionsからexpr()関数をインポートすることができます。

Python
from pyspark.sql.functions import expr

display(df.select("id", expr("lower(name) as little_name")))

また、以下の様にPythonカーネルで任意のSQLクエリーを実行するためにspark.sql()を使うことができます。

Python
query_df = spark.sql("SELECT * FROM <table_name>)

ロジックはPythonカーネルで実行され、すべてのSQLクエリーは文字列として渡されるので、以下の様にSQLクエリーをパラメーター化するためにPythonのフォーマッティングを使うことができます。

Python
table_name = "my_table"

query_df = spark.sql(f"SELECT * FROM {table_name}")

Databricks 無料トライアル

Databricks 無料トライアル

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?