もともとサインアップするとチュートリアルは利用できるのですが、自分でまとめてみました。こちらの記事でそれらのチュートリアルをウォークスルーしています。
今回のチュートリアルのノートブックはこちらです。
以下の構成にしています。
-
0. My first Databricks
: Databricksの基本的な使い方を説明します。 -
1. Unity Catalog
: Unity Catalogのコンセプト、基本的な使い方を学びます。 -
2. PySpark transformation
: PySparkを用いたデータ変換処理を体験します。 -
3. ML Tutorial
: scikit-learnを使ったモデルの構築、管理を学びます。
前提: Databricks Free Editionアカウント作成済み
注意事項
-
複数のノートブックから右上の「接続」からサーバレスに接続した際、2つ目のノートブックの実行時にMax Retried Errorが発生する場合があります。
-
これはサーバレスの計算資源の制限によるものです。
-
2つ目のノートブックを実行する際に、1つ目のノートブックの右上の接続済み→接続済み(サーバレス)→終了をクリックして1つ目のノートブックの接続を切断
してください。
-
その後数分待ってから2つ目のノートブックを実行してください。
0. My first Databricks - はじめてのDatabricks
このノートブックではDatabricksの基本的な使い方をご説明します。
Databricksの使い方
画面の説明
Databricksでデータ分析を行う際に頻繁に使用するのが、今画面に表示しているノートブックです。AIアシスタントをはじめ、分析者の生産性を高めるための工夫が随所に施されています。
計算資源(コンピュート)
Databricksにおける計算資源はコンピュートと呼ばれます。Databricksが全てを管理するサーバレスの計算資源も利用できます。
- コンピュート
- SQLウェアハウス
ここでは、画面の右上にある接続ボタンをおして、サーバレスコンピュートを選択します。
接続済みとなれば計算資源を使用してプログラムを実行することができます。
プログラムを実行するにはセルの左上にある▶︎ボタンをクリックします。
print("Hello Databricks!")
Hello Databricks!
データの読み込み
Databricksでは上述のコンピュートを用いて、ファイルやテーブルにアクセスします。SQLやPython、Rなどを活用することができます。Databricksノートブックではこれらの複数の言語を混在させることができます。
AIアシスタントを活用してロジックを組み立てることができます。アシスタントを呼び出すには下のセルにカーソルを合わせると右上に表示される
アイコンをクリックします。そして、プロンプトボックスに以下のプロンプトを入力し、生成をクリックします。
プロンプト: samples.tpch.ordersの中身を1000行表示
可視化1というタブも確認してみましょう。
データの加工
AIアシスタントを活用してロジックを組み立てることができます。なお、アシスタントを活用する際には具体的に指示することが重要です。
アシスタントに以下の指示を行うことでコードにコメントを追加してもらうことができます。
プロンプト:クエリーの処理に関する日本語コメントを追加して
%sql
SELECT
o_orderkey,
o_custkey,
o_orderstatus,
o_totalprice,
o_orderdate,
o_orderpriority
FROM
samples.tpch.orders
WHERE
o_orderdate = "1998-07-01"
AND o_totalprice >= 100000
参考資料
- Databricksアシスタントの新機能を試す
- Databricksアシスタントを用いたEDA(探索的データ分析)
- DatabricksアシスタントによるEDA(探索的データ分析) その2
- Databricksアシスタントによるデータエンジニアリング
データの書き込み
テーブルから別のテーブルを作成するといった作業をしていると、このテーブルはどうやって作ったんだっけ?
となりがちです。そのような依存関係は、Databricksでは リネージ(系統情報) として自動で記録されます。
%sql
CREATE TABLE workspace.default.tpch_orders_199807 AS
SELECT
o_orderkey,
o_custkey,
o_orderstatus,
o_totalprice,
o_orderdate,
o_orderpriority
FROM
samples.tpch.orders
WHERE
o_orderdate = "1998-07-01"
AND o_totalprice >= 100000
上で作成したテーブルから、さらに別のテーブルを作成します。
%sql
CREATE TABLE workspace.default.tpch_orders_199807_derived AS
SELECT
COUNT(o_orderpriority) AS order_cnt,
o_orderpriority
FROM
workspace.default.tpch_orders_199807
GROUP BY
o_orderpriority
ORDER BY
order_cnt DESC
カタログエクスプローラでテーブルを確認しましょう。
また、リネージを確認してみましょう。リネージグラフボタンをクリックすることでより視覚的に関係性を確認することができます。
1. Unity Catalog - Unity Catalogクイックスタート
Unity Catalogを用いることで、以下の機能でDatabricks上の資産に対するガバナンスを強化することができます。
-
データアクセス制御
- 一度定義するだけで、全てのワークスペース、全ての言語、全てのユースケースに対してセキュリティを適用
-
監査機能
- ガバナンスユースケースに対応するために、全てのクエリーに対するきめ細かい監査を実現
-
データリネージ
- テーブル、カラムに対するデータリネージの自動収集
-
データ探索
- お使いのレイクハウスにおいてデータ利用者が信頼できるデータを検索するためのインタフェースを提供
-
属性ベースのアクセス制御
- 行列レベルのネイティブなセキュリティ、タグによるポリシー適用
このノートブックでは、以下の方法を説明することで、Unity Catalogを使用し始めるサンプルワークフローを提供します。
- カタログを選択し、新規スキーマを作成する。
- マネージドテーブルを作成しスキーマに追加する。
- 3レベルの名前空間を用いてテーブルにクエリーを実行する。
- データリネージを追跡する。
- テーブルに対するデータアクセス権を管理する。
3レベルの名前空間
Unity Catalogはデータを整理するために3レベルの名前空間、カタログ、スキーマ(データベースとも呼ばれます)、テーブルとビューを提供します。テーブルを参照するには以下の文法を使用します。
<catalog>.<schema>.<table>
新規カタログの作成
それぞれのUnity Catalogメタストアには、空のスキーマdefault
を含むworkspace
というデフォルトのカタログが含まれています。
新規カタログを作成するにはCREATE CATALOG
コマンドを使用します。新規カタログを作成するには、メタストア管理者である必要があります。
以降のコマンドでは、以下の方法を説明します。
- 新規カタログの作成
- カタログの選択
- 全てのカタログの表示
- カタログに対するアクセス権の許可
- カタログの全てのアクセス権の表示
注意 以下ではuc_quickstart_catalog
というカタログを作成します。
カタログやスキーマにアクセスするには、USE
コマンドを用いて明示的にカタログやスキーマを指定する必要があります。指定されない場合にはデフォルトのカタログやスキーマが使用されます。
以下のコマンドでは現時点でのデフォルトカタログを確認しています。
SELECT current_catalog();
--- 新規カタログの作成
CREATE CATALOG IF NOT EXISTS uc_quickstart_catalog;
-- カタログの選択
USE CATALOG uc_quickstart_catalog;
--- メタストアの全てのカタログの表示
SHOW CATALOGS;
catalog |
---|
samples |
system |
uc_quickstart_catalog |
workspace |
--- アカウントの全ユーザーに対してCREATE SCHEMA, USE CATALOG権限を許可
--- これは、他のアカウントレベルのグループ、ユーザーにも動作します
GRANT CREATE SCHEMA, USE CATALOG
ON CATALOG uc_quickstart_catalog
TO `account users`;
--- カタログのアクセス権の確認
SHOW GRANT ON CATALOG uc_quickstart_catalog;
Principal | ActionType | ObjectType | ObjectKey |
---|---|---|---|
account users | CREATE SCHEMA | CATALOG | uc_quickstart_catalog |
account users | USE CATALOG | CATALOG | uc_quickstart_catalog |
スキーマの作成、管理
データベースとも呼ばれるスキーマは、Unity Catalog名前空間の2番目のレイヤーです。これらはテーブルとビューを論理的に整理します。
--- カタログに新規スキーマを作成
CREATE SCHEMA IF NOT EXISTS quickstart_schema
COMMENT "quickstart_schemaという新規Unity Catalogのスキーマです";
-- 選択されたカタログのスキーマを表示
SHOW SCHEMAS;
|databaseName|
|---|
|default|
|information_schema|
|quickstart_schema|
-- スキーマの詳細を表示
DESCRIBE SCHEMA EXTENDED quickstart_schema;
database_description_item | database_description_value |
---|---|
Catalog Name | uc_quickstart_catalog |
Namespace Name | quickstart_schema |
Comment | quickstart_schemaという新規Unity Catalogのスキーマです |
Location | |
Owner | takaaki.yayoi@databricks.com |
Properties | |
Predictive Optimization | ENABLE (inherited from METASTORE metastore_aws_us_east_2) |
マネージドテーブルの作成
マネージドテーブルは、Unity Catalogでテーブルを作成する際のデフォルトの方法となります。LOCATION
が指定されない場合、テーブルはメタストアに設定されたマネージドの格納場所に作成されます。
以下のコマンドでは、次の方法を説明します。
- スキーマの選択
- マネージドテーブルを作成し、レコードをインサート
- スキーマの全テーブルの表示
- テーブルの詳細の表示
-- スキーマの選択
USE quickstart_schema;
-- マネージドのDeltaテーブルを作成、2行を追加
CREATE TABLE IF NOT EXISTS quickstart_table
(columnA Int, columnB String) PARTITIONED BY (columnA);
INSERT INTO TABLE quickstart_table
VALUES
(1, "one"),
(2, "two");
-- スキーマの全テーブルの表示
SHOW TABLES IN quickstart_schema;
database | tableName | isTemporary |
---|---|---|
quickstart_schema | quickstart_table | false |
_sqldf | true |
2行目は一旦スルーしてください。詳細はこちらをご覧ください。
-- テーブルの詳細を表示
DESCRIBE TABLE EXTENDED quickstart_table;
col_name | data_type | comment |
---|---|---|
columnA | int | null |
columnB | string | null |
# Partition Information | ||
# col_name | data_type | comment |
columnA | int | null |
# Delta Statistics Columns | ||
Column Names | columnB | |
Column Selection Method | first-32 | |
# Detailed Table Information | ||
Catalog | uc_quickstart_catalog | |
Database | quickstart_schema | |
Table | quickstart_table | |
Created Time | Wed Jul 30 12:00:15 UTC 2025 | |
Last Access | UNKNOWN | |
Created By | Spark | |
Statistics | 1244 bytes, 2 rows | |
Type | MANAGED | |
Location | ||
Provider | delta | |
Owner | takaaki.yayoi@databricks.com | |
Is_managed_location | true | |
Predictive Optimization | ENABLE (inherited from METASTORE metastore_aws_us_east_2) | |
Table Properties | [delta.enableDeletionVectors=true,delta.feature.appendOnly=supported,delta.feature.deletionVectors=supported,delta.feature.invariants=supported,delta.minReaderVersion=3,delta.minWriterVersion=7] |
3レベルの名前空間を用いることで、いくつかの方法でテーブルにアクセスすることができます。
- 完全修飾名によるアクセス
- デフォルトカタログを選択し、スキーマ、テーブル名を用いてテーブルにアクセス
- デフォルトスキーマを選択し、テーブル名を使用
以下の3つのコマンドは機能的に等価です。
-- 3レベル名前空間を用いたテーブルへのクエリーの実行
SELECT
*
FROM
uc_quickstart_catalog.quickstart_schema.quickstart_table;
-- デフォルトカタログを選択し、スキーマ、テーブル名でテーブルへクエリーを実行
USE CATALOG uc_quickstart_catalog;
SELECT *
FROM quickstart_schema.quickstart_table;
-- デフォルトカタログとデフォルトスキーマを選択し、テーブル名でテーブルへクエリーを実行
USE CATALOG uc_quickstart_catalog;
USE quickstart_schema;
SELECT *
FROM quickstart_table;
テーブルの削除
DROP TABLE
コマンドでマネージドテーブルを削除すると、背後のデータファイルも削除されます。
DROP TABLE
コマンドで外部テーブルを削除すると、テーブルに関するメタデータはカタログから削除されますが、背後のデータファイルは削除されません。
-- マネージドテーブルの削除(以下の行のコメントを解除して試してください。ノートブックの以降のコマンドを実行するには、テーブルを再作成してください)
-- DROP TABLE uc_quickstart_catalog.quickstart_schema.quickstart_table
リネージの追跡
Unity Catalogで管理されるテーブルはリネージ(系統情報)が追跡されます。どのテーブルからどのテーブルが作成されたのか、カラムレベルで追跡が可能です。
-- スキーマを作成し、権限を付与します
CREATE SCHEMA lineagedemo;
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA lineagedemo TO `account users`;
-- ソーステーブルを作成します
CREATE TABLE IF NOT EXISTS uc_quickstart_catalog.lineagedemo.menu (
recipe_id INT,
app string,
main string,
desert string
);
INSERT INTO uc_quickstart_catalog.lineagedemo.menu
(recipe_id, app, main, desert)
VALUES
(1,"Ceviche", "Tacos", "Flan"),
(2,"Tomato Soup", "Souffle", "Creme Brulee"),
(3,"Chips","Grilled Cheese","Cheescake");
-- ソーステーブルから派生テーブルを作成します
CREATE TABLE uc_quickstart_catalog.lineagedemo.dinner
AS SELECT recipe_id, concat(app," + ", main," + ",desert) as full_menu FROM uc_quickstart_catalog.lineagedemo.menu
データに対するアクセス権の管理
データへのアクセスを管理するために、GRANT
とREVOKE
文を使用します。Unity Catalogはデフォルトでセキュリティ保護を行い、データへのアクセスは自動では許可されません。最初は、全てのユーザーがデータにアクセスすることができません。メタストアの管理者とデータのオーナーが、アカウントレベルのユーザーとグループに対してアクセス権の許可、剥奪を行うことができます。
Unity Catalog権限とセキュリティ保護可能なオブジェクト
オーナーシップ
Unity Catalogのそれぞれのオブジェクトにはオーナーが存在します。プリンシパルと呼ばれるアカウントレベルのユーザー、グループがオーナーになることができます。プリンシパルはセキュリティ保護可能なオブジェクトを作成する際、あるいはALTER
文を用いてオーナーシップが転送された際にオーナーになります。
アクセス権の管理
Unity Catalogのオブジェクトに対して以下のアクセス権を許可することができます。
-
ALL PRIVILEGES
: セキュリティ保護可能なオブジェクトとその子オブジェクトに適用されるすべての権限を、明示的に指定せずに付与または取り消すために使用されます。 -
SELECT
: テーブル、ビューからデータを読み込むことを許可します。 -
MODIFY
: テーブルへの行追加、更新、データの削除を許可します。 -
CREATE CATALOG/SCHEMA/TABLE
: カタログ、スキーマにセキュリティ保護可能な子供のオブジェクトを作成することを許可します。 -
USAGE CATALOG/SCHEMA
: このアクセス権はセキュリティ保護可能なオブジェクト自身へのアクセスを許可しませんが、子供のオブジェクトにアクセスするためにセキュリティ保護可能オブジェクトを通過することを許可します。例えば、テーブルからデータを取得するためには、ユーザーは当該テーブルに対するUSAGE
権限と、親のスキーマ、親のカタログに対するUSAGE
権限を必要とします。特定のグループに対してデータの名前空間のセクションへのアクセスを制限するために、この権限を使用することができます。
権限は子供のセキュリティ保護可能なオブジェクトに継承されます。
以下のコマンドでは、次の方法を説明します。
- アクセス権の許可
- セキュリティ保護可能オブジェクトに対するアクセス権の表示
- アクセス権の剥奪
アクセス権の許可
セキュリティ保護可能オブジェクトにプリンシパルのアクセス権を許可します。メタストア管理者とセキュリティ保護可能オブジェクトのオーナーのみがアクセス権を許可することができます。
-- スキーマの USE を許可
GRANT USE SCHEMA
ON SCHEMA quickstart_schema
TO `account users`;
-- プリンシパルに対してテーブルの SELECT を許可
GRANT SELECT
ON TABLE quickstart_schema.quickstart_table
TO `account users`;
アクセス権の表示
セキュリティ保護可能なオブジェクトに許可された全てのアクセス権の一覧を表示します。
-- quickstart_tableのアクセス権の表示
SHOW GRANTS
ON TABLE uc_quickstart_catalog.quickstart_schema.quickstart_table;
Principal | ActionType | ObjectType | ObjectKey |
---|---|---|---|
account users | SELECT | TABLE | uc_quickstart_catalog.quickstart_schema.quickstart_table |
-- quickstart_schemaのアクセス権の表示
SHOW GRANTS
ON SCHEMA uc_quickstart_catalog.quickstart_schema;
Principal | ActionType | ObjectType | ObjectKey |
---|---|---|---|
account users | USE SCHEMA | SCHEMA | uc_quickstart_catalog.quickstart_schema |
アクセス権の剥奪
これまで許可されていたセキュリティ保護可能なオブジェクトのアクセス権をプリンシパルから剥奪します。
REVOKE SELECT
ON TABLE quickstart_schema.quickstart_table
FROM `account users`;
-- quickstart_tableのアクセス権の表示
SHOW GRANTS
ON TABLE uc_quickstart_catalog.quickstart_schema.quickstart_table;
この場合、結果は空となります。
クリーンアップ
以下のDROP CATALOG ... CASCADE
でカタログおよび内部のスキーマ、テーブルを削除します。
DROP CATALOG uc_quickstart_catalog CASCADE;
2. PySpark transformation - PySparkによるデータ変換
PythonからApache Sparkを操作する際に使用するAPIであるPySparkの基本的な使い方を説明します。
参考資料
- PySparkことはじめ #Databricks - Qiita
- About Spark – Databricks
- Databricks Apache Sparkクイックスタート - Qiita
- Databricks Apache Sparkデータフレームチュートリアル - Qiita
- PySpark Documentation — PySpark 3.2.1 documentation
- Beginner’s Guide on Databricks: Spark Using Python & PySpark | by Christopher Lewis | Analytics Vidhya | Medium
- 【PySpark入門】第1弾 PySparkとは? - サーバーワークスエンジニアブログ
ライブラリのインポート
処理に必要なモジュールをインポートします。
データのロード
PySparkでデータをロードする際にはspark.read
を使用します。format
の引数に読み込むデータのフォーマットを指定します。json
、parquet
、delta
などが指定できます。読み込んだデータはSparkデータフレームとなります。
その前に、読み込むデータを以下のコマンドで確認します。
%fs
ls dbfs:/databricks-datasets/samples/population-vs-price/
path | name | size | modificationTime |
---|---|---|---|
dbfs:/databricks-datasets/samples/population-vs-price/data_geo.csv | data_geo.csv | 10952 | 1596690598000 |
# データフレームにサンプルデータをロードします
df = spark.read.format("csv").option("header", True).load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
df.show(20)
+---------+-------------+----------+----------+------------------------+-----------------------+
|2014 rank| City| State|State Code|2014 Population estimate|2015 median sales price|
+---------+-------------+----------+----------+------------------------+-----------------------+
| 101| Birmingham| Alabama| AL| 212247| 162.9|
| 125| Huntsville| Alabama| AL| 188226| 157.7|
| 122| Mobile| Alabama| AL| 194675| 122.5|
| 114| Montgomery| Alabama| AL| 200481| 129|
| 64|Anchorage[19]| Alaska| AK| 301010| NULL|
| 78| Chandler| Arizona| AZ| 254276| NULL|
| 86| Gilbert[20]| Arizona| AZ| 239277| NULL|
| 88| Glendale| Arizona| AZ| 237517| NULL|
| 38| Mesa| Arizona| AZ| 464704| NULL|
| 148| Peoria| Arizona| AZ| 166934| NULL|
| 6| Phoenix| Arizona| AZ| 1537058| 206.1|
| 95| Scottsdale| Arizona| AZ| 230512| NULL|
| 215| Surprise| Arizona| AZ| 126275| NULL|
| 142| Tempe| Arizona| AZ| 172816| NULL|
| 33| Tucson| Arizona| AZ| 527972| 178.1|
| 119| Little Rock| Arkansas| AR| 197706| 131.8|
| 56| Anaheim|California| CA| 346997| 685.7|
| 261| Antioch|California| CA| 108930| NULL|
| 52| Bakersfield|California| CA| 368759| NULL|
| 227| Berkeley|California| CA| 118853| NULL|
+---------+-------------+----------+----------+------------------------+-----------------------+
only showing top 20 rows
# Databricksでデータフレームを表示するにはdisplay関数を使うと便利です
display(df)
カラムの確認
df.columns
['2014 rank',
'City',
'State',
'State Code',
'2014 Population estimate',
'2015 median sales price']
スキーマの確認
# データフレームのスキーマを表示
df.printSchema()
root
|-- 2014 rank: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- State Code: string (nullable = true)
|-- 2014 Population estimate: string (nullable = true)
|-- 2015 median sales price: string (nullable = true)
カラム名の変更
-
withColumnRenamed
を使ってカラム名を変更します。
df2 = df.withColumnRenamed('2014 rank', '2014_rank')\
.withColumnRenamed('State Code', 'state_code')\
.withColumnRenamed('2014 Population estimate', '2014_pop_estimate')\
.withColumnRenamed('2015 median sales price', '2015_median_sales_price')
display(df2)
データ型の変換
- 既に存在しているデータフレームのカラムを指定するには、
col
関数の引数にカラム名を指定します。 -
cast
にデータ型を指定してキャストします。 -
withColumn
を用いて、キャストした後の値を持つカラムで更新します。
Data Types - Spark 3.2.1 Documentation
df3 = df2.withColumn("2014_rank", col("2014_rank").cast(IntegerType()))\
.withColumn("2014_pop_estimate", col("2014_pop_estimate").cast(IntegerType()))\
.withColumn("2015_median_sales_price", col("2015_median_sales_price").cast(FloatType()))
display(df3)
データの操作
フィルタリング、ソート
以下の例では、df3
で2015_median_sales_price
が100より大きいレコードを2015_median_sales_price
の降順でソートし、カラム2014_rank
, City
, 2015_median_sales_price
を取得しています。
display(df3.select("2014_rank", "City", "2015_median_sales_price")\
.where("2015_median_sales_price > 100")\
.orderBy(col("2015_median_sales_price").desc()))
集計
以下の処理を行なっています。
-
state_code
でレコードをグルーピング - グループごとの
2015_median_sales_price
の平均値を計算 - 平均値降順でレコードを取得
pandasとのやりとり
matplotlibで可視化したいなどpandas前提の処理を行う場合には、Sparkデータフレームをpandasデータフレームに変換します。
df4 = df3.groupBy("state_code")\
.agg(avg("2015_median_sales_price").alias("2015_median_sales_price_avg"))\
.orderBy(col("2015_median_sales_price_avg").desc()).limit(10)
import matplotlib.pyplot as plt
# pandasデータフレームに変換します
pdf = df4.toPandas()
# 棒グラフを描画します
plt.bar(pdf['state_code'], pdf['2015_median_sales_price_avg'], align="center")
plt.show()
pandasデータフレームをSparkデータフレームに変換することもできます。
# Sparkデータフレームへの変換
sdf = spark.createDataFrame(pdf)
display(sdf)
その他のAPI
Spark SQL
データフレームをテーブルあるいは一時ビューに登録することで、SQLを使用してデータを操作することができるようになります。
テーブルは永続化されますが、一時ビューは永続化されず、クラスターが稼働している間のみ一時ビューを作成したセッションでのみ利用することができます。
# データフレームを一時ビューに登録します
df3.createOrReplaceTempView("pop_price")
# '2014_rank' カラムに基づいて上位10位の市を参照します
top_10_results = spark.sql("""SELECT * FROM pop_price
WHERE 2014_rank <= 10
SORT BY 2014_rank ASC""")
display(top_10_results)
2014_rank | City | State | state_code | 2014_pop_estimate | 2015_median_sales_price |
---|---|---|---|---|---|
1 | New York[6] | New York | NY | 8491079 | 388.6000061035156 |
2 | Los Angeles | California | CA | 3928864 | 434.70001220703125 |
3 | Chicago | Illinois | IL | 2722389 | 192.5 |
4 | Houston[7] | Texas | TX | 2239558 | 200.3000030517578 |
5 | Philadelphia[8] | Pennsylvania | PA | 1560297 | 204.89999389648438 |
6 | Phoenix | Arizona | AZ | 1537058 | 206.10000610351562 |
7 | San Antonio | Texas | TX | 1436697 | 184.6999969482422 |
8 | San Diego | California | CA | 1381069 | 510.29998779296875 |
9 | Dallas | Texas | TX | 1281047 | 192.5 |
10 | San Jose | California | CA | 1015785 | 900 |
こちらは上と同じ処理となります。
%sql
SELECT
*
FROM
pop_price
WHERE
2014_rank <= 10 SORT BY 2014_rank ASC
pandas API on Spark
pandas APIに慣れ親しんでいる方は、pandas API on Spark(旧Koalas)を活用することもできます。
Pandas API on Spark | Databricks Documentation
spark.conf.set("spark.sql.ansi.enabled", "false")
import pyspark.pandas as ps
psdf = sdf.pandas_api() # pandas-on-Sparkデータフレーム
# pandasのお作法でカラムにアクセスします
psdf['state_code']
0 HI
1 CA
2 DC
3 NJ
4 CO
5 WA
6 MA
7 CT
8 UT
9 NH
Name: state_code, dtype: object
3. ML Tutorial - Databricksにおける機械学習モデルの構築と管理
Databricks の scikit-learn
ライブラリを使用して機械学習分類モデルを構築する方法について説明します。
目標は、ワインが「高品質」と見なされるかどうかを予測する分類モデルを作成することです。 データセットは、さまざまなワインの 11 の特徴 (アルコール含有量、酸度、残留糖など) と、1 から 10 の品質ランキングで構成されています。
この例では、MLflow を使用してモデル開発プロセスを追跡します。
チュートリアル: Databricks で初めての機械学習モデルを構築する | Databricks Documentation
ライブラリのインストール
%pip install -U mlflow
%restart_python
モデルレジストリ、カタログ、スキーマの設定
モデルレジストリとしてUnity Catalogを使用するように、MLflowクライアントを構成する必要があります。
import mlflow
mlflow.set_registry_uri("databricks-uc")
また、モデルが登録されるカタログとスキーマも設定する必要があります。
# 必要に応じて、「workspace」と「default」を、必要な権限を持つカタログおよびスキーマに置き換えてください。
CATALOG_NAME = "workspace"
SCHEMA_NAME = "default"
EDA(探索的データ分析)
機械学習モデルを構築する際には、トレーニングに使用するデータの特徴や傾向を把握することが重要です。Databricksの機能やライブラリを活用して、いくつかの切り口でデータを可視化して、データへの理解を深めます。これが探索的データ分析(EDA: Exploratory Data Analysis)です。
import pandas as pd
# CSVファイルからデータを読み込み
white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")
# フラグを追加
red_wine['is_red'] = 1
white_wine['is_red'] = 0
# データフレームを結合
wine_data = pd.concat([red_wine, white_wine], axis=0)
データの中身を確認します。Databricksではdisplay
関数を用いることで、簡単にデータを可視化することができます。
display(wine_data)
目的変数のquality
のヒストグラムをプロットします。
import seaborn as sns
wine_data = wine_data.sort_values("quality")
sns.histplot(data=wine_data.quality)
データを読み込み、Unity Catalogテーブルを作成
この例では、databricks-datasets
で使用できる2つのCSVファイルを使用します。
以下のコードは、次のことを行います。
-
winequality-white.csv
とwinequality-red.csv
からデータを読み取り、Sparkデータフレームに読み込みます。先ほどはPandasデータフレームを使いましたが、Pandasデータフレームは直接テーブルに書き込めないため、ここではSparkデータフレームを使います。 - 列名のスペースをアンダースコアに置き換えて、データをクリーンアップします。
- データフレーム を Unity Catalogの
white_wine
テーブルとred_wine
テーブルに書き込みます。データを Unity Catalog に保存すると、データが保持され、他のユーザーと共有する方法を制御できます。
white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)
# 列名からスペースを削除
for c in white_wine.columns:
white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))
# テーブル名を定義
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"
# Unity Catalogのテーブルに書き込み
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")
テーブルを確認しましょう。以下のコマンドを実行して表示されるリンクをクリックしてください。
displayHTML(f"<a href='/explore/data/{CATALOG_NAME}/{SCHEMA_NAME}'>テーブルを確認</a>")
データの前処理と分割
このステップでは、前のステップで作成した Unity Catalog テーブルから Pandas データフレーム にデータを読み込み、データを前処理します。 このセクションのコードでは、次のことを行います。
- データを Pandas データフレームとしてロードします。
- 赤ワインと白ワインを区別するために各 にBoolean 列を追加し、2つのデータフレームを新しいデータフレーム
データフレーム データフレーム
data_df`に結合します。 - データセットには、ワインを 1 から 10 まで評価する
quality
列が含まれており、10 は最高品質を示します。 このコードは、この列を 2 つの分類値に変換します。"True" は高品質のワイン (quality >= 7
) を示し、"False" は高品質ではないワイン (quality < 7
) を示します。 - データフレーム を個別のトレーニング データセットとテスト データセットに分割します。
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble
import matplotlib.pyplot as plt
# Unity CatalogからデータをPandasデータフレームとして読み込む
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()
# 赤ワイン・白ワインのブール型フィールドを追加
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)
# ワインの品質に基づいて分類ラベルを定義
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)
# 80/20で学習用・テスト用に分割
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
data_df,
data_labels,
test_size=0.2,
random_state=1
)
分類モデルのトレーニング
この手順では、デフォルト アルゴリズム設定を使用して勾配ブースティング分類器をトレーニングします。 次に、結果のモデルをテストデータセットに適用し、 ROC(Receiver Operating Characteristic)曲線のAUC(Area Under Curve)を計算、ログ、および表示して、モデルのパフォーマンスを評価します。
# MLflow の自動ログ記録を有効にします
mlflow.autolog()
モデルトレーニングを実行します。
white_dataset = mlflow.data.load_delta(table_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine", version="0")
red_dataset = mlflow.data.load_delta(table_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine", version="0")
with mlflow.start_run(run_name='gradient_boost') as run:
model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)
# モデル、パラメータ、学習メトリクスは自動でトラッキングされます
model.fit(X_train, y_train)
predicted_probs = model.predict_proba(X_test)
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)
# ROC曲線のプロットをファイルに保存
roc_curve.figure_.savefig("roc_curve.png")
# テストデータのAUCスコアは自動で記録されないため、手動で記録
mlflow.log_metric("test_auc", roc_auc)
# ROC曲線画像ファイルをアーティファクトとして記録
mlflow.log_artifact("roc_curve.png")
# トレーニングデータも記録します
mlflow.log_input(white_dataset, context="training")
mlflow.log_input(red_dataset, context="training")
print("テストAUC: {}".format(roc_auc))
テストAUC: 0.8834365701533531
MLflowでのエクスペリメントランの参照
MLflowのエクスペリメントトラッキングは、モデルを反復的に開発するときにコードと結果をログに記録することで、モデル開発を追跡するのに役立ちます。
上のセルの出力に表示されているView Logged Modelのリンクをクリックします。
モデルをUnity Catalogに登録
ここまでは試行錯誤を通じた実験段階です。モデルが目標精度、性能を達成した場合には本番運用に進むことになります。DatabricksのUnity Catalogは本番運用におけるモデル管理の機能であるモデルレジストリを提供しています。
また、上で記録したモデルにはGUIだけではなく、Pythonからもアクセスできるのでモデルの運用を自動化することも可能です。
# テストAUCでランをソートします。同点の場合は最新のランを使用します。
best_run = mlflow.search_runs(
order_by=['metrics.test_auc DESC', 'start_time DESC'],
max_results=10,
).iloc[0]
print('ベストラン')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('推定器数: {}'.format(best_run["params.n_estimators"]))
print('最大深さ: {}'.format(best_run["params.max_depth"]))
print('学習率: {}'.format(best_run["params.learning_rate"]))
ベストラン
AUC: 0.8834365701533531
推定器数: 100
最大深さ: 3
学習率: 0.1
Pythonを用いて上の分類モデルをUnity Catalogに記録します。
model_uri = 'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)
mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")
上のリンクをクリックして、依存関係のタブをクリックしてみましょう。
このあとはテストを経て本番環境へのデプロイに進むことになります。興味のある方はMosaic AI Model Serving を使用したモデルのデプロイをご覧ください。