概要
Databricks で JSON 形式の文字列カラムの一部の値を更新する方法について説明します。このプロセスは、特定の JSON キーの値を変更する際に役立ちます。本記事の手順では、json_string_col
というカラム名の JSON 形式の文字列データを含むテーブルの作成から始め、UPDATE
文と MERGE
文を使用して値を更新する方法を示します。
検証コードと実行結果
事前準備
まず、json_string_col
というカラムを持つテーブルを作成します。このカラムには JSON 形式の文字列データが格納されます。
%sql
CREATE SCHEMA IF NOT EXISTS json_test;
CREATE OR REPLACE TABLE json_test.store_data AS
SELECT
1 AS id,
'{
"item_01":"abc",
"item_02":"before"
}' as json_string_col
;
SELECT * FROM json_test.store_data;
1. Update 文により更新する方法
UPDATE
文を使用して JSON カラムの値を更新する方法を示します。Spark Dataframe 操作により、テーブルからデータを取得後に、from_json 関数にて Struct 型に変換後に変更したい箇所の値(item_02
の値)を修正後に to_json 関数にて String 型に戻します。
# 更新対象の ID
tgt_id = 1
# 更新後の値
after_value = "updated"
src_df = spark.sql(f"""
WITH src (
SELECT
id,
-- 更新対象の行を一意に識別するためのIDカラム
from_json(
json_string_col,
-- JSONデータを含むカラム名
'item_01 string,item_02 string'
) AS structured
FROM
json_test.store_data
)
SELECT
id,
to_json(
named_struct(
'item_01',structured.item_01,
'item_02','{after_value}'
)
) AS json_string_col
FROM
src
WHERE
id = {tgt_id};
""")
src_df.display()
作成したデータフラームの値を取得し、その値による Update 文の SQL を実行します。
row_dict = src_df.collect()[0].asDict()
print(row_dict)
update_sql = f"""
UPDATE json_test.store_data
SET json_string_col = '{row_dict['json_string_col']}'
WHERE id = {row_dict['id']}
"""
print(update_sql)
spark.sql(update_sql)
実行結果を確認します。
spark.table("json_test.store_data").display()
2. Merge 文により更新する方法
次に、MERGE
文を使用して JSON カラムの値を更新する別の方法を紹介します。Update 文により更新する方法と同様に、Spark Dataframe 操作により、テーブルからデータを取得後に、from_json 関数にて Struct 型に変換後に変更したい箇所の値(item_02
の値)を修正後に to_json 関数にて String 型に戻します。
# 更新後の値
after_value = "updated_by_merge"
src_df = spark.sql(f"""
WITH src (
SELECT
id,
-- 更新対象の行を一意に識別するためのIDカラム
from_json(
json_string_col,
-- JSONデータを含むカラム名
'item_01 string,item_02 string'
) AS structured
FROM
json_test.store_data
)
SELECT
id,
to_json(
named_struct(
'item_01',structured.item_01,
'item_02','{after_value}'
)
) AS json_string_col
FROM
src
WHERE
id = {tgt_id};
""")
src_df.display()
作成したデータフラームから一時ビューを作成して、Merge 文を実行します。
src_df.createOrReplaceTempView("_tmp_json_update")
spark.sql("""
MERGE INTO json_test.store_data AS tgt
USING _tmp_json_update AS src
ON tgt.id = src.id
WHEN MATCHED
THEN UPDATE
SET json_string_col = src.json_string_col
""")
実行結果を確認します。
spark.table("json_test.store_data").display()
事後処理
最後に、作成したスキーマとテーブルを削除するためのコードを提供します。
spark.sql("DROP SCHEMA IF EXISTS json_test CASCADE")