Tutorial: Continuously ingest data into Delta Lake with Auto Loader | Databricks on AWS [2022/3/15時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
継続的かつインクリメンタルなデータ取り込みは一般的な要件です。例えば、モバイルゲーム、eコマースのウェブサイト、IoTセンサーなどは継続的なデータストリームを生成します。アナリストは最新のデータにアクセスしたいと望みますが、いくつかの理由から実装には課題があります。
- データが到着したら、ファイルを一度だけ処理し、データを取り込んで変換する必要がある場合があります。
- テーブルに書き込む前にスキーマを強制したいと思うかもしれません。このロジックの開発や維持をするには複雑になる場合があります。
- 時間を経てスキーマが変化するデータの取り扱いは大変です。例えば、データ品質問題を持つ入力行をどのように取り扱うのか、生のデータで問題を解決した後にこれらの行をどのように再処理するのかを決めなくてはなりません。
- 分あたり数千、数百万のファイルを処理するスケーラブルなソリューションは、イベント通知、メッセージキュー、トリガーのようなクラウドサービスとのインテグレーションを必要とし、複雑な開発と長期のメンテナンスを追加することになります。
継続的、コスト効率的、メンテナンス可能かつスケーラブルなデータ変換、取り込みシステムの構築は簡単なものではありません。上述の問題に取り組み、データチームが低コスト、低レーテンシーでクラウドオブジェクトストアから生データをロードする手段を提供するために、Databricksはビルトインかつ最適化されたソリューションであるAuto Loaderを提供します。Auto Loaderは、新規ファイルに対する通知サービスを自動で設定、リッスンし、秒あたり数百万のファイルまでスケールアップすることができます。また、スキーマ推定やスキーマ進化にような一般的な問題についても対応します。詳細はAuto Loaderを参照ください。
このチュートリアルでは、インクリメンタルにデータをDeltaテーブルに取り込むためにAuto Loaderを使用します。
要件
- Databricksのアカウントおよびアカウント内に作成したDatabricksワークスペース。これらを作成するにはフリートライアルへのサインアップをご覧ください。
- ワークスペースで作成したall-purposeクラスター。作成するには、クラスターの作成をご覧ください。
- Databricksワークスペースの使い方に慣れる。Navigate the workspaceをご覧ください。
ステップ1: サンプルデータの作成
このステップでは、ワークスペースにノートブックを作成します。このノートブックでは、30秒ごとにワークスペースにランダムなカンマ区切りのファイルを生成するコードを実行します。これらのファイルのそれぞれにはランダムなデータのセットが含まれます。
注意
Auto Loaderは以下のファイルフォーマットのデータに対しても動作します。Avro, バイナリー, CSV, JSON, ORC, Parquet, テキスト。
-
ワークスペースのサイドバーでCreate > Notebookをクリックします。
-
Create Notebookダイアログで、例えば
Fake Data Generator
というようなノートブックの名前を入力します。 -
Default LanguageではPythonを選択します。
-
Clusterでは要件のセクションで作成したクラスターを選択するか、利用可能なクラスターから使いたいものを選択します。
-
Createをクリックします。
-
ノートブックのメニューバーで、クラスター名の隣の円に緑のチェックマークが含まれていない場合、クラスター名の隣のドロップダウンの矢印をクリックし、Start Clusterをクリックします。Confirmをクリックし、円の中に緑のチェックマークが表示されるまで待ちます。
-
ノートブックの最初のセルに以下のコードを貼り付けます。
Pythonimport csv import uuid import random import time from pathlib import Path count = 0 path = "/tmp/generated_raw_csv_data" Path(path).mkdir(parents=True, exist_ok=True) while True: row_list = [ ["id", "x_axis", "y_axis"], [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)], [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)], [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)] ] file_location = f'{path}/file_{count}.csv' with open(file_location, 'w', newline='') as file: writer = csv.writer(file) writer.writerows(row_list) file.close() count += 1 dbutils.fs.mv(f'file:{file_location}', f'dbfs:{file_location}') time.sleep(30) print(f'New CSV file created at dbfs:{file_location}. Contents:') with open(f'/dbfs{file_location}', 'r') as file: reader = csv.reader(file, delimiter=' ') for row in reader: print(', '.join(row)) file.close()
上のコードでは以下の処理を行なっています。
-
ワークスペースで
/tmp/generated_raw_csv_data
にディレクトリが存在しない場合には作成します。ティップス
既に誰かがこのチュートリアルを実行しており、ワークスペースにこのパスが存在している場合は、このパスのファイルを削除した方が良いかもしれません。 -
以下のようなランダムなデータセットを作成します。
id,x_axis,y_axis d033faf3-b6bd-4bbc-83a4-43a37ce7e994,88,-13 fde2bdb6-b0a1-41c2-9650-35af717549ca,-96,19 297a2dfe-99de-4c52-8310-b24bc2f83874,-23,43
-
30秒後に、
file_<number>.csv
という名前のファイルを生成し、ファイルにランダムなデータセットを書き込み、dbfs:/tmp/generated_raw_csv_data
にファイルを格納し、ファイルへのパスと中身をレポートします。<number>
は0
からスタートし、ファイルが作成される都度、1づつ増えます(例えば、file_0.csv
、file_1.csv
など)。
-
-
ノートブックのメニューバーでRun Allをクリックします。ノートブックを実行したままの状態にしておきます。
注意
生成されたファイルの一覧を参照するには、サイドバーでDataをクリックします。DBFSをクリックし、プロンプトが出た場合にはクラスターを選択します。そして、tmp > generated_raw_csv_dataにアクセスします。
ステップ2: Auto Loaderの実行
このステップでは、ワークスペースのある場所から生データを継続的に読み込み、そのデータを同じワークスペースの別の場所にあるDetlaテーブルにstreamingするためにAuto Loaderを使用します。
-
ワークスペースのサイドバーでCreate > Notebookをクリックします。
-
Create Notebookダイアログで、例えば
Auto Loader Demo
というようなノートブックの名前を入力します。 -
Default LanguageではPythonを選択します。
-
Clusterでは要件のセクションで作成したクラスターを選択するか、利用可能なクラスターから使いたいものを選択します。
-
Createをクリックします。
-
ノートブックのメニューバーで、クラスター名の隣の円に緑のチェックマークが含まれていない場合、クラスター名の隣のドロップダウンの矢印をクリックし、Start Clusterをクリックします。Confirmをクリックし、円の中に緑のチェックマークが表示されるまで待ちます。
-
ノートブックの最初のセルに以下のコードを貼り付けます。
Pythonraw_data_location = "dbfs:/tmp/generated_raw_csv_data" target_delta_table_location = "dbfs:/tmp/table/coordinates" schema_location = "dbfs:/tmp/auto_loader/schema" checkpoint_location = "dbfs:/tmp/auto_loader/checkpoint"
このコードは、お使いのワークスペースにおける生データとターゲットのDeltaテーブルへのパス、テーブルスキーマへのパス、Auto LoaderがDelta Lakeのトランザクションログの中のチェックポイントファイル情報を書き込む場所へのパスを定義します。チェックポイントを用いることで、Auto Loaderは新規に到着するデータのみを処理し、既に処理した既存データをスキップすることができます。
ティップス
既に誰かがこのチュートリアルを実行しており、ワークスペースにこのパスが存在している場合は、このパスのファイルを削除した方が良いかもしれません。 -
最初のセルにカーソルを置いたままで、セルを実行します(セルを実行するにはShift+Enterを押します)。Databricksが指定されたパスをメモリーに読み込みます。
-
次のセルが無い場合には、最初のセルの下にセルを追加します(セルを追加するには、セルの下の端にマウスポインターを移動し、**+**アイコンをクリックします)。2つ目のセルでは、以下のコードを貼り付けます(
cloudFiles
がAuto Loaderを指すことに注意してください)。Pythonstream = spark.readStream \ .format("cloudFiles") \ .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("cloudFiles.schemaLocation", schema_location) \ .load(raw_data_location)
-
このセルを実行します。
-
3つ目のセルには以下のコードを貼り付けます。
Pythondisplay(stream)
-
このセルを実行します。Auto Loaderは
raw_data_location
にある既存のCSVファイルの処理を開始し、その場所に到着するCSVも処理します。Auto Loaderはファイルの最初の行を用いてCSVファイルごとのフィールド名を処理し、残りの行をフィールドデータとして処理します。DatabrikcsはAuto Loaderが処理を行うたびにデータを表示します。 -
4つ目のセルでは、以下のコードを貼り付けます。
Pythonstream.writeStream \ .option("checkpointLocation", checkpoint_location) \ .start(target_delta_table_location)
-
このセルを実行します。Auto Loaderは
target_data_table_location
にあるDeltaテーブルにデータを書き込みます。また、Auto Loaderはcheckpoint_location
にチェックポイントファイル情報を書き込みます。
ステップ3: データスキーマの進化と強制
時間とともにお使いのデータのスキーマが変化したら何が起きるのでしょうか?例えば、将来的にはデータ品質問題をよりうまく取り扱い、データの計算をより簡単にできるように、フィールドデータのタイプを進化させたいとしたらどうでしょうか?このステップでは、データを許可されるデータタイプに進化させ、入力データに対してこのスキーマを強制します。
新規のサンプルファイルが生成されるようにデータストリームを維持するために、ステップ1のノートブックが稼働していることを思い出して下さい。
-
ステップ2のノートブックを停止します(ノートブックメニューバーのStop Executionをクリックします)。
-
ステップ2のノートブックで、(
stream.writeStream
を起動するセルである)4つ目のセルを以下の内容で置き換えます。Pythonstream.printSchema()
-
全てのノートブックセルを実行します。
-
ノートブックを停止します。
-
(
stream = spark.readStream
を起動した)2つ目のセルを以下の内容で置き換えます。Pythonstream = spark.readStream \ .format("cloudFiles") \ .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("cloudFiles.schemaLocation", schema_location) \ .option("cloudFiles.schemaHints", """x_axis integer, y_axis integer""") \ .load(raw_data_location)
-
ノートブックの全てのセルを実行します。Databricksは、
x_axis
とy_axis
のカラムがintegerとなるデータの新たなスキーマを表示します。それでは、新たなスキーマを用いてデータ品質を強制しましょう。 -
ノートブックを停止します。
-
2つ目のセルを以下のコードで置き換えます。
Pythonfrom pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([ StructField('id', StringType(), True), StructField('x_axis', IntegerType(), True), StructField('y_axis', IntegerType(), True) ]) stream = spark.readStream \ .format("cloudFiles") \ .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("cloudFiles.schemaLocation", schema_location) \ .schema(schema) \ .load(raw_data_location)
-
ノートブックの全てのセルを実行します。Auto Loaderは、新規スキーマにマッチしない入力データをどのように処理するのかを決定するために、自身のスキーマ推論とスキーマ進化ロジックを使用します。
ステップ4: クリーンアップ
このチュートリアルを完了したら、ワークスペースのDatabricks関連リソースをクリーンアップすることができます。
データの削除
-
両方のノートブックを停止します(ノートブックを開くには、サイドバーでWorkspace > Users > your user nameをクリックします)。
-
ステップ1のノートブックで、最初のセルの後にセルを追加し、以下のコードを2つ目のセルに貼り付けます。
Pythondbutils.fs.rm("dbfs:/tmp/generated_raw_csv_data", True) dbutils.fs.rm("dbfs:/tmp/table", True) dbutils.fs.rm("dbfs:/tmp/auto_loader", True)
警告!
これらの場所に他の情報がある場合、これらの情報も削除されます! -
セルを実行します。Databricksは生データを含むディレクトリ、Deltaテーブル、テーブルのスキーマ、Auto Loaderのチェックポイント情報を削除します。
ノートブックの削除
- サイドバーでWorkspace > Users > your user nameをクリックします。
- 最初のノートブックの隣にあるドロップダウンの矢印をクリックし、Move to Trashをクリックします。
- Confirm and move to Trashをクリックします。
- 2つ目のノートブックにもステップ1-3を行います。
クラスターの停止
他のタスクにクラスターを使わないのであれば、追加のコストを避けるために停止すべきです。
- サイドバーでComputeをクリックします。
- クラスター名をクリックします。
- Terminateをクリックします。
- Confirmをクリックします。
その他のリソース
- Auto Loaderの技術ドキュメント
- 大規模データに対するファイル通知の活用
- ブログ記事 Databricksレイクハウスで準構造化データ管理をシンプルにする10のパワフルな機能
- オンデマンドウェビナーシリーズ Hassle-Free Data Ingestion