Tutorial: Run an end-to-end lakehouse analytics pipeline | Databricks on AWS [2022/11/18時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricksでは、データプロフェッショナルがETL(extract/transform/load)パイプラインをクイックに開発、デプロイできるようにプロダクションレベルのツールのスイートを提供しています。Unity Catalogを用いることで、データスチュワードは組織全体のユーザーに対して、セキュアなストレージ認証情報、外部ロケーション、データベースオブジェクトを設定することができます。Databricks SQLを用いることで、アナリストはプロダクションのETLワークロードで使用されているのと同じテーブルに対してSQLクエリーを実行することができ、大規模なリアルタイムBIを実現することができます。
本書の最後まで読み進めることで、以下のことに慣れ親しむことができます。
- Unity Catalogが有効化された計算クラスターの起動
- Databricksノートブックの作成
- Unity Catalog外部ロケーションへのデータの読み書き
- Auto Loaderを用いたUnity Catalogテーブルへのインクリメンタルなデータ取り込みの設定
- データを処理、クエリー、プレビューするためにノートブックのセルを実行
- Databricksジョブとしてノートブックをスケジューリング
- Databricks SQLからUnity Catalogテーブルにクエリー
このチュートリアルでは、Unity Catalogが有効化されたクラスターで一般的なETLタスクを完了するために、インタラクティブなノートブックを使用します。Unity Catalogを使用していない場合は、Databricksで初めてのETLワークロードを実行するをご覧ください。
要件
Databricksにログインし、Data Science & Engineeringワークスペースにいること。詳細については、Get started: Free trial & setupをご覧ください。
注意
クラスター作成権限を持っていない場合でも、クラスターにアクセスできるのであれば、以下のほとんどのステップを完了することができます。
Databricks SQLワークスペースのみにアクセスできる場合には、データアナリストとしてDatabricksを使い始めるをご覧ください。
ステップ1: クラスターを作成する
探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要な計算リソースを提供するクラスターを作成します。
- サイドバーのComputeをクリックします。
- ComputeページでCreate Clusterをクリックします。New Clusterページが表示されます。
- クラスターのユニークな名称を指定します。
- Single nodeラジオボタンを選択します。
- Access modeドロップダウンでSingle userを選択します。
- Single user accessフィールドにご自身のメールアドレスが表示されていることを確認します。
- 使用するDatabricks runtime versionを選択します。Unity Catalogを使うには11.1以降が必要です。
- Create Clusterをクリックします。
Databricksクラスターの詳細については、クラスターをご覧ください。
ステップ2: Databricksノートブックを作成する
Databricksでインタラクティブなコードを記述、実行するにはノートブックを作成します。
- サイドバーのNewをクリックし、Notebookをクリックします。
- Create Notebookページで:
- ノートブックのユニークな名称を指定します。
- デフォルト言語がPythonに設定されていることを確認します。
- Clusterドロップダウンからステップ1で作成したクラスターを選択します。
- Createをクリックします。
一番初めが空のセルのノートブックが開きます。
ノートブックの作成、管理の詳細に関してはノートブックの管理をご覧ください。
ステップ3: Unity Catalogによって管理される外部ロケーションでデータの読み書きを行う
インクリメンタルなデータの取り込みにはAuto Loaderを使うことをお勧めします。Auto Loaderは、クラウドオブジェクトストレージに新規ファイルが到着すると自動で検知し、処理を行います。
外部ロケーションに対するセキュアなアクセスを管理するためにUnity Catalogを使うことができます。外部ロケーションに対してREAD FILES
権限を持つユーザーやサービスプリンシパルは、データを取り込むためにAuto Loaderを使うことができます。
通常は、外部ロケーションに到着するデータは他のシステムから書き込まれます。このデモでは、外部ロケーションにJSONファイルを書き込むことでデータの到着をシミュレートすることができます。
以下のコードをノートブックセルにコピーします。catalog
の値を、CREATE CATALOG
とUSE CATALOG
権限を持つカタログ名で置き換えます。external_location
の値をREAD FILES
、WRITE FILES
、CREATE EXTERNAL TABLE
権限を持つ外部ロケーションのパスで置き換えます。
外部ロケーションを全体的なストレージコンテナとして定義することができますが、多くの場合コンテナ内にネストされたディレクトリをポイントします。
外部ロケーションパスの適切なフォーマットは"s3://bucket-name/path/to/external_location"
となります。
external_location = "<your_external_location>"
catalog = "<your_catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
このセルを実行することで、12バイトが書き込まれたと表示され、文字列Hello world!
が表示され、指定されたカタログに存在するすべてのデータベースが表示されます。このセルを実行することができない場合、お使いのワークスペースでUnity Catalogが有効化されているこをと確認し、このチュートリアルを完了するために適切な権限をワークスペース管理者にリクエストしてください。
以下のPythonコードでは、指定されたカタログにユニークなデータベースを作成し、指定された外部ロケーションにユニークなストレージロケーションを作成するために、あなたのメールアドレスを使用します。このセルを実行することで、このチュートリアルに関係するすべてのデータが削除され、冪等性を持ってこのサンプルを実行できる様になります。接続されたシステムからあなたのソースガイブリケーションへのデータバッチの到着をシミュレートするために使用するクラスが定義され、インスタンスが作成されます。
ノートブックの新規セルにこのコードをコピーし、環境を構築するために実行します。
注意
このコードで定義される変数は、既存のワークスペース資産やユーザーと競合するリスクを引き起こすことなしに、安全に実行される必要があります。制限のあるネットワークやストレージ権限によって、このコードの実行時にエラーが生じることがあります。これらの制限のトラブルシュートに関しては、ワークスペース管理者に問い合わせてください。
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
これで、以下のコードをセルにコピーして実行することで、バッチデータを到着させることができます。マニュアルで新規データの到着を60回まで実行することができます。
RawData.land_batch()
ステップ4: Unity Catalogにデータを取り込むAuto Loaderを設定する
Delta Lakeにデータを格納することをお勧めします。Delta LakeはACIDトランザク書を提供するオープンソースのストレージレイヤーであり、データレイクハウスを実現するものです。Delta Lakeは、Databricksで作成されるテーブルのデフォルトフォーマットとなっています。
Unity Catalogテーブルにデータを取り込む様にAuto Loaderを設定するには、以下のコードをノートブックの空のセルにコピーアンドペーストします。
# Import functions
from pyspark.sql.functions import input_file_name, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", input_file_name().alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Auto Loaderの詳細に関しては、Auto Loaderをご覧ください。
Unity Catalogと構造化ストリーミングに関しては、Using Unity Catalog with Structured Streamingをご覧ください。
ステップ5: データを処理、操作する
ノートブックはセルごとにロジックを実行します。セルのロジックを実行するには以下のステップを踏みます:
-
以前のステップで入力したセルを実行するには、セルを選択しSHIFT+ENTERを推します。
-
作成したテーブルにクエリーを実行するには、空のセルに以下のコードをコピーアンドペーストし、SHIFT+ENTERを押してセルを実行します。
Pythondf = spark.read.table(table_name)
-
データフレームのデータをプレビューするには、空のセルに以下のコードをコピーアンドペーストし、SHIFT+ENTERを押してセルを実行します。
Pythondisplay(df)
データをインタラクティブに可視化する方法については、Databricksにおけるデータの可視化をご覧ください。
ステップ6: ジョブをスケジュールする
Databricksジョブのタスクに追加することで、Databricksノートブックをプロダクションのスクリプトとして実行することができます。このステップでは、手動で起動する新規ジョブを作成します。
タスクとしてノートブックをスケジュールするには:
- ヘッダーバーの右側にあるScheduleをクリックします。
- Job nameにユニークな名称を入力します。
- Manualをクリックします。
- Clusterドロップダウンで、ステップ1で作成したクラスターを選択します。
- Createをクリックします。
- ウィンドウが表示されるので、Run nowをクリックします。
- ジョブ実行の結果を参照するには、Last runタイムスタンプの隣のアイコンをクリックします。
ジョブの詳細については、Databricksにおけるジョブ管理をご覧ください。
ステップ7: Databricks SQLからテーブルにクエリーする
使用しているカタログに対するUSE CATALOG
権限、使用しているスキーマ(データベース)に対するUSE SCHEMA
権限、テーブルに対するSELECT
権限を持っている人は誰でも、お好きなDatabricks APIを用いてテーブルのコンテンツにクエリーを行うことができます。
画面の左上にある +
の上のペルソナスイッチャーを用いてDatabricks SQLのUIに切り替えることができます。ドロップダウンメニューからSQLを選択します。
Databricks SQLでクエリーを実行するには、稼働中のSQLウェアハウスにアクセスする必要があります。
このチュートリアルで作成したテーブルはtarget_table
という名前です。最初に指定したカタログとe2e_lakehouse_<your_username>
というパターンのデータベースを用いてクエリーを行うことができます。作成したデータオブジェクトを参照するためにData Explorerを使うこともできます。
さらなるインテグレーション
Databricksとデータエンジニアリングツールのインテグレーションに関しては以下を参照ください。
- Connect your favorite IDE
- Use dbt with Databricks
- Learn about the Databricks Command Line Interface (CLI)
- Databricks Terraformプロバイダー