はじめに
こんにちは、すぎもんです![]()
本記事では、Databricks を使って ETL(Extract:抽出/Transform:変換/Load:格納)ジョブを作成し、実行してみたいと思います。
今回はDatabricksに関連して下記3つの記事構成にしています。
1.DatabricksでETLジョブを作成してみた (本記事)
2.HULFT SquareからDatabricksのジョブを実行してみた
3.HULFT SquareからDatabricksのテーブルを参照してみた
HULFT Square(またはDataSpider Servista)からDatabricksのジョブを起動する記事とDatabricksのテーブルを参照する記事もあわせてご覧ください。
今回やること
Databricksに3つのタスクを含むジョブを作成して実行します。タスク1は内部ストレージに格納したCSVファイルを読み込み、データをBronzeテーブルに格納します。タスク2はBronzeテーブルのデータを読み込み、データをクレンジングした後にSilverテーブルに格納します。タスク3はSilverテーブルを読み込み、データを集計した後にGoldテーブルに格納します。

データレイクハウスでは、Bronze / Silver / Gold に分けてクレンジング、加工・集計を段階的に行う設計思想があります。
- Bronze:生データ置き場
- Silver:クレンジング・正規化済み
- Gold:加工・集計し、分析・業務向けに最適化済み
実施手順
以下の手順を実施します。
1.ソースデータ準備
2.タスク作成
3.ジョブ作成・実行
1.ソースデータ準備
DatabricksのボリュームにソースデータのCSVファイルを保存します。
Databricksでは「ボリューム」を内部ストレージとして利用します。
自組織⇒カタログ⇒スキーマ⇒ボリュームの階層構造になっています。
① スキーマを作成
CSVファイルを格納するためのスキーマを作成します。
Databricksコンソール画面の[カタログ]⇒カタログ名(workspace)⇒[スキーマを作成]を押下します。
※カタログ名は環境により異なります。

② ボリュームを作成
CSVファイルを格納するボリュームを作成します。
作成したdata_sourcesに移動し、[作成]ボタン⇒[ボリューム]を選択します。

[ボリューム名]を入力し、[カタログ]と[スキーマ]を選択して[作成]ボタンを押下します。

③ ファイルをアップロード
以下のようなCSVファイルをアップロードします。

・サンプルデータ
date,store_id,product,quantity,revenue,remarks
2026/1/1,1,apple,10,1000,active
2026/1/1,1,grape,5,500,expired
2026/1/1,2,apple,8,800,active
2026/1/2,1,apple,7,700,expired
2026/1/2,3,grape,5,500,active
2026/1/3,3,orange,6,600,active
2026/1/3,2,orange,3,300,active
2026/1/3,1,grape,5,500,active
作成したボリュームに移動し、[このボリュームにアップロード]ボタンを押下します。

2.タスク作成
Databricksのタスクを作成します。
① スキーマを作成
Deltaテーブルを格納するスキーマを作成します。
Databricksでは「テーブル」にDelta Lake形式のデータを格納します。
自組織⇒カタログ⇒スキーマ⇒テーブルの階層構造になっています。
[カタログ]⇒カタログ名(workspace)⇒[スキーマを作成]を押下します。

② ノートブックでコードを書く
[ワークスペース]⇒[作成]ボタン⇒[ノートブック]をクリックします。
- タスク1のPythonコード実装例
内部ストレージのCSVファイルを読み込み、Bronzeテーブルに書き込む処理を実装します。初回実行時にBronzeテーブルを作成します。
# Databricks notebook source
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import to_timestamp
# Spark セッション作成
spark = SparkSession.builder.getOrCreate()
# CSVファイルのスキーマ定義
schema = StructType([
StructField('date', StringType(), True),
StructField('store_id', IntegerType(), True),
StructField('product', StringType(), True),
StructField('quantity', IntegerType(), True),
StructField('revenue', DoubleType(), True),
StructField('remarks', StringType(), True)
])
# CSVファイル読み込み
bronze_df = spark.read.csv(
"/Volumes/workspace/file/data_source/sales.csv",
header=True,
schema=schema
)
# date を Timestamp に変換
bronze_df = bronze_df.withColumn("date", to_timestamp("date", "yyyy/M/d"))
# Bronze Delta テーブルとして保存
bronze_df.write.format("delta").mode("overwrite") \
.saveAsTable("workspace.test_schema.sales_bronze")
# Bronze のデータ出力(確認用)
print("=== Bronze created ===")
bronze_df.show()
- タスク2のPythonコード実装例
Bronzeテーブルを読み込み、データをクレンジングしてSilverテーブルに書き込む処理を実装します。初回実行時にSilverテーブルを作成します。
# Databricks notebook source
from pyspark.sql import SparkSession
# Spark セッション作成
spark = SparkSession.builder.getOrCreate()
# Bronze 読み込み
bronze_df = spark.table("workspace.test_schema.sales_bronze")
# Silver 作成(クレンジング)
silver_df = bronze_df.filter(bronze_df.remarks != "expired") \
.select("date", "store_id", "product", "quantity", "revenue")
# Silver Delta テーブルとして保存
silver_df.write.format("delta").mode("overwrite") \
.saveAsTable("workspace.test_schema.sales_silver")
# Silver のデータ出力(確認用)
print("=== Silver created ===")
silver_df.show()
- タスク3のPythonコード実装例
Silverテーブルを読み込み、データを集計してGoldテーブルに書き込む処理を実装します。初回実行時にGoldテーブルを作成します。
# Databricks notebook source
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Spark セッション作成
spark = SparkSession.builder.getOrCreate()
# Silver 読み込み
silver_df = spark.table("workspace.test_schema.sales_silver")
# Gold 作成(集計)
gold_df = silver_df.groupBy("product") \
.agg(
F.sum("quantity").alias("total_quantity"),
F.sum("revenue").alias("total_revenue")
)
# Gold Delta テーブルとして保存
gold_df.write.format("delta").mode("overwrite") \
.saveAsTable("workspace.test_schema.sales_gold")
# Gold のデータ出力(確認用)
print("=== Gold created ===")
gold_df.show()
3.ジョブ作成・実行
Databricksのジョブを作成し、実行します。
① ジョブを作成
[ジョブとパイプライン]⇒[作成]ボタン⇒[ジョブ]を選択します。

② タスクを追加
左上のジョブのタイトルを入力し、[ノートブック]をクリックしてタスクを追加します。

[タスク名]を入力し、[パス]を一覧から選択して、[タスクを作成]ボタンを押下します。
[パス]は2. タスク作成で作成したタスク1のノートブックを選択します。

[タスクを追加]ボタンを押下し、同様にタスク2とタスク3を追加します。

③ ジョブを実行
必要なタスクを全て追加後に[今すぐ実行]ボタンを押下し、ジョブが正常に動作するか確認します。

ジョブ実行結果は[ジョブの実行]タブで確認が可能です。ジョブ実行が成功した場合はジョブ実行時間を表すグラフが緑で表示され、[ステータス]が成功となります。

④ ジョブ実行結果を確認
Bronzeテーブル、Silverテーブル、Goldテーブルのデータが作成されていることを確認します。
[ワークスペース]⇒[作成]ボタン⇒[クエリー]をクリックします。

それぞれのテーブルを参照するセレクト文を記載します。
select * from test_schema.sales_bronze;
select * from test_schema.sales_silver;
select * from test_schema.sales_gold;
おわりに
いかがだったでしょうか。
今回は、DatabricksのETLジョブにより、CSVファイルからデータを読み込み、Bronzeテーブル⇒Silverテーブル⇒Goldテーブルへとクレンジング・集計を行い、段階的なデータ処理を実現する方法をご紹介しました。
今回の手順を通して、Databricksのカタログ、スキーマ、ボリューム/テーブルの階層や、複数のタスクをまとめてジョブとして実行するETLパイプラインの流れもご理解いただけたと思います。
ここまで読んでいただき、ありがとうございました。それでは、また!










