Spark SQLとは結局何か?ふと思い立ちました。
Spark SQL は、Spark での SQL のネイティブサポートを可能にし、RDD(Sparkの分散データセット)や外部ソースに保存されたデータのクエリ処理を効率化します。また、RDD をリレーショナルテーブルと同様に処理することが可能です。このような優れた抽象化の統合により、開発者は複雑な分析で外部データをクエリする SQL コマンドを、容易に、単一のアプリケーション内に混在させることができます。具体的には、Spark SQL で開発者は次のことができるようになります。
- Parquet ファイルや Hive テーブルからのリレーショナルデータのインポート
- インポートされたデータと既存の RDD に対する SQL クエリの実行
- Hive テーブルや Parquet ファイルへの容易な RDD の書き出し
わかるようでわからない。
こちらの記事で触れているSparkコアAPIのSQLがSpark SQL。ということは、大雑把に言えばSparkのSQL APIということ。
こちらにも書かれているように、Sparkに対してSQLで問い合わせを可能にしているモジュールとのこと。
results = spark.sql(
"SELECT * FROM people")
でも、そうすると気になるのは、上のSELECT文のテーブルpeople
がどこにあるのかをSparkは知っているのか?ということです。Sparkは通常ファイルの読み書きを行いますが、それはファイルパスを指定しているからです。テーブル名自体にはファイルパスという概念がないので、テーブルの実態がファイルだとしても、テーブルとファイルパスの関係性を管理しなくてはなりません。
それを解決しているのがメタストア。名前の通り、テーブルのメタ情報を格納してくれています。SparkのネイティブのメタストアはHiveメタストアなので、こちらを使ってテーブルを管理します。ちなみにDatabricksのメタストアはUnity Catalogで管理されています。
前振りが長くなりましたが、Spark SQLを実際に実行していきます。Databricksで実行していきます。Sparkをローカルで実行している場合、SparkSession
の取得やメタストアの設定等が必要ですが、Databricksの場合はお手軽にスタートできます。
以下のドキュメントを参考にして、Databricksに合わせて実行していきます。
テーブルの作成
PythonからSQLを実行する際には、SparkSession.sql
を使います。なお、DatabricksではSparkSessionは変数spark
に格納されているのでspark.sql
になります。
テーブルを作成する前にデフォルトのデータベースを選択します。Databricksではテーブルの名前空間が3レベルになっており、テーブルを指定する際にはカタログ.データベース(スキーマ).テーブル
となります。データベースを選択する際には上位の2レベルを指定します。
spark.sql("USE takaakiyayoi_catalog.spark_sql")
テーブルを作成します。CREATE TABLE
を使います。
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
カタログエクスプローラでテーブルを確認します。
テーブルへの挿入
INSERT
を使います。
spark.sql("INSERT INTO src(key, value) VALUES (238,'val_238'), (86,'val_86')")
テーブルの読み込み
SELECT
を使います。Sparkデータフレームが返却されるのでshow
で表示します。
spark.sql("SELECT * FROM src").show()
+---+-------+
|key| value|
+---+-------+
|238|val_238|
| 86| val_86|
+---+-------+
テーブルの削除
DROP TABLE
です。
spark.sql("DROP TABLE src")
ちょっと待ってください
なぜ、SQLなのにPySpark経由で呼び出しているのでしょうか?SparkSession
というエントリーポイントがあって初めてクエリーを実行できるので、PythonやScalaでエントリーポイントを作成してからという話は致し方ないとも言えます(詳しい方ツッコミあればお願いします)。
でも、Databricksならノートブックやクエリーエディタで直接SQLを実行できます。上のクエリーを直接SQLで実行していきましょう。上のクエリーはPythonノートブックで実行していたので、マジックコマンド%sql
を使用してデフォルト言語を切り替えます。
%sql
CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
%sql
INSERT INTO src(key, value) VALUES (238,'val_238'), (86,'val_86')
%sql
SELECT * FROM src
なお、SELECT文を実行した際には、結果は自動で表示されます。
%sql
DROP TABLE src
ノートブックのデフォルト言語をSQLにすれば、マジックコマンド%sql
も不要です。
実際に動かしてみると、「DHWのようにSQLを駆使してテーブルを操作できる」ことがわかるかと思います。しかも、エンジンがSparkなので、大量データであっても高速にデータを操作することができます。Databricks上であれば、ノートブックやカタログエクスプローラーを活用して段階的な開発も容易となります。ぜひトライしてみてください!