4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks Lakeflow 宣言型パイプラインでロボットイベントデータを処理してみた

Last updated at Posted at 2025-08-10

Databricks Lakeflow 宣言型パイプラインでロボットイベントデータを処理してみた

はじめに

Databricksの新機能であるLakeflow Pipeline Editorを使って、ロボットイベントデータを処理するサンプルパイプラインを作成しました。このプロジェクトでは、Bronze、Silver、Goldの3層アーキテクチャでデータを段階的に処理し、最終的にロボットの移動距離などの集計メトリクスを算出します。

今回作成したサンプルプロジェクトをGitHubで公開しましたので、実際の手順とともに紹介します。
lakeflow_declarative_pipeline.gif

📊 プロジェクト概要

このサンプルプロジェクトは以下の特徴があります:

  • データソース: シミュレートされたロボットイベントデータ(10,000件のJSONレコード)
  • アーキテクチャ: Bronze → Silver → Gold のメダリオンアーキテクチャ
  • 処理内容:
    • Bronze: 生データの取り込み
    • Silver: データクリーニングと強化
    • Gold: 集計メトリクスとロボット移動距離の算出

パイプライン実行結果

Screenshot 2025-08-10 at 16.16.30.png

🔗 リポジトリ

完全なコードとドキュメントは以下のGitHubリポジトリで公開しています:

Sample Lakeflow Declarative Pipeline - Robot Events

🛠️ 必要な前提条件

このサンプルを実行するためには、以下の環境が必要です:

  • Lakeflow Pipeline Editorが有効なDatabricksワークスペース
  • カタログ/スキーマで管理ボリュームを作成できる権限
  • Databricks上のPython実行環境

⚠️ 重要: Lakeflow Pipeline Editorが有効になっていない場合は、事前にDatabricks管理者に有効化を依頼してください。

🚀 実装手順

1. サンプルデータの生成

ChatGPT Image Aug 10, 2025, 03_33_57 PM.png

まず、00. generate robot event dataを実行してテスト用のJSONデータを生成します

このスクリプトにより、sample_json_output/フォルダーにJSONファイルが作成されます。

2. 管理ボリュームの設定

  1. DatabricksのCatalog Explorerで対象のカタログ・スキーマに移動
  2. 新しい管理ボリュームを作成
  3. ボリューム内にフォルダーを作成し、生成したJSONファイルをアップロード

3. パイプラインスクリプトの設定

各レイヤーのスクリプトで、環境に応じてパラメータを更新します:

Bronze レイヤー (robot_pipeline/bronze.py):

data_path = "<your-managed-volume-path>"  # 実際のボリュームパスに変更

Silver・Gold レイヤー (robot_pipeline/silver.py, robot_pipeline/gold.py):

CATALOG = "<your_catalog_name>"
BRONZE = "<bronze_schema_name>"
SILVER = "<silver_schema_name>"

4. Databricksでのパイプライン作成

  1. Jobs & Pipelines作成ETL パイプラインに移動
  2. パイプライン名とデフォルトのカタログ・スキーマを設定
  3. 既存のアセットを追加し、robot_pipelineフォルダーをルートに設定
  4. 以下のソースコードパスを追加:
    • bronze.py
    • silver.py
    • gold.py
  5. 保存してパイプラインを作成

5. パイプラインの実行

パイプライン作成後、「パイプラインを実行」をクリックしてBronze → Silver → Goldの順でデータ処理を開始します。

📂 プロジェクト構成

robot_pipeline/
  ├── bronze.py      # Bronze レイヤー - 生データの取り込み
  ├── silver.py      # Silver レイヤー - データのクリーニングと強化
  ├── gold.py        # Gold レイヤー - 集計メトリクスと分析
00_generate_robot_event_data.py  # サンプルデータ生成スクリプト

✅ 実行結果

パイプライン実行により、以下のテーブルが作成されます:

  • Bronze レイヤー: 管理ボリュームからの生JSONイベントデータ
  • Silver レイヤー:
    • command_events - クリーンアップされたコマンドイベント
    • sensor_data - センサーデータ
    • combined_events - 結合済みイベントデータ
  • Gold レイヤー: ロボット別の集計メトリクスと移動距離

🎯 このサンプルで学べること

  • Lakeflow Pipeline Editorの基本的な使い方
  • 宣言型パイプラインの実装方法
  • メダリオンアーキテクチャのベストプラクティス
  • 管理ボリュームを使ったデータ取り込み手順
  • Delta Liveテーブルの作成と依存関係の設定

まとめ

Databricks Lakeflow Pipeline Editorを使うことで、従来のETL処理と比べてより直感的で保守性の高いデータパイプラインを構築できました。宣言型アプローチにより、データの依存関係や品質チェックを明確に定義でき、運用面でも大きなメリットがあります。

ぜひリポジトリをクローンして、実際に動かしてみてください!

参考リンク

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?