1
0

PySparkにおけるクエリーのパラメーター化

Posted at

Spark 3.4、Databricksランタイム12.1からパラメータ化(parameterize)がサポートされていたとは。これまでは、クエリーにパラメータを埋め込むには、spark.sqlとf-stringを使ってました。以下のブログで紹介されている内容をウォークスルーします。

以下の二つの方法がサポートされています。

  1. PySparkのカスタム文字列フォーマッティング
    Pythonのformatで使用されているのと同じ構文です。
  2. パラメーターマーカー
    名前付きのマーカーと名前なしのマーカーがサポートされています。

以下ではDatabricksランタイム14.2を使っています。

PySparkのカスタム文字列フォーマッティング

ここでは以下のようなCOVID-19感染者数のテーブルを使います。

SQL
%sql
SELECT * FROM takaakiyayoi_catalog.japan_covid_analysis.covid_cases;

Screenshot 2024-01-04 at 14.18.28.png

東京都の感染者数の合計をとります。

SQL
%sql
SELECT Prefecture, SUM(Cases) AS CasesTotal 
FROM takaakiyayoi_catalog.japan_covid_analysis.covid_cases 
WHERE Prefecture = "Tokyo"
GROUP BY Prefecture;

Screenshot 2024-01-04 at 14.19.29.png

こちらをパラメータ化して他の都道府県を指定できるようにします。

Python
query = """SELECT Prefecture, SUM(Cases) AS CasesTotal 
FROM takaakiyayoi_catalog.japan_covid_analysis.covid_cases 
WHERE Prefecture = {Pref_val}
GROUP BY Prefecture;"""

spark.sql(query, Pref_val="Tokyo").show()

上と同じ結果が返ってきます。

+----------+----------+
|Prefecture|CasesTotal|
+----------+----------+
|     Tokyo|   2187185|
+----------+----------+

千葉県のデータを見てみます。

Python
spark.sql(query, Pref_val="Chiba").show()

これは便利。

+----------+----------+
|Prefecture|CasesTotal|
+----------+----------+
|     Chiba|    617162|
+----------+----------+

これまでは、PySparkでSQLを使うにはデータがテーブルに永続化されているか、一時ビューに登録する必要がありましたが、この仕組みを使うと一時ビューの作成が不要になります。これも便利ですね。

手元にデータフレームが無いので、ここではテーブルからあえてデータフレームを作ります。

Python
covid_df = spark.table("takaakiyayoi_catalog.japan_covid_analysis.covid_cases")
Python
spark.sql(
    "select Prefecture, count(*) as num_pref from {covid_df} group by Prefecture",
    covid_df=covid_df,
).show()

データフレームにクイックにSQLを発行したい場合には有効かと。

+----------+--------+
|Prefecture|num_pref|
+----------+--------+
|     Aichi|     226|
|    Toyama|     226|
|     Tokyo|     226|
|    Nagano|     226|
|     Fukui|     226|
|       Mie|     226|
|     Chiba|     226|
|  Shizuoka|     226|
|      Gifu|     226|
| Yamanashi|     226|
|  Ishikawa|     226|
|  Kanagawa|     226|
|   Niigata|     226|
|     Kyoto|     226|
|  Wakayama|     226|
|     Shiga|     226|
|      Nara|     226|
|   Okayama|     226|
|   Tottori|     226|
| Yamaguchi|     226|
+----------+--------+
only showing top 20 rows

パラメーターマーカー

名前付きパラメーターマーカーの例です。

Python
query = "SELECT Prefecture, sum(Cases) from takaakiyayoi_catalog.japan_covid_analysis.covid_cases group by Prefecture having Prefecture = :Prefecture"	
Python
spark.sql(
    query,
    args={"Prefecture": "Osaka"},
).show()
+----------+----------+
|Prefecture|sum(Cases)|
+----------+----------+
|     Osaka|   1380847|
+----------+----------+

カスタム文字列フォーマッティングとパラメーターマーカーの違い

元記事にも記載がありますが、以下の違いがあります。

  • カスタム文字列フォーマッティング: {}構文を用いたクライアントサイドのパラメーター化
  • パラメーターマーカー: 名前付きあるいは名前なしのパラメーターマーカーを用いたサーバーサイドのパラメーター化

使いやすさの改善やプログラム可能性を改善するために、カスタム文字列フォーマッティングではクライアント側でSQLクエリーの文字列置換を行います。しかし、Sparkサーバーに送信する前にクエリー文字列の置換を行うためSQLインジェクション攻撃を防御することができません。

sql() APIのargs引数を用いたパラメータ化(パラメーターマーカー)では、SQLテキストとパラメーターを別々にしてサーバーに送信します。SQLテキストはパラメーターのプレースホルダーを用いてSQLテキストが解析され、分析されたクエリーツリーの中でargsで指定されたパラメータの値を置き換えます。

パラメーターマーカーのハイレベルの動作は以下のようになります。

  • SQLクエリーはオプションのキー/バリューのパラメーターリストと共にサーバーに到着します。
  • Apache SparkはSQLクエリーを解析し、パラメーターへのリファレンスを対応する解析ツリーのノードで置き換えます。
  • 解析においては、パラメーターのリファレンスを指定されたパラメーターの値で置き換えるためにCatalystのルールが実行されます。
  • このアプローチは、リテラル値のみをサポートしていることからSQLインジェクションの攻撃を防御することになります。一方で、通常の文字列の内挿(f-stringのアプローチ)においては、SQL文字列の置き換えが行われます。文字列に意図しているリテラル値ではなくSQL構文が含まれている場合には、この戦略は攻撃に脆弱になる可能性があります。

一時ビューの作成が不要、かつSQLインジェクションもガードできるパラメーターマーカー、是非ご活用ください!

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0