
Delta Lakeをさわってみた話 + α

Posted at 2020-12-30

This is the Day 12 article of the freeeデータに関わる人たち Advent Calendar 2020 (though I was very late writing it).

Summary

  • Tried out Delta Lake
  • Looks like a good fit when you want to work with file objects DB-style for aggregation and analysis

Motivation

  • I had been hearing about Delta Lake here and there for a while and wanted to learn a bit about it

What is Delta Lake

  • A convenience tool for the storage layer
    • Supports only Parquet by default (ORC appears unsupported)
    • Assumes Spark
    • Provides ACID transactions via a mechanism called the
      • Delta Log
    • Supports Time Travel
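The Time Travel feature mentioned above can be tried through the `versionAsOf` / `timestampAsOf` read options. A minimal sketch, assuming a running SparkSession `spark` with Delta configured; the table path is a placeholder:

```python
# Read an older snapshot of a Delta table by commit version.
# Any past version recorded in the Delta Log can be loaded this way.
df_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)  # the first commit
    .load("/path/to/table")
)

# ...or pin to a point in time instead of a version number
df_old = (
    spark.read.format("delta")
    .option("timestampAsOf", "2020-12-01")
    .load("/path/to/table")
)
```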

A quick usage example

  • Read/Write
# write
# specify "delta" as the format
df.write.format("delta").mode("overwrite").save("/path/to/table")

# read as Spark DataFrame
data = spark.read.format("delta").load("/path/to/table") 
data.withColumn(...)
...

# read as DeltaTable(wrapper)
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/table")
deltaTable.update(...)
...
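To make the elided `update` call above concrete, here is a hedged sketch; the column names `id` and `value` are made up for illustration and are not from any real table:

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

# assumes a SparkSession `spark` with the Delta extension configured
deltaTable = DeltaTable.forPath(spark, "/path/to/table")

# update rows matching a condition, in place on the table
deltaTable.update(
    condition=col("id") == 1,
    set={"value": lit("updated")},
)

# deletes work the same way
deltaTable.delete(col("id") == 2)
```

Both operations are recorded as new commits in the Delta Log, which is what makes the Time Travel reads above possible.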
  • Delta Log
$ ls /path/to/table/_delta_log/
00000000000000000000.json  00000000000000000001.json...

$ cat 00000000000000000000.json
{"commitInfo":{"timestamp":1606459693818,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isBlindAppend":false,"operationMetrics":{"numFiles":"4","numOutputBytes":"1894","numOutputRows":"10"}}}...

AWS Glueで使えるか試してみた

Having gotten a feel for it, I tried calling it from a Job Script in AWS Glue, which we use at our company.

  • Put the JAR on S3 and point the job at it

(screenshot: DeltaLake.png)

  • Job Script
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
gc = GlueContext(sc)

# register Delta's SQL extension, catalog, and an S3-safe LogStore
gc.setConf("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
gc.setConf("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
gc.setConf("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")

# make the Python module bundled inside the JAR importable
sc.addPyFile("delta-core_2.12-0.7.0.jar")

from delta.tables import *

spark = SparkSession(gc)

delta_path = "s3://path/to/table"
data = spark.range(0, 100)
data.write.format("delta").mode("overwrite").save(delta_path)

It complained that the S3 key doesn't exist, so something is probably off...
That's where I ran out of steam, so I'll leave the rest to next year's self... (I've also seen articles saying this doesn't work on AWS Glue, so maybe it just isn't possible?)
