Delta LakeとSQLを用いてプロのように複雑なJSONデータを登録、クエリーする
**JSONのような順構造化データの登録、クエリーは面倒で時間がかかるものですが、オートローダーとDelta Lake**がその作業をシンプルにします。JSONデータは非常に柔軟でパワフルですが、登録、クエリーは困難です。
JSONファイルの取り扱いにおける課題
- 登録するJSONファイルのスキーマの定義が面倒かつ脆いプロセスとなります。
- 時間と共にスキーマは変化し、自動でこれらの変化を取り扱えるようにする必要があります。
- ソフトウェアは常にお使いのデータから適切なスキーマを抽出するとは限らず、適切なフォーマットに関するヒントを指定する必要があるかもしれません。例えば、数字の32はintegerともlongとも解釈される場合があります。
- **多くの場合、データエンジニアには上流で準構造化データを生成するデータソースをコントロールできません。**例えば、カラム名には大文字小文字がありますが、同じカラムを示していますし、ときにはデータ型が変更されますが、すでにDelta Lakeに取り込まれているデータを更新したいとは思わないでしょう。
- **事前にJSONドキュメントをフラットにし、全てのカラムを抽出する作業をしたいとは思わないでしょう。**そして、この作業によってデータは使いにくいものになってしまいます。
- **SQLによる準構造化データのクエリーは困難です。**理解しやすい方法でデータに対するクエリーを実行できるようにする必要があります。
この記事と添付のノートブック(Databricksランタイム9.1以降が必要です)では、Databricksレイクハウスにおける大規模JSONの取り扱いをシンプルにするために、どのようなビルトインの機能が提供されているのかをご説明します。以下に、インクリメンタルなETLアーキテクチャを示します。左側は、継続的、スケジュールされたデータ取り込みを示しており、オートローダーでどのように両方のタイプの取り込みを行うのかを議論します。JSONファイルがブロンズDelta Lakeテーブルに取り込まれた後に、JSONデータでは一般的な複雑かつ準構造化のデータタイプに対するクエリーを簡単にする機能を議論します。
添付のノートブックでは、どれだけJSONの取り込みが簡単にできるのかをデモするためにセールスオーダーのデータを使用しました。ネストされたJSONセールスオーダーのデータセットはすぐに複雑なものになります。
オートローダーによる手間いらずのJSON取り込み
オートローダーは、オブジェクトストレージ(S3、ADLS、GCS)のフォルダーからDelta Lakeテーブルに新規データを取り込むためのPython、Scalaのインタフェースを提供します。オブジェクトストレージからの継続あるいはスケジュール処理によって、直接Delta Lakeテーブルへのデータ取り込みを可能とすることで、オートローダーはデータの取り込みを簡単かつ手間いらずにします。
オートローダーの一般的な機能を議論する前に、JSONの取り込みを劇的にシンプルにする機能を見ていきましょう。以下に、非常に複雑なJSONデータの取り込む処理が以下に簡単なものであるかのサンプルを示します。
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.schemaLocation", schemaLocation) \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.option("cloudFiles.schemaHints", schemaHints) \
.load(landingZoneLocation)
スキーマ定義の柔軟性および定義のしやすさ: 上の例では、問題のあるデータに対するガードレールを維持しつつも、スキーマを容易に定義することができる2つの機能を使用しました。2つの機能とはcloudFiles.inferColumnTypes
(パワフルな機能その1:InferColumnTypes)とcloudFiles.schemaHints
(パワフルな機能その2:Schema Hints)です。詳細な定義を見ていきましょう。
-
cloudFiles.inferColumnTypes
は、スキーマの推定プロセスにおいて、string、integer、long、floatのようなデータタイプを推定するメカニズムをオン/オフするオプションです。多くの場合、スキーマエボリューションの堅牢性のために、トップレベルのカラムを文字列として保持し、スキーマエボリューションのプロセスで数値タイプのミスマッチ(integer、long、float)を回避する方が良いため、デフォルトではcloudFiles.inferColumnTypes
はfalseとなっています。 -
cloudFiles.schemaHints
はいくつかのカラムに期待するデータタイプを指定するオプションであり、スキーマ推定プロセスにおける「スキーマヒント」として知られているものです。スキーマヒントは、オートローダーにスキーマが指定されていない場合にのみ使用されます。詳細はこちらをご覧ください。
このユースケース(ノートブック)では、オートローダーのデフォルト推定である文字列型ではなく、カラムと複雑なデータタイプを推定するようにしたかったので、cloudFiles.inferColumnTypes
をtrueに設定しています。多くのカラムを推定することで、この複雑なJSONに正確性を付け加えることができ、後ほどのクエリーで柔軟性を確保することができます。さらに、カラムの片推定は非常に便利ですが、取り込むカラムに問題があることも知っています。ここで、cloudFiles.inferColumnTypes
と組み合わせることでcloudFiles.schemaHints
が役立ちます。2つのオプションを組み合わせることで、多くのカラムの複雑なデータ型を推定できることに加え、2つのみのカラムのデータ型(string)を指定することができます。
問題のある2つのカラムを見ていきましょう。このノートブックで使用した準構造化のJSONデータから、問題のある2つのカラム「ordered_products.element.promotion_info」と「clicked_items」を特定しました。ここでは、これらはstringではなくてはならないというヒントを与えています(上のカラム「ordered_products.element.promotion_info」のデータスニペットをご覧ください)。これらのカラムに対しては、後ほど議論しますが、SQLを用いて容易に準構造化のJSONにクエリーを実行することができます。配列内のネストされたカラムに対するヒントを確認することができます。この機能は複雑なスキーマに対して本当に役立つものとなります!
時間と共に変化するスキーマの取り扱いは、データ取り込みの回復性を高めます: スキーマ推定と同じように、オートローダーにおけるスキーマエボリューション(パワフルな機能その3)の実装は簡単です。しなくてはいけないことは、スキーマを保存するためのオブジェクトストレージ上の場所をcloudFiles.schemaLocation
に指定することであり、時間の経過に伴いスキーマエボリューションは調整を行います。より詳細に言うと、取り込むデータのスキーマが変化した際にスキーマエボリューションが実行され、Delta Lakeのスキーマもそれに合わせて変更されます。
例えば、添付ノートブックではオートローダーでデータが取り込まれた際にfulfillment_days
が追加されています。このカラムはオートローダーによって識別され、自動でDelta Lakeテーブルに適用されます。ドキュメントに従って、好きになようにスキーマエボリューションのモードを変更することができます。オートローダーのcloudFiles.schemaEvolutionMode
でサポートされているオプションを簡単に説明します。
-
addNewColumns
: オートローダーにスキーマが指定されていない場合のデフォルトのモードです。新規カラムはスキーマに追加されます。既存カラムのデータ型は進化しません。 -
failOnNewColumns
: オートローダーが新規カラムを検知した場合、ストリームは失敗します。指定されたスキーマが更新されない、あるいは、問題のあるデータファイルが削除されない限り再開しません。 -
rescue
: ストリームは一番最初に推定した、あるいは指定されたスキーマで実行されます。あらゆるデータ型の変更や新規カラムは、ストリームのスキーマの_rescued_data
にレスキューされたデータとして自動で保存されます。このモードにおいては、お使いのストリームはスキーマの変更で処理を失敗させません。 -
none
: オートローダーにスキーマが指定された際のデフォルトのモードです。スキーマは進化しません。新規カラムは無視され、オプションとして別途レスキューデータカラムが指定されない限り、データはレスキューされません。
上の例ではスキーマを指定していないので、スキーマエボリューションに対応するように、readStream
にデフォルトオプション.option(“cloudFiles.schemaEvolutionMode”, “addNewColumns”)
を使用しています。
追加カラムの悪いデータは捕捉されるので、何も失われません: レスキューデータカラム(パワフルな機能その4)にはパースされないすべてのデータが保持されるので、ETLの過程でデータを失うことは決してありません。データが現在のスキーマに準拠しておらず、要求されるカラムに入らない場合、レスキューデータカラムによってデータは失われることはありません。このユースケース(ノートブック)では、我々はこのオプションを使いませんでした。このオプションを有効にするには、.option(“cloudFiles.schemaEvolutionMode”, “rescue”)
の設定を行います。詳細はこちらをご覧ください。
ここまでで、JSONデータを有効活用し、最初に述べた課題に対応するオートローダーの機能を探索してきました。次に、データ取り込みの手間を軽減するいくつかの機能を見ていきましょう。
df.writeStream \
.format("delta") \
.trigger(once=True) \
.option("mergeSchema", "true") \
.option("checkpointLocation", bronzeCheckPointLocation) \
.start(bronzeTableLocation)
継続的取り込み vs. スケジュールされた取り込み: オートローダーはApache Spark™の構造化ストリーミングのソースですが、継続的に動作する必要はありません。代わりに、すべてのファイルの取り込みが完了した際に自身をオフにするスケジュールジョブになるように、trigger onceオプション(パワフルな機能その5)を使用することができます。これは、継続的な取り込み処理が不要の場合に役立ちます。さらに、コードを変更することなしに、定期的なスケジュール処理から、最終的な継続的取り込み処理に移行することができます。DBR 10.1以降で、我々はtrigger onceと同じデータ処理セマンティクスを提供しつつも、大規模データにスケールできるようにするためのレート制限を行うTrigger.AvailableNow
を導入しました。
ステートの取り扱い: プロセスが停止された状態から再開する際に必要となる情報がステートです。例えば、オートローダーにおいては、すでに取り込まれた一連のファイルがステートに含まれます。意図的あるいは処理の失敗に関係なく、あらゆるタイミングでETLが停止した場合、チェックポイントがステートを保存します。チェックポイントを活用することで、オートローダーは、継続的に処理を行うことができ、定期的あるいはスケジュール処理の一部に組み込まれることが可能となります。上の例では、チェックポイントはオプションcheckpointLocation
(パワフルな機能その6 - チェックポイント)に保存されます。オートローダーが停止され再開された際、最新のステートに復帰し、既に処理されたファイルを再処理しないようにチェックポイントが使用されます。
準構造化データ、複雑な構造化データに対するクエリー
これで、我々はJSONデータをDelta Lakeテーブルに取り込むことができましたので、次に準構造化データ、複雑な構造化データをクエリーするためのパワフルな方法を見ていきましょう。準構造化のクエリーにおける最後の課題に取り組んでいきます。
ここまでで、特定の場所にDeltaテーブルを書き込むためにオートローダーを使用しました。SQLを用いてテーブルにアクセスすることができますが、可読性のために、以下のSQLコードを用いて外部テーブルを指定します。
CREATE TABLE autoloaderBronzeTable
LOCATION '${c.bronzeTablePath}';
値キャストの文法を用いた準構造化JSONにおけるトップレベル、ネストデータの容易なアクセス:
SELECT fulfillment_days, fulfillment_days:picking,
fulfillment_days:packing::double, fulfillment_days:shipping.days
FROM autoloaderBronzeTable
WHERE fulfillment_days IS NOT NULL
データ取り込みの際、JSONを文字列の状態で保持する必要があり、いくつかのデータは適切なデータ型になっていない場合があります。このような場合、上の例の文法を用いることで、準構造化データのクエリー部分をシンプルかつ可読性の高いものにすることができます。この例でダブルクリックを行うことで、JSON文字列カラムであるfilfillment_days
のデータを見てみましょう。
-
トップレベルカラムへのアクセス: JSON文字列カラムのトップレベルにアクセスするために、単一のコロン(:)を使用します(パワフルな機能その7 - JSONカラムの抽出)。例えば、
filfillment_days:picking
は、上の最初の行では0.32を返却します。 -
ネストされたフィールドへのアクセス: ネストされたフィールドにアクセスするにはドットを使用します(パワフルな機能その8 - ドット記法)。例えば、
fulfillment_days:shipping.days
は、上の例の最初の行では3.7を返却します。 -
値のキャスト: 値をキャストするためにダブルコロン(::)とデータ型を指定します(パワフルな機能その9 - 値のキャスト)。例えば、
fulfillment_days:packing::double
は、上の例の最初の行では、文字列のpackingに対応するdouble型の1.99を返却します。
データが適切にフォーマットされていない場合でも準構造化配列から値を抽出:
SELECT *, reduce(all_click_count_array, 0, (acc, value) -> acc + value) as
sum
FROM (
SELECT order_number, clicked_items:[*][1] as all_click_counts,
from_json(clicked_items:[*][1], 'ARRAY')::ARRAY as all_click_count_array
FROM autoloaderBronzeTable
)
残念ですが、すべてのデータが使える構造でやってくるとは限りません。例えば、カラムclicked_items
は、カウントが文字列となっている配列の配列となっており、分かりにいものになっています。以下にclicked_items
のスニペットを示します。
-
配列からの値抽出(パワフルな機能その10): JSON配列文字列からすべての値を抽出するためにアスタリスク(*)を使用します。特定の配列インデックスに対して、0ベースの値を使用します。例えば、SQL
clicked_items:[*][1]
は文字列["54","85"]を返却します。 -
複雑な配列値のキャスト: 配列の配列から適切な値を抽出した後に、
reduce
を使って合計を計算できるフォーマットに配列をキャストするために、from_json
と::ARRAY
を使用します。最終的には、最初の行は合計値の139 (54 + 89)を返却します。適切にフォーマットされていないJSONから、SQLを使って簡単に合計を計算できます!
複雑な構造化データに対してSQLを用いて集計を行う:
複雑な構造化データへのアクセス、構造化データ、準構造化データ間の変換はすでにDatabricksでサポートされていました。
SELECT order_date, ordered_products_explode.name as product_name,
SUM(ordered_products_explode.qty) as quantity
FROM (
SELECT DATE(from_unixtime(order_datetime)) as order_date,
EXPLODE(ordered_products) as ordered_products_explode
FROM autoloaderBronzeTable
WHERE DATE(from_unixtime(order_datetime)) is not null
)
GROUP BY order_date, ordered_products_explode.name
ORDER BY order_date, ordered_products_explode.name
この記事の始めで、我々はオートローダーでJSONを読み込み、カラムordered_products
の複雑なデータ型を推定するためにoption(“cloudFiles.inferColumnTypes”, “true”)
を使用しました。上のSQLクエリーでは、複雑な構造化データにアクセスし、集計を行う方法を探索しました。以下に、カラムordered_products
の1行のサンプルを示しており、日次ベースで売れたそれぞれの製品の個数を探したいと考えています。見てわかるように、製品と数量の両方は配列でネストされています。
-
行として配列の要素にアクセス:
ordered_products
カラムにexplode
を使用し、以下のようにそれぞれの要素が行となるようにします。
-
ネストされたフィールドへのアクセス: 準構造化データのJSONと同じ方法で、ドット記法を用いてネストされたフィールドにアクセスします。例えば、
ordered_products_explode.qty
は上の最初の行では1を返却します。これにより、日付と製品名でグルーピングを行い合計値を計算することができます。
追加のリソース: 構造化データ、準構造化データのJSONに対するクエリーにおいて、多くのトピックをカバーしましたが、更なる情報を以下から参照することができます。
- SQLによる準構造化JSONに対するクエリーのドキュメント
- 複雑な構造化、準構造化データの取り扱いを述べたブログ記事。複雑なデータのストリーミング処理のユースケースに言及しています。
まとめ
Databricksにおいては、不可能なことを可能にし、難しいことを簡単にするための取り組みを進めています。オートローダーを用いることで、大規模かつ複雑なJSONユースケースを簡単かつ実現可能なものにします。準構造化、複雑なデータに対するSQLシンタックスを用いることで、データの操作を簡単なものにします。ここまでで皆様は、複雑なJSONを取り込みクエリーを行うために活用できる、オートローダーとSQL、10のパワフルな機能を学びましたので、これらを用いて皆様がどのようなものを構築するのかが楽しみでなりません。
サンプルノートブック