Run your first ETL workload on Databricks | Databricks on AWS [2022/5/27時点]の翻訳です。
Databricksクイックスタートガイドのコンテンツです。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricksでは、データのプロフェッショナルがETL(extract、transform、load)パイプラインをクイックに開発、デプロイできるようにプロダクションレディのツールのスイートを提供しています。
本書の最後までには、以下のことに習熟することになります。
- Databricksのall-purpose計算クラスターの起動
- Databricksノートブックの作成
- Auto Loaderを用いたDelta Lakeへのインクリメンタルなデータ取り込みの設定
- データを処理、クエリー、プレビューするためにノートブックセルを実行
- Databricksジョブとしてノートブックをスケジュール
このチュートリアルでは、PythonやScalaにおける一般的なETLタスクを完成させるためにインタラクティブなノートブックを使用します。
また、ETLパイプラインの構築にDelta Live Tablesを使用することもできます。プロダクションのETLパイプラインの構築、デプロイ、維持管理の複雑性を削減するために、DatabricksではDelta Live Tablesを作成しました。Delta Live Tablesクイックスタートをご覧ください。
要件
- Databricksにログインし、Data Science & Engineeringワークスペースにアクセスしていること。詳細はフリートライアルのサインアップをご覧ください。
- クラスターを作成できる権限を持っていること。
注意
クラスターの管理権限を持っていない場合でも、クラスターにアクセスできる限り以下のステップのほとんどを完了することができます。
Databricks SQLワークスペースにのみアクセスできる場合には、Get started with Databricks as a data analystを参照ください。
ステップ1: クラスターを作成する
探索的データ分析とデータエンジニアリングを行うためには、コマンドを実行するのに必要な計算資源を提供するためにクラスターを作成します。
- サイドバーのComputeをクリックします。
- クラスターページで、Create Clusterをクリックします。New Clusterページが表示されます。
- クラスター固有の名前を指定し、他の値はデフォルトの状態のままとし、Create Clusterをクリックします。
Databricksクラスターの詳細については、クラスターをご覧ください。
ステップ2: Databricksノートブックを作成する
Databricksでインタラクティブなコードを記述し、実行するにはノートブックを作成します。
- サイドバーのCreateをクリックし、Notebookをクリックします。
- Notebook作成ページで以下を行います。
- ノートブック固有の名前を指定します。
- デフォルトの言語がPythonかScalaであることを確認します。
- Clusterドロップダウンからステップ1で作成したクラスターを選択します。
- Createをクリックします。
一番上に空のセルがあるノートブックが開きます。
ノートブックの作成、管理の詳細についてはノートブックの管理をご覧ください。
ステップ3: Delta Lakeにデータを取り込むAuto Loaderを設定する
インクリメンタルなデータ取り込みには、Auto Loaderを使用することをお勧めします。Auto Loaderはクラウドオブジェクトストレージに新規データが到着すると、自動でファイルを検知し処理します。
また、データの格納にはDelta Lakeを使用することをお勧めします。Delta LakeはACIDトランザクションを提供するオープンソースのストレージレイヤーであり、データレイクハウスを実現することができます。Delta LakeはDatabricksで作成されるテーブルのデフォルトフォーマットです。
Delta Lakeテーブルにデータを取り込むのにAuto Loaderを設定するには、以下のコードをノートブックの空のセルにコピーアンドペーストします。
# Import a function
from pyspark.sql.functions import input_file_name, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", input_file_name().alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
// Imports
import org.apache.spark.sql.functions.{input_file_name, current_timestamp}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", input_file_name.as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
注意
このコードで定義される変数は、既存のワークスペースの資産や他のユーザーと競合するリスクを引き起こすことなしに、安全に実行されるようになっています。制限されたネットワークやストレージのアクセス権によって、コード実行時にエラーが発生する場合があります。これらの制限をトラブルシュートするためにあなたのワークスペース管理者にコンタクトしてください。
Auto Loaderの詳細については、Auto Loaderをご覧ください。
ステップ4: データを処理しインタラクションする
ノートブックはロジックをセル単位で実行します。お使いのセルでロジックを実行するには、以下を行います。
-
上のステップで入力したセルを実行するには、セルを選択しSHIFT+ENTERを押します。
-
今作成したテーブルにクエリーを実行するには、以下のコードを空のセルにコピーアンドペーストし、セルを実行するためにSHIFT+ENTERを押します。
Pythondf = spark.read.table(table_name)
Scalaval df = spark.read.table(table_name)
-
データフレームのデータをプレビューすには、以下のコードを空のセルにコピーアンドペーストし、セルを実行するためにSHIFT+ENTERを押します。
Pythondisplay(df)
Scaladisplay(df)
データの可視化を行うためのインタラクティブなオプションに関しては、Databricksにおけるデータの可視化をご覧ください。
ステップ5: ジョブをスケジュールする
DatabricksジョブのタスクとしてDatabricksノートブックを追加することで、Databricksノートブックをプロダクションスクリプトとして実行することができます。このステップでは、手動で起動できる新規のジョブを作成します。
ノートブックをタスクとしてスケジュールするには:
- ヘッダーバーの右側にあるScheduleをクリックします。
- Job nameに固有の名前を入力します。
- Manualをクリックします。
- Clusterドロップダウンから、ステップ1で作成したクラスターを選択します。
- Createをクリックします。
- ウィンドウが表示されるのでRun nowをクリックします。
- ジョブの実行結果を参照するには、Last runタイムスタンプの隣のアイコンをクリックします。
ジョブの詳細については、Databricksにおけるジョブ管理をご覧ください。
その他のインテグレーション
Databricksを用いたデータエンジニアリングのインテグレーションとツールに関しては、以下をご覧ください。
- Connect your favorite IDE
- Airflowによるデータパイプラインの依存関係の管理
- Use dbt with Databricks
- Learn about the Databricks Command Line Interface (CLI)
- Databricks Terraformプロバイダー