この記事はfreeeデータに関わる人たち Advent Calendar 2020の12日分(だけど書くのめちゃ遅れた)の記事です。
まとめ
- Delta Lakeさわってみた
- 集計分析でDBライクにFile Objectを扱いたいときによさそう
動機
- 以前からちらほら聞くことがあったDeltaLakeをちょっと知りたかった
Delta Lakeとは
- Storage Layerの便利ツール
- デフォルトでParquetのみをサポート(ORCはなさそう)
- Sparkを前提
- ACIDなTransactionを実現する仕組み
- Delta Log
- Time Travelできる
サクッとつかった例
- Read/Write
# write
# formatとしてdeltaを指定
df.write.format("delta").mode("overwrite").save("/path/to/table")
# read as Spark DataFrame
data = spark.read.format("delta").load("/path/to/table")
data.withColumn(...)
...
# read as DeltaTable(wrapper)
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
deltaTable.update(...)
...
- Delta Log
$ ls /path/to/table/_delta_log/
00000000000000000000.json 00000000000000000001.json...
$ cat 00000000000000000000.json
{"commitInfo":{"timestamp":1606459693818,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isBlindAppend":false,"operationMetrics":{"numFiles":"4","numOutputBytes":"1894","numOutputRows":"10"}}}...
AWS Glueで使えるか試してみた
雰囲気はつかめたので、弊社で利用しているAWS GlueのJob Scriptで呼び出せるか試してみた。
- JarをS3に置いて指定
- Job Script
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext
sc = SparkContext()
gc = GlueContext(sc)
gc.setConf("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
gc.setConf("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
gc.setConf("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
sc.addPyFile("delta-core_2.12-0.7.0.jar")
from delta.tables import *
spark = SparkSession(gc)
delta_path = "s3://path/to/table"
data = spark.range(0, 100)
data.write.format("delta").mode("overwrite").save(delta_path)
- ref: https://stackoverflow.com/questions/63186034/read-write-delta-lake-tables-on-s3-using-aws-glue-jobs
s3のkeyが無いと怒られるのでちょっと違うかも、、
というところで力尽きたので来年の自分に期待して終わります・・・(AWS Glue無理じゃねっという記事もみたのでたぶんできないかも?)