こちらの記事に触発されました。
Databricksのドキュメントにはもう一つのサンプルがあります。こちらをウォークスルーします。
ステップ1:クラスターを作成する
今回はMLは行わないので、Photon有効化クラスターにします。
ステップ2:ソースデータを探索する
こちらを参考に探索してみます。
%fs
はDBFSにアクセスするためのマジックコマンドです。
%fs ls "/databricks-datasets/songs/data-001"
README.md
を確認します。
%fs head --maxBytes=10000 "/databricks-datasets/songs/README.md"
Sample of Million Song Dataset
===============================
## Source
This data is a small subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/).
The original data was contributed by The Echo Nest.
Prepared by T. Bertin-Mahieux <tb2332 '@' columbia.edu>
## Attribute Information
- artist_id:string
- artist_latitude:double
- artist_longitude:double
- artist_location:string
- artist_name:string
- duration:double
- end_of_fade_in:double
- key:int
個々のファイルの中身を確認します。
%fs head --maxBytes=10000 "/databricks-datasets/songs/data-001/part-00000"
[Truncated to first 10000 bytes]
AR81V6H1187FB48872 nan nan Earl Sixteen 213.7073 0.0 11 0.419 -12.106 Soldier of Jah Army nan SOVNZSZ12AB018A9B8 208.289 125.882 1 0.0 Rastaman 2003 --
ARVVZQP11E2835DBCB nan nan Wavves 133.25016 0.0 0 0.282 0.596 Wavvves 0.471578247701 SOJTQHQ12A8C143C5F 128.116 89.519 1 0.0 I Want To See You (And Go To The Movies) 2009 --
ARFG9M11187FB3BBCB nan nan Nashua USA C-Side 247.32689 0.0 9 0.612 -4.896 Santa Festival Compilation 2008 vol.1 nan SOAJSQL12AB0180501 242.196 171.278 5 1.0 Loose on the Dancefloor 0 225261
ARK4Z2O1187FB45FF0 nan nan Harvest 337.05751 0.247 4 0.46 -9.092 Underground Community 0.0 SOTDRVW12AB018BEB9 327.436 84.986 4 0.673 No Return 0 101619
タブ区切りのCSVなので、Sparkでロードします。
df = spark.read.format('csv').option("sep", "\t").load('dbfs:/databricks-datasets/songs/data-001/part-00000')
df.display()
これは音楽に関する情報が含まれているMillion Songsデータセットです。
ステップ3:生データを取り込む
これはETLにおけるExtract(抽出)処理になります。生データを読み込み、メダリオンアーキテクチャにおけるブロンズテーブルbronze_table
として保持します。
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
# 以下のコードで使用される変数の定義
file_path = "/databricks-datasets/songs/data-001/"
table_name = "takaakiyayoi_catalog.end_to_end_etl.bronze_table" # 適宜変更してください
checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)
(spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.load(file_path)
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name)
)
処理が完了すると、カタログエクスプローラでテーブルを確認することができます。
ステップ4:生データを準備する
最終的な利用形態を想定した場合、不要な列がある場合にはこれを除外して利便性を高めます。また、管理面からデータの処理時刻の列を追加します。これがETLにおけるTransform(変換)処理となります。
すでにテーブルとしてデータが保持されているので、SQLを用いてシルバーテーブルを作成することができます。ここまではPythonを実行してきていますが、マジックコマンド%sql
を用いることで、そのセルのみSQLを実行することができます。ここではsilver_table
というシルバーテーブルを作成します。もちろん、要件に応じてより複雑な変換処理を実装することも可能です。
%sql
CREATE OR REPLACE TABLE
takaakiyayoi_catalog.end_to_end_etl.silver_table (
artist_id STRING,
artist_name STRING,
duration DOUBLE,
release STRING,
tempo DOUBLE,
time_signature DOUBLE,
title STRING,
year DOUBLE,
processed_time TIMESTAMP
);
INSERT INTO
takaakiyayoi_catalog.end_to_end_etl.silver_table
SELECT
artist_id,
artist_name,
duration,
release,
tempo,
time_signature,
title,
year,
current_timestamp()
FROM
takaakiyayoi_catalog.end_to_end_etl.bronze_table
こちらも処理が完了すると、カタログエクスプローラでテーブルを確認できます。
ステップ5:変換されたデータをクエリーする
%sql
-- どのアーティストが各年で最も曲をリリースしたのか?
SELECT
artist_name,
count(artist_name)
AS
num_songs,
year
FROM
takaakiyayoi_catalog.end_to_end_etl.silver_table
WHERE
year > 0
GROUP BY
artist_name,
year
ORDER BY
num_songs DESC,
year DESC
ステップ6:パイプラインを実行するDatabricksジョブを作成する
これと、ステップ7:データパイプラインジョブをスケジュールするはジョブ作成の話なので割愛します。なお、ステップ3ではSparkのストリーミング処理を使っているので、ソースデータが追加された際に新規データのみを処理できるというメリットを享受することもできます。今回は静的なデータなので嬉しさが感じられないのですが。
追加ステップ: ゴールドテーブルへのロード
また、このサンプルにはETLのLoad(ロード)処理が含まれていませんが、例えば上記クエリーの結果をゴールドテーブルとして書き込むことでロード処理とすることが可能です。後段の例えばダッシュボードではそのゴールドテーブルを参照して可視化を行うことができます。
%sql
CREATE OR REPLACE TABLE takaakiyayoi_catalog.end_to_end_etl.gold_table AS
-- どのアーティストが各年で最も曲をリリースしたのか?
SELECT
artist_name,
count(artist_name)
AS
num_songs,
year
FROM
takaakiyayoi_catalog.end_to_end_etl.silver_table
WHERE
year > 0
GROUP BY
artist_name,
year
ORDER BY
num_songs DESC,
year DESC
最終的な処理結果にテーブルとしてアクセスできるので、可視化も簡単に行えます。これはLakeviewを使っているので、ノーコードで作成したダッシュボードです。
これで、生データから変換処理、最終テーブルへのロード、可視化という、データエンジニアリングにおける基本的なパスを体験することができました。ぜひお試しください!