2
0

実践を通じて学ぶSpark SQL

Last updated at Posted at 2024-02-26

Spark SQLとは結局何か?ふと思い立ちました。

Spark SQL は、Spark での SQL のネイティブサポートを可能にし、RDD(Sparkの分散データセット)や外部ソースに保存されたデータのクエリ処理を効率化します。また、RDD をリレーショナルテーブルと同様に処理することが可能です。このような優れた抽象化の統合により、開発者は複雑な分析で外部データをクエリする SQL コマンドを、容易に、単一のアプリケーション内に混在させることができます。具体的には、Spark SQL で開発者は次のことができるようになります。

  • Parquet ファイルや Hive テーブルからのリレーショナルデータのインポート
  • インポートされたデータと既存の RDD に対する SQL クエリの実行
  • Hive テーブルや Parquet ファイルへの容易な RDD の書き出し

わかるようでわからない。

こちらの記事で触れているSparkコアAPIのSQLがSpark SQL。ということは、大雑把に言えばSparkのSQL APIということ。

Screenshot 2023-08-16 at 13.04.12.png

こちらにも書かれているように、Sparkに対してSQLで問い合わせを可能にしているモジュールとのこと。

results = spark.sql(
  "SELECT * FROM people")

でも、そうすると気になるのは、上のSELECT文のテーブルpeopleがどこにあるのかをSparkは知っているのか?ということです。Sparkは通常ファイルの読み書きを行いますが、それはファイルパスを指定しているからです。テーブル名自体にはファイルパスという概念がないので、テーブルの実態がファイルだとしても、テーブルとファイルパスの関係性を管理しなくてはなりません。

それを解決しているのがメタストア。名前の通り、テーブルのメタ情報を格納してくれています。SparkのネイティブのメタストアはHiveメタストアなので、こちらを使ってテーブルを管理します。ちなみにDatabricksのメタストアはUnity Catalogで管理されています。

前振りが長くなりましたが、Spark SQLを実際に実行していきます。Databricksで実行していきます。Sparkをローカルで実行している場合、SparkSessionの取得やメタストアの設定等が必要ですが、Databricksの場合はお手軽にスタートできます。

以下のドキュメントを参考にして、Databricksに合わせて実行していきます。

テーブルの作成

PythonからSQLを実行する際には、SparkSession.sqlを使います。なお、DatabricksではSparkSessionは変数sparkに格納されているのでspark.sqlになります。

テーブルを作成する前にデフォルトのデータベースを選択します。Databricksではテーブルの名前空間が3レベルになっており、テーブルを指定する際にはカタログ.データベース(スキーマ).テーブルとなります。データベースを選択する際には上位の2レベルを指定します。

Python
spark.sql("USE takaakiyayoi_catalog.spark_sql")

テーブルを作成します。CREATE TABLEを使います。

Python
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

カタログエクスプローラでテーブルを確認します。
Screenshot 2024-02-26 at 21.25.27.png

テーブルへの挿入

INSERTを使います。

Python
spark.sql("INSERT INTO src(key, value) VALUES (238,'val_238'), (86,'val_86')")

データがインサートされました。
Screenshot 2024-02-26 at 21.30.24.png

テーブルの読み込み

SELECTを使います。Sparkデータフレームが返却されるのでshowで表示します。

Python
spark.sql("SELECT * FROM src").show()
+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
+---+-------+

テーブルの削除

DROP TABLEです。

Python
spark.sql("DROP TABLE src")

ちょっと待ってください

なぜ、SQLなのにPySpark経由で呼び出しているのでしょうか?SparkSessionというエントリーポイントがあって初めてクエリーを実行できるので、PythonやScalaでエントリーポイントを作成してからという話は致し方ないとも言えます(詳しい方ツッコミあればお願いします)。

でも、Databricksならノートブックやクエリーエディタで直接SQLを実行できます。上のクエリーを直接SQLで実行していきましょう。上のクエリーはPythonノートブックで実行していたので、マジックコマンド%sqlを使用してデフォルト言語を切り替えます。

SQL
%sql
CREATE TABLE IF NOT EXISTS src (key INT, value STRING)

テーブルが作成されました。
Screenshot 2024-02-26 at 21.38.52.png

SQL
%sql
INSERT INTO src(key, value) VALUES (238,'val_238'), (86,'val_86')

Screenshot 2024-02-26 at 21.40.05.png

SQL
%sql
SELECT * FROM src

なお、SELECT文を実行した際には、結果は自動で表示されます。
Screenshot 2024-02-26 at 21.40.46.png

SQL
%sql
DROP TABLE src

ノートブックのデフォルト言語をSQLにすれば、マジックコマンド%sqlも不要です。

実際に動かしてみると、「DHWのようにSQLを駆使してテーブルを操作できる」ことがわかるかと思います。しかも、エンジンがSparkなので、大量データであっても高速にデータを操作することができます。Databricks上であれば、ノートブックやカタログエクスプローラーを活用して段階的な開発も容易となります。ぜひトライしてみてください!

Screenshot 2024-02-27 at 7.47.36.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0