概要
DatabricksのドキュメントにてCTAS(CREATE TABLE AS SELECT)が利用可能であるとの記載を見つけたため検証内容を共有します。一度作成したテーブルは、"CREATE OR REPLACE TABLE"で再作成が可能です。
%sql
create or replace table ctas.ctas_flights_summary_data
using delta
location "dbfs:/ctas/ctas_flights_summary_data"
AS select count(*) as count from ctas.flights_summary_data;
なお、CTAS相当のことを、データフレームで上書きしたとしても、下記のようなシンプルなコードで済みます。
df_update = spark.sql("""
select count(*) as count from ctas.flights_summary_data
""")
df_update.write.format("delta").mode("overwrite").saveAsTable("ctas.ctas_flights_summary_data")
手順
1. データの準備
# データの準備
filepath = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = """
DEST_COUNTRY_NAME STRING
,ORIGIN_COUNTRY_NAME STRING
,count INT
"""
df = (spark
.read
.format("csv")
.schema(schema)
.option("header", "true")
.option("inferSchema", "False")
.load(filepath)
)
df.write.format("delta").mode("overwrite").save("dbfs:/ctas/flights_summary_data")
%sql
create database if NOT EXISTS ctas;
create table if NOT EXISTS ctas.flights_summary_data
using delta
location "dbfs:/ctas/flights_summary_data";
%sql
-- データ確認
select count(*) as count from ctas.flights_summary_data
2. CTASのテーブルを作成
%sql
create table if not exists ctas.ctas_flights_summary_data
using delta
location "dbfs:/ctas/ctas_flights_summary_data"
AS select count(*) as count from ctas.flights_summary_data;
念のため、データの確認。
%sql
select * from ctas.ctas_flights_summary_data;
3. REFRESHにより更新されないことの確認
CTASテーブルの元テーブルのデータを削除して更新を実施。
%sql
truncate table ctas.flights_summary_data;
select count(*) as count from ctas.flights_summary_data;
この時点では、CTASのテーブルはデータ削除前のデータを保持。
%sql
-- データ確認
select * from ctas.ctas_flights_summary_data;
CTASテーブルの更新処理を実施。
%sql
refresh table ctas.ctas_flights_summary_data;
データが更新されていないことを確認。
%sql
-- データ確認
select * from ctas.ctas_flights_summary_data;
4. DROPとCREATEだけではエラー
先ほどのCTASテーブルと同様のディレクトリにデータがある場合に、CTASを実施するとエラーとなるようです。
%sql
drop table if exists ctas.ctas_flights_summary_data;
create table if not exists ctas.ctas_flights_summary_data
using delta
location "dbfs:/ctas/ctas_flights_summary_data"
AS select count(*) as count from ctas.flights_summary_data;
Error in SQL statement: AnalysisException: Cannot create table ('ctas
.ctas_flights_summary_data
'). The associated location ('dbfs:/ctas/ctas_flights_summary_data') is not empty but it's not a Delta table
5. ディレクトリのデータを削除してから、DROPとCREATEを実施することでCTASテーブルが更新される
dbutils.fs.rm("dbfs:/ctas/ctas_flights_summary_data", True)
%sql
drop table if exists ctas.ctas_flights_summary_data;
create table if not exists ctas.ctas_flights_summary_data
using delta
location "dbfs:/ctas/ctas_flights_summary_data"
AS select count(*) as count from ctas.flights_summary_data;
0と表示されて、データが更新されたことを確認。
%sql
-- データ確認
select * from ctas.ctas_flights_summary_data;
6. CREATE OR REPLACE TABLEを実施することでCTASテーブルが更新される
%sql
create or replace table ctas.ctas_flights_summary_data
using delta
location "dbfs:/ctas/ctas_flights_summary_data"
AS select count(*) as count from ctas.flights_summary_data;
0と表示されて、データが更新されたことを確認。
%sql
-- データ確認
select * from ctas.ctas_flights_summary_data;
7. 本検証で利用したリソースを削除
dbutils.fs.rm("/ctas", True)
%sql
drop database ctas CASCADE;
関連ドキュメント
- -Azure Databricks を使用した CREATE TABLE - Workspace | Microsoft Docs
- テーブルの更新-Azure Databricks - Workspace | Microsoft Docs
更新内容
記事投稿後、Delta Lake形式であれば、"CREATE OR REPLACE TABLE"によりCTASテーブルの再作成が実施できたため、記事の内容の一部を修正しました。