3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricksにおけるエンドツーエンドのデータパイプラインの構築

Last updated at Posted at 2023-11-24

こちらの記事に触発されました。

Databricksのドキュメントにはもう一つのサンプルがあります。こちらをウォークスルーします。

ステップ1:クラスターを作成する

今回はMLは行わないので、Photon有効化クラスターにします。
Screenshot 2023-11-24 at 15.48.22.png

ステップ2:ソースデータを探索する

こちらを参考に探索してみます。

%fsDBFSにアクセスするためのマジックコマンドです。

%fs ls "/databricks-datasets/songs/data-001"

ファイルが一覧されます。
Screenshot 2023-11-24 at 15.54.25.png

README.mdを確認します。

%fs head --maxBytes=10000 "/databricks-datasets/songs/README.md"
Sample of Million Song Dataset
===============================

## Source
This data is a small subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/).
The original data was contributed by The Echo Nest.
Prepared by T. Bertin-Mahieux <tb2332 '@' columbia.edu>

## Attribute Information
- artist_id:string
- artist_latitude:double
- artist_longitude:double
- artist_location:string
- artist_name:string
- duration:double
- end_of_fade_in:double
- key:int

個々のファイルの中身を確認します。

%fs head --maxBytes=10000 "/databricks-datasets/songs/data-001/part-00000"
[Truncated to first 10000 bytes]
AR81V6H1187FB48872	nan	nan		Earl Sixteen	213.7073	0.0	11	0.419	-12.106	Soldier of Jah Army	nan	SOVNZSZ12AB018A9B8	208.289	125.882	1	0.0	Rastaman	2003	--
ARVVZQP11E2835DBCB	nan	nan		Wavves	133.25016	0.0	0	0.282	0.596	Wavvves	0.471578247701	SOJTQHQ12A8C143C5F	128.116	89.519	1	0.0	I Want To See You (And Go To The Movies)	2009	--
ARFG9M11187FB3BBCB	nan	nan	Nashua USA	C-Side	247.32689	0.0	9	0.612	-4.896	Santa Festival Compilation 2008 vol.1	nan	SOAJSQL12AB0180501	242.196	171.278	5	1.0	Loose on the Dancefloor	0	225261
ARK4Z2O1187FB45FF0	nan	nan		Harvest	337.05751	0.247	4	0.46	-9.092	Underground Community	0.0	SOTDRVW12AB018BEB9	327.436	84.986	4	0.673	No Return	0	101619

タブ区切りのCSVなので、Sparkでロードします。

Python
df = spark.read.format('csv').option("sep", "\t").load('dbfs:/databricks-datasets/songs/data-001/part-00000')
df.display()

Screenshot 2023-11-24 at 15.56.38.png

これは音楽に関する情報が含まれているMillion Songsデータセットです。

ステップ3:生データを取り込む

これはETLにおけるExtract(抽出)処理になります。生データを読み込み、メダリオンアーキテクチャにおけるブロンズテーブルbronze_tableとして保持します。

Python
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# 以下のコードで使用される変数の定義
file_path = "/databricks-datasets/songs/data-001/"
table_name = "takaakiyayoi_catalog.end_to_end_etl.bronze_table" # 適宜変更してください
checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"

schema = StructType(
  [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
  ]
)

(spark.readStream
  .format("cloudFiles")
  .schema(schema)
  .option("cloudFiles.format", "csv")
  .option("sep","\t")
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name)
)

処理が完了すると、カタログエクスプローラでテーブルを確認することができます。
Screenshot 2023-11-24 at 16.06.01.png

ステップ4:生データを準備する

最終的な利用形態を想定した場合、不要な列がある場合にはこれを除外して利便性を高めます。また、管理面からデータの処理時刻の列を追加します。これがETLにおけるTransform(変換)処理となります。

すでにテーブルとしてデータが保持されているので、SQLを用いてシルバーテーブルを作成することができます。ここまではPythonを実行してきていますが、マジックコマンド%sqlを用いることで、そのセルのみSQLを実行することができます。ここではsilver_tableというシルバーテーブルを作成します。もちろん、要件に応じてより複雑な変換処理を実装することも可能です。

SQL
%sql
CREATE OR REPLACE TABLE
  takaakiyayoi_catalog.end_to_end_etl.silver_table (
    artist_id STRING,
    artist_name STRING,
    duration DOUBLE,
    release STRING,
    tempo DOUBLE,
    time_signature DOUBLE,
    title STRING,
    year DOUBLE,
    processed_time TIMESTAMP
  );

INSERT INTO
  takaakiyayoi_catalog.end_to_end_etl.silver_table
SELECT
  artist_id,
  artist_name,
  duration,
  release,
  tempo,
  time_signature,
  title,
  year,
  current_timestamp()
FROM
  takaakiyayoi_catalog.end_to_end_etl.bronze_table

Screenshot 2023-11-24 at 16.11.34.png

こちらも処理が完了すると、カタログエクスプローラでテーブルを確認できます。
Screenshot 2023-11-24 at 16.12.23.png

ステップ5:変換されたデータをクエリーする

SQL
%sql
-- どのアーティストが各年で最も曲をリリースしたのか?
SELECT
  artist_name,
  count(artist_name)
AS
  num_songs,
  year
FROM
  takaakiyayoi_catalog.end_to_end_etl.silver_table
WHERE
  year > 0
GROUP BY
  artist_name,
  year
ORDER BY
  num_songs DESC,
  year DESC

一目瞭然です。
Screenshot 2023-11-24 at 16.13.59.png

ステップ6:パイプラインを実行するDatabricksジョブを作成する

これと、ステップ7:データパイプラインジョブをスケジュールするはジョブ作成の話なので割愛します。なお、ステップ3ではSparkのストリーミング処理を使っているので、ソースデータが追加された際に新規データのみを処理できるというメリットを享受することもできます。今回は静的なデータなので嬉しさが感じられないのですが。

追加ステップ: ゴールドテーブルへのロード

また、このサンプルにはETLのLoad(ロード)処理が含まれていませんが、例えば上記クエリーの結果をゴールドテーブルとして書き込むことでロード処理とすることが可能です。後段の例えばダッシュボードではそのゴールドテーブルを参照して可視化を行うことができます。

SQL
%sql
CREATE OR REPLACE TABLE takaakiyayoi_catalog.end_to_end_etl.gold_table AS
-- どのアーティストが各年で最も曲をリリースしたのか?
SELECT
  artist_name,
  count(artist_name)
AS
  num_songs,
  year
FROM
  takaakiyayoi_catalog.end_to_end_etl.silver_table
WHERE
  year > 0
GROUP BY
  artist_name,
  year
ORDER BY
  num_songs DESC,
  year DESC

Screenshot 2023-11-24 at 16.19.06.png

最終的な処理結果にテーブルとしてアクセスできるので、可視化も簡単に行えます。これはLakeviewを使っているので、ノーコードで作成したダッシュボードです。
Screenshot 2023-11-24 at 16.25.08.png

これで、生データから変換処理、最終テーブルへのロード、可視化という、データエンジニアリングにおける基本的なパスを体験することができました。ぜひお試しください!

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?