5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricks(Delta lake)のデータ入出力の実装パターン - cheatsheet

Last updated at Posted at 2022-01-20

この記事の内容はData+AI World Tour 2021内のテクニカルセッション「圧倒的に簡単なデータパイプラインの作り方」で実施した内容がベースになっています。

この記事で扱うコード・サンプルNotebook

CSVの読み込み

CSV File => /databricks-datasets/lending-club-loan-stats/*.csv

# データフレームにCSVを読み込む (スキーマは推定する)
df = (
  spark.read.format('csv')
  .option('Header', True)
  .option('inferSchema', True)
  .load('/databricks-datasets/lending-club-loan-stats/*.csv') 
)

# 既存のDeltaに追記
df.write.format('delta').mode('append').save('/tmp/daiwt2021/loan_stats.delta')

# 書き込んだDeltaテーブルの確認
df_delta = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
display(df_delta)

ETL・データ整形


# 簡単な処理を行い、結果を保存
from pyspark.sql.functions import col, expr

df_raw = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')

(
  df_raw
  .select('loan_amnt', # 必要なカラムの抽出
            'term',
            'int_rate',
            'grade',
            'addr_state',
            'emp_title',
            'home_ownership',
            'annual_inc',
            'loan_status')
  .withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)')) # データ型の変換
  .withColumnRenamed('addr_state', 'state') # カラム名変更
  .write
  .format('delta') # Deltaで保存(silverテーブル)
  .mode('overwrite')
  .save('/tmp/daiwt2021/loan_stat_silver.delta')
)

display( spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_silver.delta') )

SQLも使えます・可視化もそのままできます

%sql

DROP DATABASE IF EXISTS daiwt2021_kitamura CASCADE;
CREATE DATABASE daiwt2021_kitamura;
USE daiwt2021_kitamura;

CREATE TABLE loan_stat_silver
USING delta
LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';

SELECT state, loan_status, count(*) as counts 
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC

SQLの結果をPythonで受け取り、サマリテーブルを作成する

df_gold = spark.sql('''

SELECT state, loan_status, count(*) as counts 
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC

''')

# Deltaで保存(goldテーブル)
df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')

# goldテーブルをHiveへ登録
spark.sql('''
  CREATE TABLE IF NOT EXISTS loan_stat_gold
  USING delta
  LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'
''')

DeltaテーブルをBIから参照する

BIツールから接続するDeltaテーブルのJDBC/ODBCのエンドポイント

Deltaテーブルの内容をDWH/RDBMSに書き出す

mysql_url = f'jdbc:mysql://{hostname}:{port}/{database}?user={username}&password={password}'

df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')

(
  df_gold.write.format("jdbc")
  .option("url", mysql_url)
  .option("dbtable", "test")
  .mode("overwrite")
  .save()
)

Deltaテーブルの内容をファイルに書き出す

df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')

df_gold.write.format('csv'     ).save('/tmp/daiwt2021/loan_stat_gold.csv')
df_gold.write.format('json'    ).save('/tmp/daiwt2021/loan_stat_gold.json')
df_gold.write.format('parquet' ).save('/tmp/daiwt2021/loan_stat_gold.parquet')
df_gold.write.format('avro'    ).save('/tmp/daiwt2021/loan_stat_gold.avro')

今までのETLのコードをまとめる

以下のコードでシンプルにETL処理が可能です。

### 1. Rawテーブル (CSV => bornze)
df_raw = (
  spark.read.format('csv')
  .option('Header', True)
  .option('inferSchema', True)
  .load('/databricks-datasets/lending-club-loan-stats/LoanStats_2018Q2.csv') 
)
df_raw.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stats.delta')
sql("CREATE TABLE loan_stat_raw USING delta LOCATION '/tmp/daiwt2021/loan_stat.delta';")


### 2. データ整形 (bronze => silver)
from pyspark.sql.functions import col, expr

df_silver = (
  spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
  .select('loan_amnt', 'term', 'int_rate', 'grade', 'addr_state', 'emp_title', 'home_ownership', 'annual_inc', 'loan_status')
  .withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)'))
  .withColumnRenamed('addr_state', 'state')
)
df_silver.write.format('delta').mode('overwrite').save( '/tmp/daiwt2021/loan_stat_silver.delta')
sql("CREATE TABLE loan_stat_silver USING delta LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';"")


### 3. サマリテーブル (silver => gold)
df_gold = spark.sql('''
  SELECT state, loan_status, count(*) as counts 
  FROM loan_stat_silver
  GROUP BY state, loan_status
  ORDER BY counts DESC
''')

df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')
sql("CREATE TABLE IF NOT EXISTS loan_stat_gold USING delta LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'")

JSONを読み込む

JSON File => /mnt/training/ecommerce/events/events-500k.json

df_json = (
  spark.read.format('json')
  .option('inferSchema', True)
  .load('/mnt/training/ecommerce/events/events-500k.json')
)

# deltaへの書き込み
df_json.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/events.delta')

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/events.delta') )

LOGデータを読み込む

任意のテキストフォーマットでも対応可能です。

access.log(Webサーバーアクセスログ) => s3://databricks-ktmr-s3/var/log/access.log.*.gz

13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"
157.48.153.185 - - [19/Dec/2020:14:08:06 +0100] "GET /apache-log/access.log HTTP/1.1" 200 233 "-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
157.48.153.185 - - [19/Dec/2020:14:08:08 +0100] "GET /favicon.ico HTTP/1.1" 404 217 "http://www.almhuette-raith.at/apache-log/access.log" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
...

from pyspark.sql.functions import split, regexp_extract, col, to_timestamp

raw_df = spark.read.text('s3://databricks-ktmr-s3/var/log/access.log.*.gz')

# Regexでログデータをパース
split_df = (
    raw_df.select(
      regexp_extract('value', r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1).alias('src_ip'),
      regexp_extract('value', r'\[(.+?)\]', 1).alias('time_string'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 1).alias('method'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 2).alias('path'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 3).alias('version'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 1).cast('int').alias('status_code'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 2).cast('int').alias('content_size'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 3).alias('host2'),
      regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 4).alias('user_agent')
    )
  .withColumn( 'timestamp', to_timestamp(  col('time_string'), 'dd/MMM/yyyy:HH:mm:ss Z') )
  .drop('time_string')
  .filter( col('timestamp').isNotNull() ) 
)

# Deltaに書き出す
split_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/access_log.delta')

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/access_log.delta') )

画像ファイル(バイナリファイル)を読み込む

Image Files => /databricks-datasets/cctvVideos/train_images/label=0/*.jpg

image_df = (
  spark.read.format('binaryFile')
  .option('mimeType', 'image/*')
  .load('/databricks-datasets/cctvVideos/train_images/label=0/*.jpg')
)

spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
image_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/train_image.delta')
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/train_image.delta') ) 

RDBMS/DWHから読み込む

jdbcHostname = "example.databricks.training"
jdbcPort = 5432
jdbcDatabase = "training"
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"


query = '''
(
  SELECT * FROM training.people_1m  
  WHERE salary > 100000
) emp_alias
'''

df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProps)
df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/salary.delta')

# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/salary.delta') )

ストリーミングを読み込む

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
import re

# JSONデータのスキーマ定義
schema = StructType([
  StructField("channel", StringType(), True),
  StructField("comment", StringType(), True),
  StructField("delta", IntegerType(), True),
  StructField("flag", StringType(), True),
  StructField("geocoding", StructType([ StructField("city", StringType(), True), StructField("country", StringType(), True), StructField("countryCode2", StringType(), True), StructField("countryCode3", StringType(), True), StructField("stateProvince", StringType(), True), StructField("latitude", DoubleType(), True), StructField("longitude", DoubleType(), True), ]), True),
  StructField("isAnonymous", BooleanType(), True),
  StructField("isNewPage", BooleanType(), True),
  StructField("isRobot", BooleanType(), True),
  StructField("isUnpatrolled", BooleanType(), True),
  StructField("namespace", StringType(), True),         
  StructField("page", StringType(), True),              
  StructField("pageURL", StringType(), True),           
  StructField("timestamp", StringType(), True),        
  StructField("url", StringType(), True),
  StructField("user", StringType(), True),              
  StructField("userURL", StringType(), True),
  StructField("wikipediaURL", StringType(), True),
  StructField("wikipedia", StringType(), True),
])

# 読み込み
stream_df = (
  spark.readStream.format('kafka') # Kafkaをソースと指定
  .option('kafka.bootstrap.servers', 'server123.databricks.training:9092')
  .option('subscribe', 'en')
  .load()
)

# ELTをして、Deltaに書き込む
(
  stream_df
  .withColumn('json', from_json(col('value').cast('string'), schema))   # Kafkaのバイナリデータを文字列に変換し、from_json()でJSONをパース
  .select(col("json.*"))                    # JSONの子要素だけを取り出す
  .writeStream                              # writeStream()でストリームを書き出す
  .format('delta')                          # Deltaとして保存
  .option('checkpointLocation', '/tmp/daiwt2021/stream.checkpoint') # チェックポイント保存先を指定
  .outputMode('append')                     # マイクロバッチの結果をAppendで追加
  .start('/tmp/daiwt2021/stream.delta')   # start()でストリーム処理を開始 (アクション)
)

# 確認
df = spark.readStream.format('delta').load('/tmp/daiwt2021/stream.delta')
display( df )

随時更新追加されるファイルをストリーミングとして扱う

ファイルのディレクトリ => s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*

  • オブジェクトストレージ上にファイルが随時アップロードされるパターンもよくあります。
  • Databricksでは、ストレージに新たな追加されたファイルを認識して、そのファイルのみ読み込むことができます。
  • この場合、ストレージをストリーミングのソースとして利用することになります。

# (スキーマを楽して取得する)
df=spark.read.format('json').load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/2014-01-01')
tweet_schema = df.schema

Option1) 継続的に随時読み込む

df_autoloader = (
  spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  .option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限
  .schema(tweet_schema)
  .load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)

(
  df_autoloader.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
  .option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限
  .outputMode('append')
  .trigger(processingTime='2 seconds') # 2秒に一度処理
  #.trigger(once=True) # 一度だけ処理
  .start('/tmp/daiwt2021/tweet.delta')
  #.awaitTermination() # async => sync
)

Option2) 一度実行のみ、その際に新規ファイルがあった場合に読み込む

df_autoloader = (
  spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  #.option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限 <== Trigger Onceの時には無視される!
  .schema(tweet_schema)
  .load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)

(
  df_autoloader.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
  #.option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限  <== Trigger Onceの時には無視される!
  .outputMode('append')
  .trigger(once=True) # 一度だけ処理
  .start('/tmp/daiwt2021/tweet.delta')
  .awaitTermination() # async => sync  <== Trigger Onceの実行が終わるまでブロックさせる
)
# 確認
spark.read.format('delta').load('/tmp/daiwt2021/tweet.delta').count()

Job・自動実行・パイプライン化

Databricksではデータパイプラインの管理・定期実行するための機能があります(Jobs)。
以下のコードをJob化してみましょう。

### 1. ストレージから更新ファイルだけを認識して、Deltaテーブルに追記する
df_autoloader = (
  spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  .schema(tweet_schema)
  .load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)

(
  df_autoloader.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/daiwt2021/job/tweet.checkpoint')
  .outputMode('append')
  .trigger(once=True) # 一度だけ処理
  .start('/tmp/daiwt2021/job/tweet.delta')
  .awaitTermination() # async => sync
)
### 2. 上記のDeltaテーブルからサマリのDeltaテーブルを作る

df=spark.read.format('delta').load('/tmp/daiwt2021/job/tweet.delta')

(
  df.groupBy('lang').count()
  .write.format('delta').mode('overwrite').save('/tmp/daiwt2021/job/tweet_summary.delta')
)

sql("CREATE TABLE IF NOT EXISTS tweet_summary USING delta LOCATION '/tmp/daiwt2021/job/tweet_summary.delta'")

# 確認
display(
  spark.read.format('delta').load('/tmp/daiwt2021/job/tweet_summary.delta')
)
5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?