テーブル変更後の更新でエラーになる
write実行時にDelta Lake でのスキーマがチェックされ、合致しなければエラーになってしまいます。
間違い防止にすごくありがたいこの機能ですが、仕様変更で新しい列が追加されたり、データの型が変更になってしまうととたんに厄介者になってしまいます。
開発時はテーブル削除でしのげても、いったん運用になってしまうとそうはいきません。
.option('mergeSchema','true')
- 新しい列の追加(最も一般的なシナリオ)
- NullType からあらゆる型へのデータ型変更。もしくは、ByteType、ShortType、IntegerType へのアップキャスト
公式ページではケースとして上記が挙げられているけど NullType …正直わからない!のでここでは、新しい列追加に絞って記載します。
いままでのテーブル/parquetファイルに新しい列を追加したいなら、Sparkの .write コマンドまたは.writeStreamコマンドに、.option('mergeSchema','true') を追加しよう。
# Add the mergeSchema option
loans.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save(DELTALAKE_SILVER_PATH)
このオプションを使用することで、既存の列を参照するモデルに影響を及ぼすことなく、運用中のテーブルに新しい列を追加できます。例えば、データサイエンティストやエンジニアが、新しい追跡指標や直近の売上データの列を追加したい場合などが想定されます。
これで運用中のテーブルでも新しい列を追加できる!
.option('overwriteSchema','true')
新しい列の追加じゃなくて、列の削除とか既存の列のデータ型変更なんだよぅ!
という場合は、こちら。
- 列のドロップ
- 既存の列のデータ型変更
- 大文字と小文字で区別された列名の変更(例:「Foo」と「foo」)
こんなときはスキーマとデータを上書きするための .option("overwriteSchema","true") を追加しよう。
列の型を変更する
spark.read.table(...) \
.withColumn("birthDate", col("birthDate").cast("date")) \
.write \
.format("delta") \
.mode("overwrite")
.option("overwriteSchema", "true") \
.saveAsTable(...)
列の名前を変更する
spark.read.table(...) \
.withColumnRenamed("dateOfBirth", "birthDate") \
.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(...)
これで仕様変更があってもバッチリ!
この記事は以下の記事を参考に書きました。
Delta Lake でのスキーマ(schema)DB の適用・展開とは
https://databricks.com/jp/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
テーブルのバッチ読み取りと書き込み
https://docs.microsoft.com/ja-jp/azure/databricks/delta/delta-batch