1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks で JSON 形式の文字列カラムの一部の値を更新する方法

Posted at

概要

Databricks で JSON 形式の文字列カラムの一部の値を更新する方法について説明します。このプロセスは、特定の JSON キーの値を変更する際に役立ちます。本記事の手順では、json_string_col というカラム名の JSON 形式の文字列データを含むテーブルの作成から始め、UPDATE 文と MERGE 文を使用して値を更新する方法を示します。

image.png

検証コードと実行結果

事前準備

まず、json_string_col というカラムを持つテーブルを作成します。このカラムには JSON 形式の文字列データが格納されます。

%sql
CREATE SCHEMA IF NOT EXISTS json_test;
CREATE OR REPLACE TABLE json_test.store_data AS 
SELECT
1 AS id,
'{
  "item_01":"abc",
  "item_02":"before"
}' as json_string_col
;
SELECT * FROM json_test.store_data;

image.png

1. Update 文により更新する方法

UPDATE 文を使用して JSON カラムの値を更新する方法を示します。Spark Dataframe 操作により、テーブルからデータを取得後に、from_json 関数にて Struct 型に変換後に変更したい箇所の値(item_02の値)を修正後に to_json 関数にて String 型に戻します。

# 更新対象の ID
tgt_id = 1

# 更新後の値
after_value = "updated"

src_df = spark.sql(f"""
WITH src (
  SELECT
    id,
    -- 更新対象の行を一意に識別するためのIDカラム
    from_json(
      json_string_col,
      -- JSONデータを含むカラム名
      'item_01 string,item_02 string'
    ) AS structured
  FROM
    json_test.store_data
)
SELECT
  id,
  to_json(
    named_struct(
      'item_01',structured.item_01,
      'item_02','{after_value}'
    )
  ) AS json_string_col
FROM
  src
WHERE
  id = {tgt_id};
""")
src_df.display()

image.png

作成したデータフラームの値を取得し、その値による Update 文の SQL を実行します。

row_dict = src_df.collect()[0].asDict()
print(row_dict)

update_sql = f"""
UPDATE json_test.store_data
  SET json_string_col = '{row_dict['json_string_col']}'
  WHERE id = {row_dict['id']}
"""
print(update_sql)
spark.sql(update_sql)

image.png

実行結果を確認します。

spark.table("json_test.store_data").display()

image.png

2. Merge 文により更新する方法

次に、MERGE 文を使用して JSON カラムの値を更新する別の方法を紹介します。Update 文により更新する方法と同様に、Spark Dataframe 操作により、テーブルからデータを取得後に、from_json 関数にて Struct 型に変換後に変更したい箇所の値(item_02の値)を修正後に to_json 関数にて String 型に戻します。

# 更新後の値
after_value = "updated_by_merge"

src_df = spark.sql(f"""
WITH src (
  SELECT
    id,
    -- 更新対象の行を一意に識別するためのIDカラム
    from_json(
      json_string_col,
      -- JSONデータを含むカラム名
      'item_01 string,item_02 string'
    ) AS structured
  FROM
    json_test.store_data
)
SELECT
  id,
  to_json(
    named_struct(
      'item_01',structured.item_01,
      'item_02','{after_value}'
    )
  ) AS json_string_col
FROM
  src
WHERE
  id = {tgt_id};
""")
src_df.display()

image.png

作成したデータフラームから一時ビューを作成して、Merge 文を実行します。

src_df.createOrReplaceTempView("_tmp_json_update")

spark.sql("""
MERGE INTO json_test.store_data AS tgt 
    USING _tmp_json_update AS src
    ON tgt.id = src.id
    WHEN MATCHED
        THEN UPDATE 
        SET json_string_col = src.json_string_col
""")

image.png

実行結果を確認します。

spark.table("json_test.store_data").display()

image.png

事後処理

最後に、作成したスキーマとテーブルを削除するためのコードを提供します。

spark.sql("DROP SCHEMA IF EXISTS json_test CASCADE")

image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?