Spark 3.4、Databricksランタイム12.1からパラメータ化(parameterize)がサポートされていたとは。これまでは、クエリーにパラメータを埋め込むには、spark.sql
とf-stringを使ってました。以下のブログで紹介されている内容をウォークスルーします。
以下の二つの方法がサポートされています。
- PySparkのカスタム文字列フォーマッティング
Pythonのformat
で使用されているのと同じ構文です。 -
パラメーターマーカー
名前付きのマーカーと名前なしのマーカーがサポートされています。
以下ではDatabricksランタイム14.2を使っています。
PySparkのカスタム文字列フォーマッティング
ここでは以下のようなCOVID-19感染者数のテーブルを使います。
%sql
SELECT * FROM takaakiyayoi_catalog.japan_covid_analysis.covid_cases;
東京都の感染者数の合計をとります。
%sql
SELECT Prefecture, SUM(Cases) AS CasesTotal
FROM takaakiyayoi_catalog.japan_covid_analysis.covid_cases
WHERE Prefecture = "Tokyo"
GROUP BY Prefecture;
こちらをパラメータ化して他の都道府県を指定できるようにします。
query = """SELECT Prefecture, SUM(Cases) AS CasesTotal
FROM takaakiyayoi_catalog.japan_covid_analysis.covid_cases
WHERE Prefecture = {Pref_val}
GROUP BY Prefecture;"""
spark.sql(query, Pref_val="Tokyo").show()
上と同じ結果が返ってきます。
+----------+----------+
|Prefecture|CasesTotal|
+----------+----------+
| Tokyo| 2187185|
+----------+----------+
千葉県のデータを見てみます。
spark.sql(query, Pref_val="Chiba").show()
これは便利。
+----------+----------+
|Prefecture|CasesTotal|
+----------+----------+
| Chiba| 617162|
+----------+----------+
これまでは、PySparkでSQLを使うにはデータがテーブルに永続化されているか、一時ビューに登録する必要がありましたが、この仕組みを使うと一時ビューの作成が不要になります。これも便利ですね。
手元にデータフレームが無いので、ここではテーブルからあえてデータフレームを作ります。
covid_df = spark.table("takaakiyayoi_catalog.japan_covid_analysis.covid_cases")
spark.sql(
"select Prefecture, count(*) as num_pref from {covid_df} group by Prefecture",
covid_df=covid_df,
).show()
データフレームにクイックにSQLを発行したい場合には有効かと。
+----------+--------+
|Prefecture|num_pref|
+----------+--------+
| Aichi| 226|
| Toyama| 226|
| Tokyo| 226|
| Nagano| 226|
| Fukui| 226|
| Mie| 226|
| Chiba| 226|
| Shizuoka| 226|
| Gifu| 226|
| Yamanashi| 226|
| Ishikawa| 226|
| Kanagawa| 226|
| Niigata| 226|
| Kyoto| 226|
| Wakayama| 226|
| Shiga| 226|
| Nara| 226|
| Okayama| 226|
| Tottori| 226|
| Yamaguchi| 226|
+----------+--------+
only showing top 20 rows
パラメーターマーカー
名前付きパラメーターマーカーの例です。
query = "SELECT Prefecture, sum(Cases) from takaakiyayoi_catalog.japan_covid_analysis.covid_cases group by Prefecture having Prefecture = :Prefecture"
spark.sql(
query,
args={"Prefecture": "Osaka"},
).show()
+----------+----------+
|Prefecture|sum(Cases)|
+----------+----------+
| Osaka| 1380847|
+----------+----------+
カスタム文字列フォーマッティングとパラメーターマーカーの違い
元記事にも記載がありますが、以下の違いがあります。
- カスタム文字列フォーマッティング:
{}
構文を用いたクライアントサイドのパラメーター化 - パラメーターマーカー: 名前付きあるいは名前なしのパラメーターマーカーを用いたサーバーサイドのパラメーター化
使いやすさの改善やプログラム可能性を改善するために、カスタム文字列フォーマッティングではクライアント側でSQLクエリーの文字列置換を行います。しかし、Sparkサーバーに送信する前にクエリー文字列の置換を行うためSQLインジェクション攻撃を防御することができません。
sql()
APIのargs
引数を用いたパラメータ化(パラメーターマーカー)では、SQLテキストとパラメーターを別々にしてサーバーに送信します。SQLテキストはパラメーターのプレースホルダーを用いてSQLテキストが解析され、分析されたクエリーツリーの中でargs
で指定されたパラメータの値を置き換えます。
パラメーターマーカーのハイレベルの動作は以下のようになります。
- SQLクエリーはオプションのキー/バリューのパラメーターリストと共にサーバーに到着します。
- Apache SparkはSQLクエリーを解析し、パラメーターへのリファレンスを対応する解析ツリーのノードで置き換えます。
- 解析においては、パラメーターのリファレンスを指定されたパラメーターの値で置き換えるためにCatalystのルールが実行されます。
- このアプローチは、リテラル値のみをサポートしていることからSQLインジェクションの攻撃を防御することになります。一方で、通常の文字列の内挿(f-stringのアプローチ)においては、SQL文字列の置き換えが行われます。文字列に意図しているリテラル値ではなくSQL構文が含まれている場合には、この戦略は攻撃に脆弱になる可能性があります。
一時ビューの作成が不要、かつSQLインジェクションもガードできるパラメーターマーカー、是非ご活用ください!