この記事の内容はData+AI World Tour 2021内のテクニカルセッション「圧倒的に簡単なデータパイプラインの作り方」で実施した内容がベースになっています。
CSVの読み込み
CSV File => /databricks-datasets/lending-club-loan-stats/*.csv
# データフレームにCSVを読み込む (スキーマは推定する)
df = (
spark.read.format('csv')
.option('Header', True)
.option('inferSchema', True)
.load('/databricks-datasets/lending-club-loan-stats/*.csv')
)
# 既存のDeltaに追記
df.write.format('delta').mode('append').save('/tmp/daiwt2021/loan_stats.delta')
# 書き込んだDeltaテーブルの確認
df_delta = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
display(df_delta)
ETL・データ整形
# 簡単な処理を行い、結果を保存
from pyspark.sql.functions import col, expr
df_raw = spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
(
df_raw
.select('loan_amnt', # 必要なカラムの抽出
'term',
'int_rate',
'grade',
'addr_state',
'emp_title',
'home_ownership',
'annual_inc',
'loan_status')
.withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)')) # データ型の変換
.withColumnRenamed('addr_state', 'state') # カラム名変更
.write
.format('delta') # Deltaで保存(silverテーブル)
.mode('overwrite')
.save('/tmp/daiwt2021/loan_stat_silver.delta')
)
display( spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_silver.delta') )
SQLも使えます・可視化もそのままできます
%sql
DROP DATABASE IF EXISTS daiwt2021_kitamura CASCADE;
CREATE DATABASE daiwt2021_kitamura;
USE daiwt2021_kitamura;
CREATE TABLE loan_stat_silver
USING delta
LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';
SELECT state, loan_status, count(*) as counts
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
SQLの結果をPythonで受け取り、サマリテーブルを作成する
df_gold = spark.sql('''
SELECT state, loan_status, count(*) as counts
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
''')
# Deltaで保存(goldテーブル)
df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')
# goldテーブルをHiveへ登録
spark.sql('''
CREATE TABLE IF NOT EXISTS loan_stat_gold
USING delta
LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'
''')
DeltaテーブルをBIから参照する
BIツールから接続するDeltaテーブルのJDBC/ODBCのエンドポイント
Deltaテーブルの内容をDWH/RDBMSに書き出す
mysql_url = f'jdbc:mysql://{hostname}:{port}/{database}?user={username}&password={password}'
df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')
(
df_gold.write.format("jdbc")
.option("url", mysql_url)
.option("dbtable", "test")
.mode("overwrite")
.save()
)
Deltaテーブルの内容をファイルに書き出す
df_gold = spark.read.format('delta').load('/tmp/daiwt2021/loan_stat_gold.delta')
df_gold.write.format('csv' ).save('/tmp/daiwt2021/loan_stat_gold.csv')
df_gold.write.format('json' ).save('/tmp/daiwt2021/loan_stat_gold.json')
df_gold.write.format('parquet' ).save('/tmp/daiwt2021/loan_stat_gold.parquet')
df_gold.write.format('avro' ).save('/tmp/daiwt2021/loan_stat_gold.avro')
今までのETLのコードをまとめる
以下のコードでシンプルにETL処理が可能です。
### 1. Rawテーブル (CSV => bornze)
df_raw = (
spark.read.format('csv')
.option('Header', True)
.option('inferSchema', True)
.load('/databricks-datasets/lending-club-loan-stats/LoanStats_2018Q2.csv')
)
df_raw.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stats.delta')
sql("CREATE TABLE loan_stat_raw USING delta LOCATION '/tmp/daiwt2021/loan_stat.delta';")
### 2. データ整形 (bronze => silver)
from pyspark.sql.functions import col, expr
df_silver = (
spark.read.format('delta').load('/tmp/daiwt2021/loan_stats.delta')
.select('loan_amnt', 'term', 'int_rate', 'grade', 'addr_state', 'emp_title', 'home_ownership', 'annual_inc', 'loan_status')
.withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)'))
.withColumnRenamed('addr_state', 'state')
)
df_silver.write.format('delta').mode('overwrite').save( '/tmp/daiwt2021/loan_stat_silver.delta')
sql("CREATE TABLE loan_stat_silver USING delta LOCATION '/tmp/daiwt2021/loan_stat_silver.delta';"")
### 3. サマリテーブル (silver => gold)
df_gold = spark.sql('''
SELECT state, loan_status, count(*) as counts
FROM loan_stat_silver
GROUP BY state, loan_status
ORDER BY counts DESC
''')
df_gold.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/loan_stat_gold.delta')
sql("CREATE TABLE IF NOT EXISTS loan_stat_gold USING delta LOCATION '/tmp/daiwt2021/loan_stat_gold.delta'")
JSONを読み込む
JSON File => /mnt/training/ecommerce/events/events-500k.json
df_json = (
spark.read.format('json')
.option('inferSchema', True)
.load('/mnt/training/ecommerce/events/events-500k.json')
)
# deltaへの書き込み
df_json.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/events.delta')
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/events.delta') )
LOGデータを読み込む
任意のテキストフォーマットでも対応可能です。
access.log(Webサーバーアクセスログ) => s3://databricks-ktmr-s3/var/log/access.log.*.gz
13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"
157.48.153.185 - - [19/Dec/2020:14:08:06 +0100] "GET /apache-log/access.log HTTP/1.1" 200 233 "-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
157.48.153.185 - - [19/Dec/2020:14:08:08 +0100] "GET /favicon.ico HTTP/1.1" 404 217 "http://www.almhuette-raith.at/apache-log/access.log" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"
...
from pyspark.sql.functions import split, regexp_extract, col, to_timestamp
raw_df = spark.read.text('s3://databricks-ktmr-s3/var/log/access.log.*.gz')
# Regexでログデータをパース
split_df = (
raw_df.select(
regexp_extract('value', r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1).alias('src_ip'),
regexp_extract('value', r'\[(.+?)\]', 1).alias('time_string'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 1).alias('method'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 2).alias('path'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ "(.+) (.+) (HTTP.+?)"', 3).alias('version'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 1).cast('int').alias('status_code'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+)', 2).cast('int').alias('content_size'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 3).alias('host2'),
regexp_extract('value', r'^\S+ \S+ \S+ \S+ \S+ ".+HTTP.+?" (\d+) (\d+) "(.+?)" "(.+?)" "(.+?)"', 4).alias('user_agent')
)
.withColumn( 'timestamp', to_timestamp( col('time_string'), 'dd/MMM/yyyy:HH:mm:ss Z') )
.drop('time_string')
.filter( col('timestamp').isNotNull() )
)
# Deltaに書き出す
split_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/access_log.delta')
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/access_log.delta') )
画像ファイル(バイナリファイル)を読み込む
Image Files => /databricks-datasets/cctvVideos/train_images/label=0/*.jpg
image_df = (
spark.read.format('binaryFile')
.option('mimeType', 'image/*')
.load('/databricks-datasets/cctvVideos/train_images/label=0/*.jpg')
)
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
image_df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/train_image.delta')
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/train_image.delta') )
RDBMS/DWHから読み込む
jdbcHostname = "example.databricks.training"
jdbcPort = 5432
jdbcDatabase = "training"
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
query = '''
(
SELECT * FROM training.people_1m
WHERE salary > 100000
) emp_alias
'''
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProps)
df.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/salary.delta')
# 確認
display( spark.read.format('delta').load('/tmp/daiwt2021/salary.delta') )
ストリーミングを読み込む
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
import re
# JSONデータのスキーマ定義
schema = StructType([
StructField("channel", StringType(), True),
StructField("comment", StringType(), True),
StructField("delta", IntegerType(), True),
StructField("flag", StringType(), True),
StructField("geocoding", StructType([ StructField("city", StringType(), True), StructField("country", StringType(), True), StructField("countryCode2", StringType(), True), StructField("countryCode3", StringType(), True), StructField("stateProvince", StringType(), True), StructField("latitude", DoubleType(), True), StructField("longitude", DoubleType(), True), ]), True),
StructField("isAnonymous", BooleanType(), True),
StructField("isNewPage", BooleanType(), True),
StructField("isRobot", BooleanType(), True),
StructField("isUnpatrolled", BooleanType(), True),
StructField("namespace", StringType(), True),
StructField("page", StringType(), True),
StructField("pageURL", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("url", StringType(), True),
StructField("user", StringType(), True),
StructField("userURL", StringType(), True),
StructField("wikipediaURL", StringType(), True),
StructField("wikipedia", StringType(), True),
])
# 読み込み
stream_df = (
spark.readStream.format('kafka') # Kafkaをソースと指定
.option('kafka.bootstrap.servers', 'server123.databricks.training:9092')
.option('subscribe', 'en')
.load()
)
# ELTをして、Deltaに書き込む
(
stream_df
.withColumn('json', from_json(col('value').cast('string'), schema)) # Kafkaのバイナリデータを文字列に変換し、from_json()でJSONをパース
.select(col("json.*")) # JSONの子要素だけを取り出す
.writeStream # writeStream()でストリームを書き出す
.format('delta') # Deltaとして保存
.option('checkpointLocation', '/tmp/daiwt2021/stream.checkpoint') # チェックポイント保存先を指定
.outputMode('append') # マイクロバッチの結果をAppendで追加
.start('/tmp/daiwt2021/stream.delta') # start()でストリーム処理を開始 (アクション)
)
# 確認
df = spark.readStream.format('delta').load('/tmp/daiwt2021/stream.delta')
display( df )
随時更新追加されるファイルをストリーミングとして扱う
ファイルのディレクトリ => s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*
- オブジェクトストレージ上にファイルが随時アップロードされるパターンもよくあります。
- Databricksでは、ストレージに新たな追加されたファイルを認識して、そのファイルのみ読み込むことができます。
- この場合、ストレージをストリーミングのソースとして利用することになります。
# (スキーマを楽して取得する)
df=spark.read.format('json').load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/2014-01-01')
tweet_schema = df.schema
Option1) 継続的に随時読み込む
df_autoloader = (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', 'json')
.option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限
.schema(tweet_schema)
.load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)
(
df_autoloader.writeStream.format('delta')
.option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
.option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限
.outputMode('append')
.trigger(processingTime='2 seconds') # 2秒に一度処理
#.trigger(once=True) # 一度だけ処理
.start('/tmp/daiwt2021/tweet.delta')
#.awaitTermination() # async => sync
)
Option2) 一度実行のみ、その際に新規ファイルがあった場合に読み込む
df_autoloader = (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', 'json')
#.option('cloudFiles.maxBytesPerTrigger', '50KB') # 一度に読むサイズ上限 <== Trigger Onceの時には無視される!
.schema(tweet_schema)
.load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)
(
df_autoloader.writeStream.format('delta')
.option('checkpointLocation', '/tmp/daiwt2021/tweet.checkpoint')
#.option('maxFilesPerTrigger', 25) # 一度に読むファイル数上限 <== Trigger Onceの時には無視される!
.outputMode('append')
.trigger(once=True) # 一度だけ処理
.start('/tmp/daiwt2021/tweet.delta')
.awaitTermination() # async => sync <== Trigger Onceの実行が終わるまでブロックさせる
)
# 確認
spark.read.format('delta').load('/tmp/daiwt2021/tweet.delta').count()
Job・自動実行・パイプライン化
Databricksではデータパイプラインの管理・定期実行するための機能があります(Jobs)。
以下のコードをJob化してみましょう。
### 1. ストレージから更新ファイルだけを認識して、Deltaテーブルに追記する
df_autoloader = (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', 'json')
.schema(tweet_schema)
.load('s3a://databricks-ktmr-s3/stocknet-dataset/tweet/raw/AAPL/*')
)
(
df_autoloader.writeStream.format('delta')
.option('checkpointLocation', '/tmp/daiwt2021/job/tweet.checkpoint')
.outputMode('append')
.trigger(once=True) # 一度だけ処理
.start('/tmp/daiwt2021/job/tweet.delta')
.awaitTermination() # async => sync
)
### 2. 上記のDeltaテーブルからサマリのDeltaテーブルを作る
df=spark.read.format('delta').load('/tmp/daiwt2021/job/tweet.delta')
(
df.groupBy('lang').count()
.write.format('delta').mode('overwrite').save('/tmp/daiwt2021/job/tweet_summary.delta')
)
sql("CREATE TABLE IF NOT EXISTS tweet_summary USING delta LOCATION '/tmp/daiwt2021/job/tweet_summary.delta'")
# 確認
display(
spark.read.format('delta').load('/tmp/daiwt2021/job/tweet_summary.delta')
)