1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks Free Editionで始めるApache Spark

Last updated at Posted at 2025-08-24

来月9/16(火)19時から開催するもくもく会のノートブックをウォークスルーします。

使用するノートブックはこちらです。

ノートブックの取り込み方法

Gitフォルダを作成して、上のリポジトリをクローンします。

Screenshot 2025-08-25 at 8.46.27.png

URLにhttps://github.com/taka-yayoi/spark_mokumokuを指定します。
Screenshot 2025-08-25 at 8.46.51.png

spark-mokumoku-notebookノートブックを開きます。
Screenshot 2025-08-25 at 8.47.04.png

Apache Sparkもくもく会ハンズオンノートブック

🎯 今日の目標

このもくもく会では、Apache Sparkの基礎を実際に手を動かしながら学んでいきます。大規模データ処理の世界への第一歩を踏み出しましょう!

📋 事前準備

🚀 学習の流れ

  1. Sparkの基本概念を理解
  2. データフレームの操作を習得
  3. 実データでの分析を実践
  4. SQLとの連携を学習
  5. Unity Catalogとの統合を理解

📚 第1部: Apache Sparkとは

Sparkの誕生ストーリー

2009年、カリフォルニア大学バークレー校で誕生したApache Sparkは、ビッグデータ処理に革命をもたらしました。Databricksの創業者たちが開発したこのフレームワークは、従来のHadoop MapReduceと比較して、中間結果をメモリーに保持することで100倍以上の高速化を実現しました。

🐼 PandasとSparkの違い

多くの方が使い慣れているPandasと比較してSparkを理解しましょう:

特徴 Pandas Apache Spark
処理方式 単一マシン(シングルスレッド) 分散処理(複数マシン・並列処理)
データサイズ メモリに収まる範囲(通常数GB) メモリを超える大規模データ(TB〜PB)
処理速度 小規模データでは高速 大規模データで圧倒的に高速
実行タイミング 即座に実行(Eager Evaluation) 遅延評価(Lazy Evaluation)
データ構造 DataFrame DataFrame(分散)
スケーラビリティ 垂直スケール(マシンスペック依存) 水平スケール(ノード追加で対応)
エラー処理 エラー時は最初から 障害耐性あり(自動リトライ)

なぜSparkを学ぶのか?

  • 🚀 スピード: メモリベースの処理により、大規模データを高速に処理
  • 🎯 使いやすさ: Pandasに似たDataFrame APIで学習曲線が緩やか
  • 🔧 モジュール性: SQL、機械学習、ストリーミング処理など多様な用途に対応
  • 📈 拡張性: データ量の増加に柔軟に対応可能

🔍 これから実行するコード

最初に、Databricksで自動的に作成されているSparkセッションを確認します。
Sparkセッションは、Sparkの全機能への入口となる重要なオブジェクトです。

# Sparkセッションの確認
# Databricksでは起動時に自動的に`spark`変数が作成されています
# これがSparkへのエントリーポイントとなります
spark
<pyspark.sql.connect.session.SparkSession at 0x7fd850e666d0>

📊 Sparkのバージョンと設定を確認

現在使用しているSparkの環境情報を確認してみましょう。

# Sparkのバージョンを確認
# サーバレスコンピュートでは最新のSparkバージョンが使用されます
print(f"Sparkバージョン: {spark.version}")

# 現在のカタログとデータベースを確認
print(f"現在のカタログ: {spark.sql('SELECT current_catalog()').first()[0]}")
print(f"現在のデータベース: {spark.sql('SELECT current_database()').first()[0]}")
Sparkバージョン: 4.0.0
現在のカタログ: workspace
現在のデータベース: default

🏗️ 第2部: Sparkの基本アーキテクチャを詳しく理解する

📐 Sparkクラスターの構成要素

spark_architecture.png

Sparkは分散処理システムとして設計されており、以下の主要コンポーネントから構成されています:

1. ドライバー(Driver)

  • 役割: アプリケーション全体の司令塔
  • 責任:
    • ユーザーコードをタスクに変換
    • タスクのスケジューリング
    • エグゼキューターの監視
    • 実行計画の最適化

2. エグゼキューター(Executor)

  • 役割: 実際のデータ処理を実行
  • 責任:
    • タスクの実行
    • データの保持(メモリまたはディスク)
    • 中間結果の保存
    • ドライバーへの結果返却

3. クラスターマネージャー

  • 役割: リソースの管理と割り当て
  • 種類: Databricks(サーバレス)、YARN、Mesos、Kubernetes等

🔄 処理の流れ

[ユーザーコード] 
    ↓
[ドライバー] → 論理実行計画 → 物理実行計画 → タスク生成
    ↓
[クラスターマネージャー] → リソース割り当て
    ↓
[エグゼキューター群] → 並列実行
    ↓
[結果の集約]

🎯 重要な概念

  1. パーティション: データを分割した単位(並列処理の基本単位)
  2. タスク: パーティションに対する処理の単位
  3. ステージ: シャッフルで区切られたタスクのグループ
  4. ジョブ: アクションによって起動される全体の処理

🔍 アーキテクチャの動作原理

Sparkでデータ処理を実行する際の内部動作を理解しましょう:

例: 1000個の数値を処理する場合

# データフレームを4つのパーティションで作成
numbers_df = spark.range(0, 1000, 1, numPartitions=4)

このコードが実行されると、以下のような処理が行われます:

  1. パーティション分割

    • 0-249: パーティション1 → エグゼキューター1で処理
    • 250-499: パーティション2 → エグゼキューター2で処理
    • 500-749: パーティション3 → エグゼキューター3で処理
    • 750-999: パーティション4 → エグゼキューター4で処理
  2. 並列実行

    • 4つのタスクが同時に実行される
    • 各エグゼキューターが独立して処理
    • 処理時間は約1/4に短縮
  3. 結果の集約

    • 各エグゼキューターの結果をドライバーが収集
    • 最終的な結果を生成

サーバレス環境での最適化

Databricksのサーバレスコンピュートでは:

  • パーティション数が自動的に最適化される
  • リソースが動的に割り当てられる
  • 負荷に応じてエグゼキューターが自動スケール"

📝 理解度チェック

  • Q1: ドライバーの主な役割は何でしょうか?
  • Q2: エグゼキューターは何を実行しますか?
  • Q3: パーティションとタスクの関係は?

これらの質問に答えられるようになれば、Sparkアーキテクチャの基本を理解できています!

🎬 第3部: 最初のSparkデータフレームを作成

📖 トランスフォーメーションとアクションの理解

Sparkの処理は大きく2種類に分類されます:

  1. トランスフォーメーション(Transformation)

    • データの変換処理を定義(まだ実行されない)
    • 例: select(), filter(), groupBy()
  2. アクション(Action)

    • 実際に処理を実行して結果を返す
    • 例: count(), show(), collect()

transform_action.png

Step 1: シンプルなデータフレームの作成

まず、連番データを持つデータフレームを作成します。

# 大規模データを想定して、1から100万までの数値を生成
# range()メソッドは「トランスフォーメーション」
# この時点ではデータは生成されず、処理の定義のみが作成されます
first_df = spark.range(1000000)

# データフレームの型を確認
print(f"データフレームの型: {type(first_df)}")
print(f"データフレームのクラス: {first_df.__class__.__name__}")

# スキーマ(データ構造)を確認
# この情報はメタデータとして保持されているため、データを読まずに表示できます
print(f"\nスキーマ情報: {first_df}")
print("\n詳細なスキーマ:")
first_df.printSchema()
データフレームの型: <class 'pyspark.sql.connect.dataframe.DataFrame'>
データフレームのクラス: DataFrame

スキーマ情報: DataFrame[id: bigint]

詳細なスキーマ:
root
 |-- id: long (nullable = false)

Step 2: トランスフォーメーションのチェーン

複数のトランスフォーメーションを連鎖させて、処理のパイプラインを構築します。

# selectExpr()を使って、SQL式で新しいカラムを作成
# まだ実行されません - 処理の定義を追加するだけ
doubled_df = first_df.selectExpr(
    "id as original_id",           # 元のIDカラムをリネーム
    "(id * 2) as doubled_value"     # IDを2倍にした新しいカラム
)

# さらにトランスフォーメーションを追加
# メソッドチェーンで複数の処理を連結できます
final_df = doubled_df.selectExpr(
    "original_id",
    "doubled_value",
    "(doubled_value / 1000) as divided_by_1000",     # 1000で割る
    "(doubled_value % 100) as modulo_100"            # 100で割った余り
)

# この時点でもまだ処理は実行されていません
print("トランスフォーメーションのチェーンが定義されました")
print("実際の処理はアクションが呼ばれるまで実行されません")
トランスフォーメーションのチェーンが定義されました
実際の処理はアクションが呼ばれるまで実行されません

Step 3: アクションで処理を実行

アクションを呼び出すことで、定義した処理が実際に実行されます。

# take()アクションで上位N件を取得
# ここで初めてSparkが処理を実行します!
print("🎯 処理を実行中...")
results = final_df.take(10)  # 上位10件を取得

# 結果を見やすく表示
print("\n📊 処理結果の最初の10件:")
print("-" * 80)
print(f"{'元のID':>10} | {'2倍の値':>10} | {'1000で割った値':>15} | {'100の余り':>10}")
print("-" * 80)

for row in results:
    print(f"{row['original_id']:10d} | "
          f"{row['doubled_value']:10d} | "
          f"{row['divided_by_1000']:15.2f} | "
          f"{row['modulo_100']:10d}")
🎯 処理を実行中...

📊 処理結果の最初の10件:
--------------------------------------------------------------------------------
      元のID |       2倍の値 |       1000で割った値 |     100の余り
--------------------------------------------------------------------------------
         0 |          0 |            0.00 |          0
         1 |          2 |            0.00 |          2
         2 |          4 |            0.00 |          4
         3 |          6 |            0.01 |          6
         4 |          8 |            0.01 |          8
         5 |         10 |            0.01 |         10
         6 |         12 |            0.01 |         12
         7 |         14 |            0.01 |         14
         8 |         16 |            0.02 |         16
         9 |         18 |            0.02 |         18

🎨 Databricksの可視化機能を活用

display()関数を使うと、結果をインタラクティブに表示できます。

# display()もアクションとして動作します
# 上位100件を表示(大量データの場合は自動的に制限されます)
print("📈 インタラクティブな表示:")
display(final_df.limit(100))  # limit()はトランスフォーメーション、display()がアクション

Screenshot 2025-08-24 at 10.15.04.png

📊 第4部: 実データで学ぶデータ処理

💎 ダイヤモンドデータセットの紹介

ここからは、実際のデータセットを使って実践的なデータ処理を学びます。
使用するのは、約54,000個のダイヤモンドの品質と価格に関するデータです。

pipeline.png

データの読み込み

CSV形式のファイルをSparkデータフレームに読み込みます。

# Databricksに事前に用意されているサンプルデータのパス
data_path = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"

# CSVファイルの読み込み
# spark.read を使用してデータを読み込みます
diamonds = (
    spark.read.format("csv")
    .option("header", "true")         # 1行目をヘッダーとして扱う
    .option("inferSchema", "true")    # データ型を自動推論
    .load(data_path)                   # ファイルパスを指定して読み込み
)

# データセットの基本情報を表示
print("📊 データセットの概要:")
print(f"  総レコード数: {diamonds.count():,}")
print(f"  カラム数: {len(diamonds.columns)}")
print("\n📝 カラム一覧:")
for i, col in enumerate(diamonds.columns, 1):
    print(f"  {i:2d}. {col}")
📊 データセットの概要:
  総レコード数: 53,940 件
  カラム数: 11 個

📝 カラム一覧:
   1. _c0
   2. carat
   3. cut
   4. color
   5. clarity
   6. depth
   7. table
   8. price
   9. x
  10. y
  11. z

データの探索

データの中身を確認して、どのような情報が含まれているか理解します。

# show()メソッドで最初の数行を表示
# truncate=Falseで文字列が切れないように表示
print("💎 ダイヤモンドデータの最初の5行:")
diamonds.show(5, truncate=False)

# 各カラムの説明
print("\n📖 カラムの説明:")
print("  - carat: カラット(重さ)")
print("  - cut: カットの品質(Fair, Good, Very Good, Premium, Ideal)")
print("  - color: 色のグレード(D=最高 から J=最低)")
print("  - clarity: 透明度(I1=最低, SI2, SI1, VS2, VS1, VVS2, VVS1, IF=最高)")
print("  - price: 価格(USドル)")
💎 ダイヤモンドデータの最初の5行:
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|cut    |color|clarity|depth|table|price|x   |y   |z   |
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|1  |0.23 |Ideal  |E    |SI2    |61.5 |55.0 |326  |3.95|3.98|2.43|
|2  |0.21 |Premium|E    |SI1    |59.8 |61.0 |326  |3.89|3.84|2.31|
|3  |0.23 |Good   |E    |VS1    |56.9 |65.0 |327  |4.05|4.07|2.31|
|4  |0.29 |Premium|I    |VS2    |62.4 |58.0 |334  |4.2 |4.23|2.63|
|5  |0.31 |Good   |J    |SI2    |63.3 |58.0 |335  |4.34|4.35|2.75|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows

📖 カラムの説明:
  - carat: カラット(重さ)
  - cut: カットの品質(Fair, Good, Very Good, Premium, Ideal)
  - color: 色のグレード(D=最高 から J=最低)
  - clarity: 透明度(I1=最低, SI2, SI1, VS2, VS1, VVS2, VVS1, IF=最高)
  - price: 価格(USドル)
# データ型(スキーマ)の詳細を確認
print("📋 データ型の詳細:")
diamonds.printSchema()

# 各データ型の説明
print("\n💡 データ型の説明:")
print("  - string: 文字列型(カテゴリカルデータ)")
print("  - double: 浮動小数点型(連続値)")
print("  - integer: 整数型")
📋 データ型の詳細:
root
 |-- _c0: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)


💡 データ型の説明:
  - string: 文字列型(カテゴリカルデータ)
  - double: 浮動小数点型(連続値)
  - integer: 整数型

データクレンジング

分析しやすいようにデータを整形します。

from pyspark.sql.functions import col

# カラム名の変更とデータ型の調整
diamonds_clean = (
    diamonds
    .withColumnRenamed("_c0", "index")           # 最初のカラムをindexに変更
    .withColumn("price", col("price").cast("integer"))  # priceを整数型に変換
    .withColumn("carat", col("carat").cast("float"))    # caratを浮動小数点型に変換
)

# クレンジング結果を確認
print("✨ クレンジング後のデータ:")
display(diamonds_clean.limit(5))

Screenshot 2025-08-24 at 10.16.42.png

# NULL値の確認
# 実データでは欠損値の確認が重要です
from pyspark.sql.functions import count, when, isnan

print("🔍 各カラムのNULL値をチェック:")

# 各カラムのNULL値をカウントする処理の詳細
# 1. diamonds_clean.columns で全カラム名のリストを取得
# 2. 各カラムに対してcount(when(...))を適用
#    - when(col(c).isNull(), c): カラムcがNULLの場合、カラム名を返す
#    - count(): NULLでない値(つまり上記whenで返されたカラム名)をカウント
#    - alias(c): 結果のカラム名を元のカラム名と同じにする
# 3. select([...])で全カラムのNULLカウントを一度に取得

null_counts = diamonds_clean.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in diamonds_clean.columns
])

# 結果を表示
# 各カラムの下に表示される数値がNULL値の個数
# 0が表示されていれば、そのカラムにはNULL値が存在しない
display(null_counts)

# 追加情報: isnan()関数について
# isnan()は数値カラムのNaN(Not a Number)をチェックする関数
# このデータセットでは文字列カラムも含まれているため、
# isNull()を使用してNULL値をチェックしています

print("\n✅ このデータセットには欠損値がないことが確認できました")
print("   (全カラムでNULL値のカウントが0)")

Screenshot 2025-08-24 at 10.17.18.png

📈 第5部: データ分析の実践

基本的な集計処理

GroupByとAggregation関数を使って、データを集計します。

# 必要な集計関数をインポート
from pyspark.sql.functions import avg, max, min, stddev, round, count

# カット品質ごとの価格統計を計算
cut_analysis = (
    diamonds_clean
    .groupBy("cut")                              # カット品質でグループ化
    .agg(
        count("*").alias("個数"),                 # レコード数をカウント
        round(avg("price"), 2).alias("平均価格"),  # 平均価格(小数点2桁)
        round(min("price"), 2).alias("最低価格"),  # 最低価格
        round(max("price"), 2).alias("最高価格"),  # 最高価格
        round(stddev("price"), 2).alias("価格の標準偏差")  # 価格のばらつき
    )
    .orderBy("平均価格", ascending=False)          # 平均価格の降順でソート
)

print("💎 カット品質別の価格分析:")
print("(注: 興味深いことに、'Ideal'カットが最も安い傾向があります)")
display(cut_analysis)

Screenshot 2025-08-24 at 10.17.45.png

複雑な分析: カテゴリ分けと複合集計

条件分岐を使ったカテゴリ分けと、複数の軸での集計を行います。

from pyspark.sql.functions import when, col

# カラットサイズでカテゴリ分け
# when()関数で条件分岐を実装
# 番号を付けることで、表示時に適切な順序でソートされるようにする
diamonds_categorized = diamonds_clean.withColumn(
    "carat_category",
    when(col("carat") < 0.5, "1_小(< 0.5ct)")
    .when(col("carat") < 1.0, "2_中(0.5-1.0ct)")
    .when(col("carat") < 1.5, "3_大(1.0-1.5ct)")
    .otherwise("4_特大(>= 1.5ct)")
)

# カテゴリとカットの組み合わせで集計
category_analysis = (
    diamonds_categorized
    .groupBy("carat_category", "cut")            # 2つの軸でグループ化
    .agg(
        count("*").alias("個数"),
        round(avg("price"), 2).alias("平均価格")
    )
    .orderBy("carat_category", "cut")            # カテゴリ、カットの順でソート
)

print("📊 カラットカテゴリー別・カット別の分析:")
print("(番号付きラベルにより、小→中→大→特大の順で表示されます)")
print("(サイズが大きくなるほど価格が上昇することが確認できます)")
display(category_analysis)

可視化のタブもクリックしてみてください。
Screenshot 2025-08-24 at 10.18.20.png

ウィンドウ関数を使った高度な分析

ウィンドウ関数を使うと、グループ内でのランキングや累積値などを計算できます。

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

# ウィンドウの定義: カット品質ごとに価格でソート
window_spec = Window.partitionBy("cut").orderBy(col("price").desc())

# 各カット品質内での価格ランキングを計算
top_diamonds = (
    diamonds_clean
    .withColumn("price_rank", row_number().over(window_spec))  # 順位を付与
    .filter(col("price_rank") <= 3)                           # トップ3のみ抽出
    .select("cut", "carat", "color", "clarity", "price", "price_rank")
    .orderBy("cut", "price_rank")                              # 表示用にソート
)

print("🏆 各カット品質における最も高価なダイヤモンドTop3:")
display(top_diamonds)

Screenshot 2025-08-24 at 10.19.26.png

🚀 第6部: SQLとの連携

Spark SQLの活用

SparkデータフレームをSQLで操作する方法を学びます。
SQLに慣れている方にとって、非常に親しみやすいインターフェースです。

dataframe_sql.png

# データフレームを一時ビューとして登録
# これにより、SQLクエリでデータフレームを参照できるようになります
diamonds_clean.createOrReplaceTempView("diamonds_view")

# SQLクエリを実行
# spark.sql()メソッドでSQLを実行し、結果をデータフレームとして取得
sql_result = spark.sql("""
    SELECT 
        cut,                                      -- カット品質
        color,                                    -- 色のグレード
        COUNT(*) as count,                        -- レコード数
        ROUND(AVG(price), 2) as avg_price,       -- 平均価格
        ROUND(AVG(carat), 3) as avg_carat        -- 平均カラット
    FROM diamonds_view
    WHERE price > 1000                           -- 1000ドル以上のダイヤモンドのみ
    GROUP BY cut, color                          -- カットと色でグループ化
    HAVING COUNT(*) > 100                        -- 100個以上のグループのみ
    ORDER BY avg_price DESC                      -- 平均価格の降順
    LIMIT 10                                      -- 上位10件
""")

print("🔍 SQL分析結果(価格1000ドル以上、100個以上のグループ):")
display(sql_result)

Screenshot 2025-08-24 at 10.20.08.png

SQLマジックコマンドの使用

Databricksでは%sqlマジックコマンドを使って、セル全体をSQLクエリとして実行できます。

%sql
-- SQLマジックコマンドで直接SQL実行
-- より複雑な分析: 透明度ごとの統計
SELECT 
    clarity,                                     -- 透明度グレード
    COUNT(DISTINCT cut) as cut_variations,      -- カット種類の数
    COUNT(*) as total_count,                    -- 総数
    MIN(price) as min_price,                    -- 最低価格
    MAX(price) as max_price,                    -- 最高価格
    ROUND(MAX(price) - MIN(price), 2) as price_range  -- 価格レンジ
FROM diamonds_view
GROUP BY clarity
ORDER BY total_count DESC

Screenshot 2025-08-24 at 10.20.37.png

🎯 第7部: パフォーマンス最適化の基礎知識

📚 最適化テクニックの理解

Sparkのパフォーマンス最適化には以下のような手法があります:

  1. キャッシング: 頻繁に使用するデータフレームをメモリに保持
  2. パーティショニング: データを適切なサイズに分割
  3. ブロードキャスト結合: 小さなテーブルを各ノードに配布
  4. 述語プッシュダウン: フィルタ条件を早期に適用

サーバレスコンピュートでの最適化

Databricksのサーバレスコンピュートでは、多くの最適化が自動的に行われます。
以下、実践可能な最適化テクニックを紹介します:

# 最適化のベストプラクティスを実演

# 1. フィルタの早期適用(述語プッシュダウン)
# 良い例: フィルタを早期に適用
optimized_df = (
    diamonds_clean
    .filter(col("price") > 5000)      # まずフィルタ(データ量を削減)
    .groupBy("cut")                   # その後集計
    .agg(avg("carat").alias("avg_carat"))
)

print("✅ 最適化された処理: フィルタを先に適用")
display(optimized_df)

# 2. 必要なカラムのみ選択
# データ転送量を削減
selected_columns = (
    diamonds_clean
    .select("cut", "price", "carat")  # 必要なカラムのみ選択
    .filter(col("price") > 10000)
)

print("\n✅ 必要なカラムのみを選択して処理")
print(f"選択されたカラム数: {len(selected_columns.columns)}")

Screenshot 2025-08-24 at 10.21.18.png

最適化のベストプラクティス

サーバレス環境で効果的な最適化手法:

  1. 早期フィルタリング: WHERE句は可能な限り早い段階で適用
  2. カラムの選択: 必要なカラムのみをSELECT
  3. 適切な結合順序: 小さいテーブルを先に結合
  4. パーティション述語: パーティション化されたテーブルでは適切なフィルタを使用

optimization.png

🔄 第8部: Pandas API on Sparkの活用

Pandas APIでSparkを使う

Pandasに慣れている方でも、Sparkの分散処理能力を活用できます。

import pyspark.pandas as ps
import numpy as np

# Pandas API on Sparkでデータフレームを作成
# 通常のPandasと同じ構文で記述できます
ps_df = ps.DataFrame({
    'A': np.random.rand(1000),     # ランダムな値(0-1)
    'B': np.random.rand(1000),     # ランダムな値(0-1)
    'C': np.random.randn(1000)     # 正規分布に従うランダム値
})

# Pandasと同じメソッドが使える
print("📊 Pandas API on Sparkの統計情報:")
print(ps_df.describe())  # 基本統計量の表示

print("\n🔝 上位5件(Aカラムでソート):")
# sort_valuesやheadなど、Pandasでお馴染みのメソッドが使用可能
print(ps_df.sort_values('A', ascending=False).head())
📊 Pandas API on Sparkの統計情報:
                 A            B            C
count  1000.000000  1000.000000  1000.000000
mean      0.505290     0.494652     0.007897
std       0.294411     0.291331     1.031623
min       0.001276     0.001369    -3.640795
25%       0.234287     0.254717    -0.673122
50%       0.510131     0.486816     0.016236
75%       0.759370     0.750676     0.718045
max       0.999539     0.999994     4.161558

🔝 上位5件(Aカラムでソート):
            A         B         C
198  0.999539  0.278639  0.332884
936  0.998892  0.938016  1.724161
469  0.998588  0.965553 -1.733019
184  0.998554  0.990778  0.500505
94   0.998232  0.314186  0.845150

既存のSparkデータフレームをPandas APIで操作

# SparkデータフレームをPandas API on Sparkに変換
diamonds_ps = diamonds_clean.to_pandas_on_spark()

# Pandasスタイルでの基本統計
print("💰 価格カラムの統計情報(Pandas API):")
price_stats = diamonds_ps['price'].describe()
print(price_stats)

# 基本的な集計を個別に実行
print("\n📈 カット別の基本統計:")
cut_types = diamonds_ps['cut'].unique().to_numpy()  # NumPy配列に変換
for cut_type in cut_types[:3]:  # デモのため最初の3つのみ表示
    cut_data = diamonds_ps[diamonds_ps['cut'] == cut_type]['price']
    print(f"\n{cut_type}:")
    print(f"  平均: ${cut_data.mean():.2f}")
    print(f"  標準偏差: ${cut_data.std():.2f}")
    print(f"  件数: {len(cut_data):,}")
💰 価格カラムの統計情報(Pandas API):
count    53940.000000
mean      3932.799722
std       3989.439738
min        326.000000
25%        950.000000
50%       2401.000000
75%       5324.000000
max      18823.000000
Name: price, dtype: float64

📈 カット別の基本統計:

Ideal:
  平均: $3457.54
  標準偏差: $3808.40
  件数: 21,551

Fair:
  平均: $4358.76
  標準偏差: $3560.39
  件数: 1,610

Premium:
  平均: $4584.26
  標準偏差: $4349.20
  件数: 13,791

💾 第9部: Unity Catalogとデータの永続化

📚 Unity Catalogとは

Unity Catalogは、Databricksのデータガバナンスソリューションです:

  • 統一されたメタストア: すべてのデータ資産を一元管理
  • 細かいアクセス制御: テーブル、カラムレベルでの権限管理
  • データリネージ: データの流れを追跡
  • データ発見: カタログを通じたデータの検索と理解

SparkとUnity Catalogの関係

Unity Catalog階層:
├── カタログ (Catalog)
│   ├── スキーマ/データベース (Schema/Database)
│   │   ├── テーブル (Table)
│   │   ├── ビュー (View)
│   │   └── 関数 (Function)

uc.png

データの保存先

サーバレスコンピュートでは、ファイルシステムへの直接書き込みが制限されているため、
Unity Catalogのテーブルとしてデータを保存します。

# 現在のカタログとスキーマを確認
current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
current_schema = spark.sql("SELECT current_database()").collect()[0][0]

print("📍 現在の位置:")
print(f"  カタログ: {current_catalog}")
print(f"  スキーマ: {current_schema}")

# workspace.defaultスキーマを使用
# これは個人用の作業領域として提供されています
spark.sql("USE workspace.default")
print("\n✅ workspace.defaultスキーマに切り替えました")
📍 現在の位置:
  カタログ: workspace
  スキーマ: default

✅ workspace.defaultスキーマに切り替えました

マネージドテーブルとしての保存

Unity Catalogのマネージドテーブルとしてデータを保存します。

import uuid
from datetime import datetime

# ユニークなテーブル名を生成(衝突を避けるため)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_suffix = str(uuid.uuid4())[:8]
table_name = f"diamonds_analysis_{timestamp}_{unique_suffix}"

# テーブルとして保存
# Delta形式で保存されます(Databricksのデフォルト)
(
    diamonds_clean.write
    .mode("overwrite")          # 既存テーブルがあれば上書き
    .saveAsTable(f"workspace.default.{table_name}")
)

print(f"✅ テーブルが保存されました: workspace.default.{table_name}")
print("\n📊 保存されたテーブルの情報:")
spark.sql(f"DESCRIBE EXTENDED workspace.default.{table_name}").show(truncate=False)
 テーブルが保存されました: workspace.default.diamonds_analysis_20250824_011243_c58f2d0a

📊 保存されたテーブルの情報:
+----------------------------+---------------------------------------------------------------+-------+
|col_name                    |data_type                                                      |comment|
+----------------------------+---------------------------------------------------------------+-------+
|index                       |int                                                            |NULL   |
|carat                       |float                                                          |NULL   |
|cut                         |string                                                         |NULL   |
|color                       |string                                                         |NULL   |
|clarity                     |string                                                         |NULL   |
|depth                       |double                                                         |NULL   |
|table                       |double                                                         |NULL   |
|price                       |int                                                            |NULL   |
|x                           |double                                                         |NULL   |
|y                           |double                                                         |NULL   |
|z                           |double                                                         |NULL   |
|                            |                                                               |       |
|# Delta Statistics Columns  |                                                               |       |
|Column Names                |x, y, color, carat, depth, clarity, cut, price, table, index, z|       |
|Column Selection Method     |first-32                                                       |       |
|                            |                                                               |       |
|# Detailed Table Information|                                                               |       |
|Catalog                     |workspace                                                      |       |
|Database                    |default                                                        |       |
|Table                       |diamonds_analysis_20250824_011243_c58f2d0a                     |       |
+----------------------------+---------------------------------------------------------------+-------+
only showing top 20 rows

以下のセルを実行して表示されるリンクをクリックしてテーブルを確認してみましょう。DatabricksではカタログエクスプローラというUIでテーブルやデータベース、ファイル、モデルなどにアクセスすることができます。

displayHTML(f"<a href='/explore/data/workspace/default/{table_name}'>保存したテーブル</a>")

Screenshot 2025-08-24 at 10.24.28.png

保存したテーブルの読み込み

# テーブルからデータを読み込み
loaded_df = spark.table(f"workspace.default.{table_name}")

# 読み込んだデータを確認
print(f"📥 テーブルから読み込んだデータ:")
print(f"  レコード数: {loaded_df.count():,}")
print(f"  カラム数: {len(loaded_df.columns)}")

# SQLでもアクセス可能
sql_query = f"""
SELECT cut, COUNT(*) as count, AVG(price) as avg_price
FROM workspace.default.{table_name}
GROUP BY cut
ORDER BY avg_price DESC
"""

print("\n🔍 SQLでテーブルをクエリ:")
display(spark.sql(sql_query))

Screenshot 2025-08-24 at 10.25.00.png

Delta Lake形式の利点

Databricksでは、テーブルはDelta Lake形式で保存されます。

# Delta Lakeの特徴を確認
print("🏞️ Delta Lake形式の利点:")
print("  1. ACID トランザクション: データの一貫性を保証")
print("  2. タイムトラベル: 過去のバージョンにアクセス可能")
print("  3. スキーマエボリューション: スキーマの変更に対応")
print("  4. データ圧縮: 効率的なストレージ使用")
print("  5. Z-Ordering: クエリパフォーマンスの最適化")

# テーブルの履歴を確認
print(f"\n📜 テーブルの変更履歴:")
history_df = spark.sql(f"DESCRIBE HISTORY workspace.default.{table_name} LIMIT 5")
display(history_df.select("version", "timestamp", "operation", "operationMetrics"))

Screenshot 2025-08-24 at 10.25.30.png

🧹 第10部: クリーンアップとベストプラクティス

テーブルのクリーンアップ

# 作成したテーブルを削除(もくもく会終了時)
# コメントを外して実行してください
# spark.sql(f"DROP TABLE IF EXISTS workspace.default.{table_name}")
# print(f"🧹 テーブル {table_name} を削除しました")

print("⚠️ クリーンアップを実行する場合は、上記のコメントを外してください")

📚 学んだことのまとめ

このもくもく会で学んだ重要なポイント:

1. Sparkの基本概念

  • 分散処理アーキテクチャ(ドライバーとエグゼキューター)
  • 遅延評価(トランスフォーメーションとアクション)
  • データフレームAPI
  • Pandasとの違いと使い分け

2. データ処理テクニック

  • データの読み込みとクレンジング
  • 集計とグループ化
  • ウィンドウ関数の活用

3. SQL統合

  • SparkデータフレームのSQL操作
  • 一時ビューの活用
  • SQLマジックコマンド

4. Unity Catalogとの統合

  • カタログ・スキーマ・テーブルの階層構造
  • マネージドテーブルとしてのデータ保存
  • Delta Lake形式の利点

5. ベストプラクティス

  • フィルタの早期適用
  • 必要なカラムのみ選択
  • 適切なデータ形式の選択

🎓 次のステップ

さらに学習を深めるために

  1. 構造化ストリーミング: リアルタイムデータ処理
  2. MLlib: Sparkの機械学習ライブラリ
  3. Delta Lake: より高度なデータレイク機能
  4. Photon: Databricksの高速実行エンジン

参考リソース

🚀 チャレンジ問題

もくもく会の残り時間で以下の課題に挑戦してみましょう!

ノートブックをご覧ください。

🎉 お疲れさまでした!

Apache Sparkの世界への第一歩を踏み出しました。今日学んだ基礎を活かして、より大規模で複雑なデータ処理に挑戦していってください!

💬 質問とディスカッション

  • 不明な点があれば、遠慮なくメンターに質問してください
  • 他の参加者と学びを共有しましょう
  • 実務での活用アイデアを話し合いましょう

🔗 コミュニティ

  • Databricks Community に参加して継続的に学習
  • Apache Sparkユーザーグループでの情報交換
  • 定期的なもくもく会への参加

Happy Sparking! ✨

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?