概要
Azure Synapse Analytics Dedicated SQL pool (専用 SQL プール) における Merge (Upsert) 処理の性能を、TPC-H の LINEITEM テーブルのデータを用いて検証する方法を共有します。
下記のリソースを利用します。Azure Databricks にて TPC-H のデータの生成、および Parquet 形式への変換を実施します。その後、出力される SQL 文を Azure Synapse Analytics Dedicated SQL pool (専用 SQL プール) で実行します。
- Azure Synapse Analytics
- Azure Storage
- Azure Databricks
TPC-H の概要とデータ生成方法については、下記記事にて整理しています。
- データベースの性能検証に利用されるTPC-HとTPC-DSに関するざっくりとした整理 - Qiita
- Databrick(Spark)にて、TPC-H、および、TPC-DSのデータを生成する方法 - Qiita
本手順後に、下記表のデータを生成できます。Merge 処理時のソースでは約600万レコードを想定しており、ファイルサイズは Snappy 形式で圧縮した Parquet 形式のファイルサイズを示しております。
# | 想定処理 | 処理後のレコード数 | ソースのレコード数 | ソースの新規レコード数 | ソースの既存レコード数 | ファイルサイズ(MB) | テキストファイル時のサイズ(MB) |
---|---|---|---|---|---|---|---|
0 | 初期データのロード | 44,991,725 | 44,991,725 | 44,991,725 | 0 | ||
1 | 1度目の処理 | 47,989,007 | 5,996,953 | 2,997,282 | 2,999,671 | 245 | 774 |
2 | 2度目の処理 | 50,988,593 | 5,997,887 | 2,999,586 | 2,998,301 | 245 | 776 |
3 | 3度目の処理 | 53,986,608 | 5,996,879 | 2,998,015 | 2,998,864 | 245 | 778 |
4 | 4度目の処理 | 56,982,414 | 5,997,357 | 2,995,806 | 3,001,551 | 245 | 779 |
5 | 5度目の処理 | 59,986,052 | 6,000,991 | 3,003,638 | 2,997,353 | 245 | 779 |
2022 年 10 月 31 日時点にて、Merge 文により Upsert 処理を実施する場合には、ターゲットのテーブルがハッシュ分散テーブルのみに制限されております。
引用元:MERGE (Transact-SQL) - SQL Server | Microsoft Learn
本手順で記述しているストレージは削除済みのため、記載されているアクセスキーおよび SAS トークンは無効です。
データと SQL 文の生成
TPC-H のデータ生成
import os

# dbgen parameters, passed as strings because they are exported to the %sh cell.
scale_factor = "10"  # TPC-H scale factor
chunks_number = "20"  # number of output files == number of parallel dbgen processes

# Local (driver) directory written by dbgen, and its DBFS copy destination.
path = "file:/tmp/tpch_data"
dbfs_path = "dbfs:/FileStore/load_test/raw"

# Start from empty directories (recursive delete).
for target_dir in (path, dbfs_path):
    dbutils.fs.rm(target_dir, True)

# Export the parameters as environment variables for the shell cell,
# echoing each one back as a sanity check.
for env_name, env_value in (
    ("scale_factor", scale_factor),
    ("chunks_number", chunks_number),
):
    os.environ[env_name] = env_value
    print(os.getenv(env_name))
%sh
# Install build tools, then clone and build the databricks/tpch-dbgen tool in /tmp.
sudo apt-get install -y make gcc yacc
cd /tmp
rm -r ./tpch-dbgen -f
git clone https://github.com/databricks/tpch-dbgen.git
cd tpch-dbgen
make OS=LINUX WORKLOAD=TPCH
cd ..
# Recreate /tmp/tpch_data, the directory the Python cells read as "file:/tmp/tpch_data".
rm -r ./tpch_data -f
mkdir tpch_data
cd tpch_data
# Generate the lineitem table (-T L) split into ${chunks_number} chunks, one
# background dbgen process per chunk (-S selects chunk ${i}). ${scale_factor}
# and ${chunks_number} come from the environment variables set in the Python cell.
for i in $( seq 1 ${chunks_number} ) ; do
../tpch-dbgen/dbgen -b ../tpch-dbgen/dists.dss -f -s ${scale_factor} -C ${chunks_number} -S ${i} -T L &
done
# Block until every background dbgen process has finished.
wait
# Inspect the raw dbgen output on the driver's local disk.
file_list = dbutils.fs.ls(path)
display(file_list)

# Total size in bytes of all chunk files.
size_df = spark.createDataFrame(file_list)
size_df.groupBy().sum("size").display()

# Copy the chunk files (recursively) from the local disk to DBFS.
dbutils.fs.cp(path, dbfs_path, True)
# Row-count check (expected: 59,986,052 rows).
_LINEITEM_SCHEMA = """
L_ORDERKEY INTEGER ,
L_PARTKEY INTEGER ,
L_SUPPKEY INTEGER ,
L_LINENUMBER INTEGER ,
L_QUANTITY DECIMAL(15,2) ,
L_EXTENDEDPRICE DECIMAL(15,2) ,
L_DISCOUNT DECIMAL(15,2) ,
L_TAX DECIMAL(15,2) ,
L_RETURNFLAG STRING ,
L_LINESTATUS STRING ,
L_SHIPDATE DATE ,
L_COMMITDATE DATE ,
L_RECEIPTDATE DATE ,
L_SHIPINSTRUCT STRING ,
L_SHIPMODE STRING ,
L_COMMENT STRING
"""


def create_tpch_lineitem_dataframe(
    filepath,
):
    """Load pipe-delimited dbgen LINEITEM files as a Spark DataFrame.

    Args:
        filepath: Path (or list of paths) of the ``lineitem.tbl.N`` files.

    Returns:
        A DataFrame read with the fixed LINEITEM DDL schema above.
    """
    reader = spark.read.schema(_LINEITEM_SCHEMA)
    return reader.csv(filepath, sep="|")


df = create_tpch_lineitem_dataframe(dbfs_path)
df.count()
TPC-H のデータの Parquet 形式への変換
def _chunk_paths(*chunk_ids):
    """Return the DBFS paths of the given dbgen ``lineitem.tbl.N`` chunks."""
    return [f"{dbfs_path}/lineitem.tbl.{chunk_id}" for chunk_id in chunk_ids]


# Data set name -> source chunk files.
# "base" is chunks 1-15 (the initial load). Each "wiret_NN" data set pairs one
# chunk already in "base" (matched rows -> UPDATE) with one unused chunk
# 16-20 (new rows -> INSERT).
parquet_file_paths = {
    "base": _chunk_paths(*range(1, 16)),
    "wiret_01": _chunk_paths(1, 16),
    "wiret_02": _chunk_paths(4, 17),
    "wiret_03": _chunk_paths(7, 18),
    "wiret_04": _chunk_paths(10, 19),
    "wiret_05": _chunk_paths(13, 20),
}

# DBFS output root for the converted Parquet data sets.
base_dbfs_path = "dbfs:/FileStore/load_test"
parqeut_dbfs_path = f"{base_dbfs_path}/parquet"
def create_tpch_lineitem_dataframe(
    filepath,
):
    """Read dbgen LINEITEM output (pipe-delimited text) into a Spark DataFrame.

    Args:
        filepath: A path or a list of paths to ``lineitem.tbl.N`` files.

    Returns:
        A DataFrame with the TPC-H LINEITEM columns and types below.
    """
    schema = """
L_ORDERKEY INTEGER ,
L_PARTKEY INTEGER ,
L_SUPPKEY INTEGER ,
L_LINENUMBER INTEGER ,
L_QUANTITY DECIMAL(15,2) ,
L_EXTENDEDPRICE DECIMAL(15,2) ,
L_DISCOUNT DECIMAL(15,2) ,
L_TAX DECIMAL(15,2) ,
L_RETURNFLAG STRING ,
L_LINESTATUS STRING ,
L_SHIPDATE DATE ,
L_COMMITDATE DATE ,
L_RECEIPTDATE DATE ,
L_SHIPINSTRUCT STRING ,
L_SHIPMODE STRING ,
L_COMMENT STRING
"""
    df = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
    return df


# Write Parquet with the parameter described in the Synapse docs.
# The setting is enabled once for all writes. The session's previous value is
# restored in `finally`, so a failed write does not leave it enabled.
_LEGACY_FORMAT_KEY = "spark.sql.parquet.writeLegacyFormat"
_previous_legacy_format = spark.conf.get(_LEGACY_FORMAT_KEY, "false")
spark.conf.set(_LEGACY_FORMAT_KEY, "true")
try:
    # Loop variable is `name`, not `path`, so the module-level `path`
    # (file:/tmp/tpch_data) used by earlier cells is not overwritten.
    for name, src_path in parquet_file_paths.items():
        write_path = f"{parqeut_dbfs_path}/{name}"
        # Build a DataFrame over this data set's source chunk files.
        raw_df = create_tpch_lineitem_dataframe(src_path)
        # Write in Parquet format, replacing any previous output.
        raw_df.write.format("parquet").mode("overwrite").save(write_path)
        print(f"{name} is finished.")
finally:
    spark.conf.set(_LEGACY_FORMAT_KEY, _previous_legacy_format)

# Show the written data set directories.
file_list = dbutils.fs.ls(parqeut_dbfs_path)
display(file_list)
Parquet 形式の TPC-H のデータのストレージへの書き込み
# Target ADLS Gen2 account/container for the Parquet files.
# NOTE(security): the account key is hard-coded in plain text. Prefer reading
# it from a Databricks secret scope (dbutils.secrets.get) instead of source.
storage_account = "synapsetest123456689343"
container_name = "synapse"
access_key = "09jIsfx97uwoW1Vc+CXalhXCuf2ONI+gJB0YyWXh5mGKVi8HmX8Q7HFV8cdXby7o9O4j181048B0+AStJWf7ug=="
account_key_conf = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(account_key_conf, access_key)

# To authenticate with a SAS token instead, use:
# storage_account = 'synapsetest123345'
# container_name = 'synapse-load'
# token = '<SAS token, e.g. ?sv=...&sp=rwdlacyx&se=...&sig=...>'
# spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
# spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
# spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", token)

# Connection check.
storage_dir = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net"
dbutils.fs.ls(storage_dir)

# Reset the `tpch` directory on storage, then copy the DBFS Parquet output into it.
tpch_dir = f"{storage_dir}/tpch"
dbutils.fs.rm(tpch_dir, True)
dbutils.fs.mkdirs(tpch_dir)
dbutils.fs.cp(parqeut_dbfs_path, tpch_dir, True)

# Verify the files on storage.
dbutils.fs.ls(tpch_dir)
Synapse における検証用 SQL の生成
変数をセット
# Storage settings used when generating the Synapse SQL (same values as the
# upload section).
# NOTE(security): the account key is hard-coded and is later interpolated into
# printed SQL. Prefer a secret scope / managed identity.
storage_account = "synapsetest123456689343"
container_name = "synapse"
access_key = "09jIsfx97uwoW1Vc+CXalhXCuf2ONI+gJB0YyWXh5mGKVi8HmX8Q7HFV8cdXby7o9O4j181048B0+AStJWf7ug=="
account_key_conf = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(account_key_conf, access_key)

# Connection check.
storage_dir = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net"
dbutils.fs.ls(storage_dir)

# Container-relative wildcard paths of each Parquet data set. They are used in
# the COPY INTO ... FROM URLs below.
data_dir = "tpch"


def _parquet_glob(dataset):
    """Return the wildcard path matching every Parquet file of *dataset*."""
    return f"{data_dir}/{dataset}/*.parquet"


base_path = _parquet_glob("base")
wiret_01 = _parquet_glob("wiret_01")
wiret_02 = _parquet_glob("wiret_02")
wiret_03 = _parquet_glob("wiret_03")
wiret_04 = _parquet_glob("wiret_04")
wiret_05 = _parquet_glob("wiret_05")
ワークロード グループとワークロード分類子を作成
# Get the current user name (print the query to run in Synapse).
sql = """
SELECT user_name();
"""
print(sql)
# Set the user name to classify. Presumably this is the value returned by
# SELECT user_name() above; confirm for your environment.
user_name = "dbo"
# Workload group "DataLoads" (REQUEST_MIN_RESOURCE_GRANT_PERCENT = 100) plus a
# classifier that routes requests from `user_name` into it.
sql = f"""
CREATE WORKLOAD GROUP DataLoads
WITH (
MIN_PERCENTAGE_RESOURCE = 0
,CAP_PERCENTAGE_RESOURCE = 100
,REQUEST_MIN_RESOURCE_GRANT_PERCENT = 100
);
CREATE WORKLOAD CLASSIFIER [wgcELTLogin]
WITH (
WORKLOAD_GROUP = 'DataLoads'
,MEMBERNAME = '{user_name}'
);
"""
print(sql)
テーブルを作成
複数列のハッシュ分散テーブルを作成
# DDL (printed for Synapse): drop and recreate dbo.LINEITEM as a HEAP table
# hash-distributed on (L_ORDERKEY, L_LINENUMBER).
# NOTE(review): DW_COMPATIBILITY_LEVEL = 9000 is presumably required for the
# multi-column distribution; confirm against the CREATE TABLE documentation.
ddl = """
-- ハッシュ分散テーブル
ALTER DATABASE SCOPED CONFIGURATION SET DW_COMPATIBILITY_LEVEL = 9000;
IF OBJECT_ID (N'dbo.LINEITEM', N'U') IS NOT NULL
DROP TABLE dbo.LINEITEM;
CREATE TABLE dbo.LINEITEM
(
L_ORDERKEY integer
,L_PARTKEY integer
,L_SUPPKEY integer
,L_LINENUMBER integer
,L_QUANTITY NUMERIC(15,2)
,L_EXTENDEDPRICE NUMERIC(15,2)
,L_DISCOUNT NUMERIC(15,2)
,L_TAX NUMERIC(15,2)
,L_RETURNFLAG varchar(1)
,L_LINESTATUS varchar(1)
,L_SHIPDATE date
,L_COMMITDATE date
,L_RECEIPTDATE date
,L_SHIPINSTRUCT varchar(25)
,L_SHIPMODE varchar(10)
,L_COMMENT varchar(44)
)
WITH
(
DISTRIBUTION = HASH (L_ORDERKEY, L_LINENUMBER),
HEAP
)
;
"""
print(ddl)
単一列のハッシュ分散テーブルを作成
# DDL (printed for Synapse): drop and recreate dbo.LINEITEM as a HEAP table
# hash-distributed on the single column L_LINENUMBER.
ddl = """
IF OBJECT_ID (N'dbo.LINEITEM', N'U') IS NOT NULL
DROP TABLE dbo.LINEITEM;
CREATE TABLE dbo.LINEITEM
(
L_ORDERKEY integer
,L_PARTKEY integer
,L_SUPPKEY integer
,L_LINENUMBER integer
,L_QUANTITY NUMERIC(15,2)
,L_EXTENDEDPRICE NUMERIC(15,2)
,L_DISCOUNT NUMERIC(15,2)
,L_TAX NUMERIC(15,2)
,L_RETURNFLAG varchar(1)
,L_LINESTATUS varchar(1)
,L_SHIPDATE date
,L_COMMITDATE date
,L_RECEIPTDATE date
,L_SHIPINSTRUCT varchar(25)
,L_SHIPMODE varchar(10)
,L_COMMENT varchar(44)
)
WITH
(
DISTRIBUTION = HASH (L_LINENUMBER),
HEAP
)
;
"""
print(ddl)
ラウンドロビン分散テーブルを作成
# DDL (printed for Synapse): drop and recreate dbo.LINEITEM as a ROUND_ROBIN
# HEAP table.
# NOTE(review): the article states MERGE targets are limited to hash-distributed
# tables; confirm the intended use of this round-robin variant.
ddl = """
-- ラウンドロビンテーブル
IF OBJECT_ID (N'dbo.LINEITEM', N'U') IS NOT NULL
DROP TABLE dbo.LINEITEM;
CREATE TABLE dbo.LINEITEM
(
L_ORDERKEY integer
,L_PARTKEY integer
,L_SUPPKEY integer
,L_LINENUMBER integer
,L_QUANTITY NUMERIC(15,2)
,L_EXTENDEDPRICE NUMERIC(15,2)
,L_DISCOUNT NUMERIC(15,2)
,L_TAX NUMERIC(15,2)
,L_RETURNFLAG varchar(1)
,L_LINESTATUS varchar(1)
,L_SHIPDATE date
,L_COMMITDATE date
,L_RECEIPTDATE date
,L_SHIPINSTRUCT varchar(25)
,L_SHIPMODE varchar(10)
,L_COMMENT varchar(44)
)
WITH
(
DISTRIBUTION = ROUND_ROBIN,
HEAP
)
;
"""
print(ddl)
初期データをロード
# Initial-load SQL (printed for Synapse):
#   1. Stage the "base" Parquet files into temp table #LINEITEM with COPY INTO
#      (AUTO_CREATE_TABLE).
#   2. Empty dbo.LINEITEM and copy the staged rows into it.
#   3. Report the resulting row count.
# The INSERT ... SELECT is now terminated with ';' like every other statement
# (non-terminated T-SQL statements are deprecated).
# NOTE(review): `SELECT *` relies on the auto-created #LINEITEM columns being in
# the same order as dbo.LINEITEM; confirm.
# NOTE(security): the storage account key is interpolated into the printed SQL.
sql = f"""
-- 44991725 レコード
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
TRUNCATE TABLE dbo.LINEITEM;
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{base_path}'
WITH (
FILE_TYPE = 'PARQUET',
CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
AUTO_CREATE_TABLE = 'ON'
)
;
INSERT INTO dbo.LINEITEM
SELECT * FROM #LINEITEM;
select count(*) FROM dbo.LINEITEM;
"""
print(sql)
Merge 処理
1 回目のデータを更新
# MERGE run 1 (printed for Synapse): stage the wiret_01 Parquet files into
# #LINEITEM with COPY INTO (AUTO_CREATE_TABLE). Then upsert into dbo.LINEITEM
# keyed on (L_ORDERKEY, L_LINENUMBER): matched rows are updated, unmatched
# rows are inserted. The five MERGE sections differ only in the source path.
# NOTE(review): the SET list also reassigns the ON-clause key columns to
# identical values (a no-op). L_LINENUMBER is a distribution column in both
# hash-distributed DDL variants; confirm this is intended for the benchmark.
# NOTE(security): the storage account key is interpolated into the printed SQL.
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_01}'
WITH (
FILE_TYPE = 'PARQUET',
CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
AUTO_CREATE_TABLE = 'ON'
)
;
MERGE INTO
dbo.LINEITEM AS tgt
USING
#LINEITEM AS src
ON
(
tgt.L_ORDERKEY = src.L_ORDERKEY
and tgt.L_LINENUMBER = src.L_LINENUMBER
)
WHEN MATCHED THEN
UPDATE
SET
tgt.L_ORDERKEY = src.L_ORDERKEY
,tgt.L_PARTKEY = src.L_PARTKEY
,tgt.L_SUPPKEY = src.L_SUPPKEY
,tgt.L_LINENUMBER = src.L_LINENUMBER
,tgt.L_QUANTITY = src.L_QUANTITY
,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
,tgt.L_DISCOUNT = src.L_DISCOUNT
,tgt.L_TAX = src.L_TAX
,tgt.L_RETURNFLAG = src.L_RETURNFLAG
,tgt.L_LINESTATUS = src.L_LINESTATUS
,tgt.L_SHIPDATE = src.L_SHIPDATE
,tgt.L_COMMITDATE = src.L_COMMITDATE
,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
,tgt.L_SHIPMODE = src.L_SHIPMODE
,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
INSERT
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
VALUES
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
;
"""
print(sql)
2 回目のデータを更新
# MERGE run 2 (printed for Synapse): stage the wiret_02 Parquet files into
# #LINEITEM with COPY INTO (AUTO_CREATE_TABLE). Then upsert into dbo.LINEITEM
# keyed on (L_ORDERKEY, L_LINENUMBER): matched rows are updated, unmatched
# rows are inserted. The five MERGE sections differ only in the source path.
# NOTE(review): the SET list also reassigns the ON-clause key columns to
# identical values (a no-op). L_LINENUMBER is a distribution column in both
# hash-distributed DDL variants; confirm this is intended for the benchmark.
# NOTE(security): the storage account key is interpolated into the printed SQL.
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_02}'
WITH (
FILE_TYPE = 'PARQUET',
CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
AUTO_CREATE_TABLE = 'ON'
)
;
MERGE INTO
dbo.LINEITEM AS tgt
USING
#LINEITEM AS src
ON
(
tgt.L_ORDERKEY = src.L_ORDERKEY
and tgt.L_LINENUMBER = src.L_LINENUMBER
)
WHEN MATCHED THEN
UPDATE
SET
tgt.L_ORDERKEY = src.L_ORDERKEY
,tgt.L_PARTKEY = src.L_PARTKEY
,tgt.L_SUPPKEY = src.L_SUPPKEY
,tgt.L_LINENUMBER = src.L_LINENUMBER
,tgt.L_QUANTITY = src.L_QUANTITY
,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
,tgt.L_DISCOUNT = src.L_DISCOUNT
,tgt.L_TAX = src.L_TAX
,tgt.L_RETURNFLAG = src.L_RETURNFLAG
,tgt.L_LINESTATUS = src.L_LINESTATUS
,tgt.L_SHIPDATE = src.L_SHIPDATE
,tgt.L_COMMITDATE = src.L_COMMITDATE
,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
,tgt.L_SHIPMODE = src.L_SHIPMODE
,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
INSERT
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
VALUES
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
;
"""
print(sql)
3 回目のデータを更新
# MERGE run 3 (printed for Synapse): stage the wiret_03 Parquet files into
# #LINEITEM with COPY INTO (AUTO_CREATE_TABLE). Then upsert into dbo.LINEITEM
# keyed on (L_ORDERKEY, L_LINENUMBER): matched rows are updated, unmatched
# rows are inserted. The five MERGE sections differ only in the source path.
# NOTE(review): the SET list also reassigns the ON-clause key columns to
# identical values (a no-op). L_LINENUMBER is a distribution column in both
# hash-distributed DDL variants; confirm this is intended for the benchmark.
# NOTE(security): the storage account key is interpolated into the printed SQL.
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_03}'
WITH (
FILE_TYPE = 'PARQUET',
CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
AUTO_CREATE_TABLE = 'ON'
)
;
MERGE INTO
dbo.LINEITEM AS tgt
USING
#LINEITEM AS src
ON
(
tgt.L_ORDERKEY = src.L_ORDERKEY
and tgt.L_LINENUMBER = src.L_LINENUMBER
)
WHEN MATCHED THEN
UPDATE
SET
tgt.L_ORDERKEY = src.L_ORDERKEY
,tgt.L_PARTKEY = src.L_PARTKEY
,tgt.L_SUPPKEY = src.L_SUPPKEY
,tgt.L_LINENUMBER = src.L_LINENUMBER
,tgt.L_QUANTITY = src.L_QUANTITY
,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
,tgt.L_DISCOUNT = src.L_DISCOUNT
,tgt.L_TAX = src.L_TAX
,tgt.L_RETURNFLAG = src.L_RETURNFLAG
,tgt.L_LINESTATUS = src.L_LINESTATUS
,tgt.L_SHIPDATE = src.L_SHIPDATE
,tgt.L_COMMITDATE = src.L_COMMITDATE
,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
,tgt.L_SHIPMODE = src.L_SHIPMODE
,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
INSERT
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
VALUES
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
;
"""
print(sql)
4 回目のデータを更新
# MERGE run 4 (printed for Synapse): stage the wiret_04 Parquet files into
# #LINEITEM with COPY INTO (AUTO_CREATE_TABLE). Then upsert into dbo.LINEITEM
# keyed on (L_ORDERKEY, L_LINENUMBER): matched rows are updated, unmatched
# rows are inserted. The five MERGE sections differ only in the source path.
# NOTE(review): the SET list also reassigns the ON-clause key columns to
# identical values (a no-op). L_LINENUMBER is a distribution column in both
# hash-distributed DDL variants; confirm this is intended for the benchmark.
# NOTE(security): the storage account key is interpolated into the printed SQL.
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_04}'
WITH (
FILE_TYPE = 'PARQUET',
CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
AUTO_CREATE_TABLE = 'ON'
)
;
MERGE INTO
dbo.LINEITEM AS tgt
USING
#LINEITEM AS src
ON
(
tgt.L_ORDERKEY = src.L_ORDERKEY
and tgt.L_LINENUMBER = src.L_LINENUMBER
)
WHEN MATCHED THEN
UPDATE
SET
tgt.L_ORDERKEY = src.L_ORDERKEY
,tgt.L_PARTKEY = src.L_PARTKEY
,tgt.L_SUPPKEY = src.L_SUPPKEY
,tgt.L_LINENUMBER = src.L_LINENUMBER
,tgt.L_QUANTITY = src.L_QUANTITY
,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
,tgt.L_DISCOUNT = src.L_DISCOUNT
,tgt.L_TAX = src.L_TAX
,tgt.L_RETURNFLAG = src.L_RETURNFLAG
,tgt.L_LINESTATUS = src.L_LINESTATUS
,tgt.L_SHIPDATE = src.L_SHIPDATE
,tgt.L_COMMITDATE = src.L_COMMITDATE
,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
,tgt.L_SHIPMODE = src.L_SHIPMODE
,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
INSERT
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
VALUES
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
;
"""
print(sql)
5 回目のデータを更新
# MERGE run 5 (printed for Synapse): stage the wiret_05 Parquet files into
# #LINEITEM with COPY INTO (AUTO_CREATE_TABLE). Then upsert into dbo.LINEITEM
# keyed on (L_ORDERKEY, L_LINENUMBER): matched rows are updated, unmatched
# rows are inserted. The five MERGE sections differ only in the source path.
# NOTE(review): the SET list also reassigns the ON-clause key columns to
# identical values (a no-op). L_LINENUMBER is a distribution column in both
# hash-distributed DDL variants; confirm this is intended for the benchmark.
# NOTE(security): the storage account key is interpolated into the printed SQL.
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_05}'
WITH (
FILE_TYPE = 'PARQUET',
CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
AUTO_CREATE_TABLE = 'ON'
)
;
MERGE INTO
dbo.LINEITEM AS tgt
USING
#LINEITEM AS src
ON
(
tgt.L_ORDERKEY = src.L_ORDERKEY
and tgt.L_LINENUMBER = src.L_LINENUMBER
)
WHEN MATCHED THEN
UPDATE
SET
tgt.L_ORDERKEY = src.L_ORDERKEY
,tgt.L_PARTKEY = src.L_PARTKEY
,tgt.L_SUPPKEY = src.L_SUPPKEY
,tgt.L_LINENUMBER = src.L_LINENUMBER
,tgt.L_QUANTITY = src.L_QUANTITY
,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
,tgt.L_DISCOUNT = src.L_DISCOUNT
,tgt.L_TAX = src.L_TAX
,tgt.L_RETURNFLAG = src.L_RETURNFLAG
,tgt.L_LINESTATUS = src.L_LINESTATUS
,tgt.L_SHIPDATE = src.L_SHIPDATE
,tgt.L_COMMITDATE = src.L_COMMITDATE
,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
,tgt.L_SHIPMODE = src.L_SHIPMODE
,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
INSERT
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
VALUES
(
L_ORDERKEY
,L_PARTKEY
,L_SUPPKEY
,L_LINENUMBER
,L_QUANTITY
,L_EXTENDEDPRICE
,L_DISCOUNT
,L_TAX
,L_RETURNFLAG
,L_LINESTATUS
,L_SHIPDATE
,L_COMMITDATE
,L_RECEIPTDATE
,L_SHIPINSTRUCT
,L_SHIPMODE
,L_COMMENT
)
;
"""
print(sql)