How to run a performance test of Merge (Upsert) processing on an Azure Synapse Analytics Dedicated SQL pool

Posted at 2022-10-31

Overview

This article shares a method for testing the performance of Merge (Upsert) processing on an Azure Synapse Analytics Dedicated SQL pool, using data from the TPC-H LINEITEM table.

The following resources are used. TPC-H data is generated and converted to Parquet on Azure Databricks, and the SQL statements output there are then executed on the Azure Synapse Analytics Dedicated SQL pool.

  • Azure Synapse Analytics
  • Azure Storage
  • Azure Databricks

An overview of TPC-H and how to generate its data are covered in the following article.

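Following these steps produces the data shown in the table below. Each Merge source is designed to contain about 6 million records, and the file size column shows the size of the Parquet files compressed with Snappy.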

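| # | Process | Rows after processing | Source rows | New rows in source | Existing rows in source | File size (MB) | Size as text files (MB) |
|---|---|---|---|---|---|---|---|
| 0 | Initial data load | 44,991,725 | 44,991,725 | 44,991,725 | 0 | | |
| 1 | 1st Merge | 47,989,007 | 5,996,953 | 2,997,282 | 2,999,671 | 245 | 774 |
| 2 | 2nd Merge | 50,988,593 | 5,997,887 | 2,999,586 | 2,998,301 | 245 | 776 |
| 3 | 3rd Merge | 53,986,608 | 5,996,879 | 2,998,015 | 2,998,864 | 245 | 778 |
| 4 | 4th Merge | 56,982,414 | 5,997,357 | 2,995,806 | 3,001,551 | 245 | 779 |
| 5 | 5th Merge | 59,986,052 | 6,000,991 | 3,003,638 | 2,997,353 | 245 | 779 |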

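As a consistency check on the table, the row count after each Merge should equal the previous count plus that batch's new rows, because matched rows are updated in place and only unmatched rows are inserted. The Python sketch below reproduces the "Rows after processing" column from the table's own numbers; it is plain arithmetic on those values, not a query against Synapse, and the function name is illustrative.

```python
# Values copied from the table above
initial_rows = 44_991_725
new_rows_per_batch = [2_997_282, 2_999_586, 2_998_015, 2_995_806, 3_003_638]


def expected_row_counts(initial, new_rows):
    """Return the expected target row count after each Merge.

    Matched rows are updated in place, so only the unmatched (new)
    rows of each batch increase the row count.
    """
    counts = []
    total = initial
    for n in new_rows:
        total += n
        counts.append(total)
    return counts


print(expected_row_counts(initial_rows, new_rows_per_batch))
# [47989007, 50988593, 53986608, 56982414, 59986052]
```

The final value, 59,986,052, is also the row count of the full generated dataset, since after the fifth Merge every generated chunk has been loaded exactly once.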
As of October 31, 2022, when performing an upsert with the MERGE statement, the target table must be a hash-distributed table.


Source: MERGE (Transact-SQL) - SQL Server | Microsoft Learn

The storage account used in these steps has already been deleted.

Generating the data and SQL statements

Generating the TPC-H data

scale_factor = "10"  # Set the TPC-H scale factor
chunks_number = "20"  # Set the number of output files (also the number of parallel dbgen processes)
 
path = "file:/tmp/tpch_data"
dbfs_path = "dbfs:/FileStore/load_test/raw"
# Clear the output directories
dbutils.fs.rm(path, True)
dbutils.fs.rm(dbfs_path, True)


import os
 
# Set the arguments passed to the shell script as environment variables
os.environ["scale_factor"] = scale_factor
print(os.getenv("scale_factor"))
 
os.environ["chunks_number"] = chunks_number
print(os.getenv("chunks_number"))


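%sh
# Install build tools and build dbgen from the Databricks fork of tpch-dbgen
sudo apt-get install -y make gcc yacc
cd /tmp
rm -r ./tpch-dbgen -f
git clone https://github.com/databricks/tpch-dbgen.git
cd tpch-dbgen
make OS=LINUX WORKLOAD=TPCH
cd ..
# Recreate the output directory
rm -r ./tpch_data -f
mkdir tpch_data
cd tpch_data
 
# Generate LINEITEM only (-T L), split into ${chunks_number} chunks,
# running one background dbgen process per chunk (-S selects the chunk)
for i in $( seq 1 ${chunks_number} ) ; do
    ../tpch-dbgen/dbgen -b ../tpch-dbgen/dists.dss -f -s ${scale_factor} -C ${chunks_number} -S ${i} -T L &
done
# Wait until every background dbgen process has finished
wait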

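The shell loop above starts one dbgen process per chunk in the background and then waits for all of them; each process writes one file, `lineitem.tbl.<i>`. The Python sketch below only builds the same command lines as strings, to make the flag combination easier to read: `-s` is the scale factor, `-C` the total number of chunks, `-S` the chunk a given process generates, and `-T L` restricts output to LINEITEM. The helper `dbgen_commands` is hypothetical and does not run dbgen.

```python
import shlex
from typing import List


def dbgen_commands(scale_factor: int, chunks: int) -> List[str]:
    """Build the dbgen command line for each chunk, mirroring the shell loop."""
    commands = []
    for i in range(1, chunks + 1):
        args = [
            "../tpch-dbgen/dbgen",
            "-b", "../tpch-dbgen/dists.dss",  # distribution definitions file
            "-f",                             # overwrite existing output files
            "-s", str(scale_factor),          # scale factor
            "-C", str(chunks),                # split the data into this many chunks
            "-S", str(i),                     # generate only chunk number i
            "-T", "L",                        # generate only the LINEITEM table
        ]
        commands.append(shlex.join(args))
    return commands


for cmd in dbgen_commands(scale_factor=10, chunks=20)[:2]:
    print(cmd)
```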

file_list = dbutils.fs.ls(path)
 
# Show the file list
display(file_list)
 
# Show the total file size
spark.createDataFrame(file_list).groupBy().sum("size").display()


# Copy the files to DBFS
dbutils.fs.cp(path, dbfs_path, True)


# Check the row count (should be 59,986,052)
def create_tpch_lineitem_dataframe(
    filepath,
):
 
    schema = """
      L_ORDERKEY    INTEGER ,
      L_PARTKEY     INTEGER ,
      L_SUPPKEY     INTEGER ,
      L_LINENUMBER  INTEGER ,
      L_QUANTITY    DECIMAL(15,2) ,
      L_EXTENDEDPRICE  DECIMAL(15,2) ,
      L_DISCOUNT    DECIMAL(15,2) ,
      L_TAX         DECIMAL(15,2) ,
      L_RETURNFLAG  STRING ,
      L_LINESTATUS  STRING ,
      L_SHIPDATE    DATE ,
      L_COMMITDATE  DATE ,
      L_RECEIPTDATE DATE ,
      L_SHIPINSTRUCT STRING ,
      L_SHIPMODE     STRING ,
      L_COMMENT      STRING
    """
 
    df = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
    return df
 
 
df = create_tpch_lineitem_dataframe(dbfs_path)
df.count()


Converting the TPC-H data to Parquet

parquet_file_paths = {
    "base": [
        f"{dbfs_path}/lineitem.tbl.1",
        f"{dbfs_path}/lineitem.tbl.2",
        f"{dbfs_path}/lineitem.tbl.3",
        f"{dbfs_path}/lineitem.tbl.4",
        f"{dbfs_path}/lineitem.tbl.5",
        f"{dbfs_path}/lineitem.tbl.6",
        f"{dbfs_path}/lineitem.tbl.7",
        f"{dbfs_path}/lineitem.tbl.8",
        f"{dbfs_path}/lineitem.tbl.9",
        f"{dbfs_path}/lineitem.tbl.10",
        f"{dbfs_path}/lineitem.tbl.11",
        f"{dbfs_path}/lineitem.tbl.12",
        f"{dbfs_path}/lineitem.tbl.13",
        f"{dbfs_path}/lineitem.tbl.14",
        f"{dbfs_path}/lineitem.tbl.15",
    ],
    "wiret_01": [
        f"{dbfs_path}/lineitem.tbl.1",
        f"{dbfs_path}/lineitem.tbl.16",
    ],
    "wiret_02": [
        f"{dbfs_path}/lineitem.tbl.4",
        f"{dbfs_path}/lineitem.tbl.17",
    ],
    "wiret_03": [
        f"{dbfs_path}/lineitem.tbl.7",
        f"{dbfs_path}/lineitem.tbl.18",
    ],
    "wiret_04": [
        f"{dbfs_path}/lineitem.tbl.10",
        f"{dbfs_path}/lineitem.tbl.19",
    ],
    "wiret_05": [
        f"{dbfs_path}/lineitem.tbl.13",
        f"{dbfs_path}/lineitem.tbl.20",
    ],
}

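The hard-coded dictionary follows a simple pattern: `base` holds chunks 1 to 15, and each `wiret_NN` batch pairs one chunk already loaded in `base` (1, 4, 7, 10, 13) with one chunk not loaded yet (16 to 20). That is why each batch in the table at the top is roughly half existing rows and half new rows. A minimal sketch that builds the same dictionary programmatically, assuming that pattern; `build_parquet_file_paths` is a hypothetical helper, and the `wiret_NN` names are kept because later steps use them as directory names.

```python
from typing import Dict, List


def build_parquet_file_paths(dbfs_path: str, base_chunks: int = 15,
                             batches: int = 5, step: int = 3) -> Dict[str, List[str]]:
    """Rebuild the chunk-to-dataset mapping used above.

    base     : chunks 1..base_chunks (initial load)
    wiret_NN : one chunk already in base plus one chunk not yet loaded
    """
    def chunk(i: int) -> str:
        return f"{dbfs_path}/lineitem.tbl.{i}"

    paths = {"base": [chunk(i) for i in range(1, base_chunks + 1)]}
    for k in range(batches):
        existing = 1 + step * k      # 1, 4, 7, 10, 13: rows that will be updated
        new = base_chunks + 1 + k    # 16, 17, 18, 19, 20: rows that will be inserted
        paths[f"wiret_{k + 1:02d}"] = [chunk(existing), chunk(new)]
    return paths


print(build_parquet_file_paths("dbfs:/FileStore/load_test/raw")["wiret_03"])
# ['dbfs:/FileStore/load_test/raw/lineitem.tbl.7', 'dbfs:/FileStore/load_test/raw/lineitem.tbl.18']
```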

base_dbfs_path = "dbfs:/FileStore/load_test"
 
parqeut_dbfs_path = f"{base_dbfs_path}/parquet"

def create_tpch_lineitem_dataframe(
    filepath,
):
 
    schema = """
      L_ORDERKEY    INTEGER ,
      L_PARTKEY     INTEGER ,
      L_SUPPKEY     INTEGER ,
      L_LINENUMBER  INTEGER ,
      L_QUANTITY    DECIMAL(15,2) ,
      L_EXTENDEDPRICE  DECIMAL(15,2) ,
      L_DISCOUNT    DECIMAL(15,2) ,
      L_TAX         DECIMAL(15,2) ,
      L_RETURNFLAG  STRING ,
      L_LINESTATUS  STRING ,
      L_SHIPDATE    DATE ,
      L_COMMITDATE  DATE ,
      L_RECEIPTDATE DATE ,
      L_SHIPINSTRUCT STRING ,
      L_SHIPMODE     STRING ,
      L_COMMENT      STRING
    """
 
    df = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
    return df


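# Set the parameter mentioned in the Synapse docs (write Parquet in the legacy format)
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
 
try:
    # Use `name` rather than `path` so the earlier `path` variable is not overwritten
    for name, src_path in parquet_file_paths.items():
        write_path = f"{parqeut_dbfs_path}/{name}"
 
        # Create a DataFrame from the source files of this dataset
        raw_df = create_tpch_lineitem_dataframe(src_path)
 
        # Write in Parquet format
        raw_df.write.format("parquet").mode("overwrite").save(write_path)
 
        print(f"{name} is finished.")
finally:
    # Restore the default setting even if a write fails
    spark.conf.set("spark.sql.parquet.writeLegacyFormat", "false")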


# Show the file list
file_list = dbutils.fs.ls(f"{parqeut_dbfs_path}")
display(file_list)


Writing the Parquet-format TPC-H data to storage

storage_account = "synapsetest123456689343"
container_name = "synapse"
access_key = "<storage-account-access-key>"  # placeholder: set your own key; avoid hard-coding real keys
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", access_key
)
 
# To authenticate with a SAS token instead, use the following
# storage_account = 'synapsetest123345'
# container_name = 'synapse-load'
# token = '<sas-token>'
# spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS")
# spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
# spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net", token)


# Check the connection
storage_dir = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net"
dbutils.fs.ls(storage_dir)


tpch_dir = f"{storage_dir}/tpch"

# Clear the directory on storage
dbutils.fs.rm(tpch_dir, True)
 
# Create the `tpch` directory on storage, then copy the Parquet files from DBFS
dbutils.fs.mkdirs(tpch_dir)
dbutils.fs.cp(parqeut_dbfs_path, tpch_dir, True)
 
# Check the files on storage
dbutils.fs.ls(tpch_dir)


Generating the test SQL for Synapse

Setting variables

storage_account = "synapsetest123456689343"
container_name = "synapse"
access_key = "<storage-account-access-key>"  # placeholder: set your own key; avoid hard-coding real keys
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", access_key
)


# Check the connection
storage_dir = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net"
dbutils.fs.ls(storage_dir)


data_dir = "tpch"
base_path = f"{data_dir}/base/*.parquet"
wiret_01 = f"{data_dir}/wiret_01/*.parquet"
wiret_02 = f"{data_dir}/wiret_02/*.parquet"
wiret_03 = f"{data_dir}/wiret_03/*.parquet"
wiret_04 = f"{data_dir}/wiret_04/*.parquet"
wiret_05 = f"{data_dir}/wiret_05/*.parquet"

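The COPY INTO statements below read these relative paths through the Blob endpoint (`https://<account>.blob.core.windows.net/...`), while Databricks wrote the files through the ABFS driver (`abfss://<container>@<account>.dfs.core.windows.net/...`). A small sketch of how the two URL forms address the same files; the helper names and the account name `mystorageaccount` are hypothetical.

```python
def copy_into_url(storage_account, container_name, relative_path):
    """HTTPS Blob-endpoint URL, the form used in the COPY INTO ... FROM clauses."""
    return f"https://{storage_account}.blob.core.windows.net/{container_name}/{relative_path}"


def abfss_url(storage_account, container_name, relative_path):
    """abfss:// (dfs endpoint) URL, the form used with dbutils.fs in Databricks."""
    return f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/{relative_path}"


# Hypothetical account name; "synapse" is the container used in this article
batches = ["base"] + [f"wiret_{i:02d}" for i in range(1, 6)]
for batch in batches:
    rel = f"tpch/{batch}/*.parquet"
    print(copy_into_url("mystorageaccount", "synapse", rel))
```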

Creating the workload group

# Get the current user name
sql = """
SELECT user_name();
"""
 
print(sql)


# Set the user name (the value returned by the query above)
user_name = "dbo"


sql = f"""
CREATE WORKLOAD GROUP DataLoads
WITH ( 
    MIN_PERCENTAGE_RESOURCE = 0
    ,CAP_PERCENTAGE_RESOURCE = 100
    ,REQUEST_MIN_RESOURCE_GRANT_PERCENT = 100
);
 
CREATE WORKLOAD CLASSIFIER [wgcELTLogin]
WITH (
    WORKLOAD_GROUP = 'DataLoads'
    ,MEMBERNAME = '{user_name}'
);
"""
 
print(sql)


Creating the table

Create a hash-distributed table on multiple columns
ddl = """
-- Hash-distributed table
ALTER DATABASE SCOPED CONFIGURATION SET DW_COMPATIBILITY_LEVEL = 9000;
 
IF OBJECT_ID (N'dbo.LINEITEM', N'U') IS NOT NULL  
DROP TABLE dbo.LINEITEM;
 
CREATE TABLE dbo.LINEITEM
(
    L_ORDERKEY      integer
   ,L_PARTKEY       integer
   ,L_SUPPKEY       integer
   ,L_LINENUMBER    integer
   ,L_QUANTITY      NUMERIC(15,2)
   ,L_EXTENDEDPRICE NUMERIC(15,2)
   ,L_DISCOUNT      NUMERIC(15,2)
   ,L_TAX           NUMERIC(15,2)
   ,L_RETURNFLAG    varchar(1)
   ,L_LINESTATUS    varchar(1)
   ,L_SHIPDATE      date
   ,L_COMMITDATE    date
   ,L_RECEIPTDATE   date
   ,L_SHIPINSTRUCT  varchar(25)
   ,L_SHIPMODE      varchar(10)
   ,L_COMMENT       varchar(44)
)
WITH
(
    DISTRIBUTION = HASH (L_ORDERKEY, L_LINENUMBER),
    HEAP
)
;
"""
 
print(ddl)


Create a hash-distributed table on a single column
ddl = """
IF OBJECT_ID (N'dbo.LINEITEM', N'U') IS NOT NULL  
DROP TABLE dbo.LINEITEM;
 
CREATE TABLE dbo.LINEITEM
(
    L_ORDERKEY      integer
   ,L_PARTKEY       integer
   ,L_SUPPKEY       integer
   ,L_LINENUMBER    integer
   ,L_QUANTITY      NUMERIC(15,2)
   ,L_EXTENDEDPRICE NUMERIC(15,2)
   ,L_DISCOUNT      NUMERIC(15,2)
   ,L_TAX           NUMERIC(15,2)
   ,L_RETURNFLAG    varchar(1)
   ,L_LINESTATUS    varchar(1)
   ,L_SHIPDATE      date
   ,L_COMMITDATE    date
   ,L_RECEIPTDATE   date
   ,L_SHIPINSTRUCT  varchar(25)
   ,L_SHIPMODE      varchar(10)
   ,L_COMMENT       varchar(44)
)
WITH
(
    DISTRIBUTION = HASH (L_LINENUMBER),
    HEAP
)
;
"""
 
print(ddl)

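The single-column table is distributed on `L_LINENUMBER`. In TPC-H each order has 1 to 7 line items, so that column has only 7 distinct values, while a dedicated SQL pool spreads every distributed table across 60 distributions. The sketch below simulates the effect on synthetic keys. It uses `zlib.crc32` as a stand-in hash, which is an assumption: it is not the hash function Synapse uses, so only how many distributions receive rows is meaningful, not which ones.

```python
import zlib
from collections import Counter

# A dedicated SQL pool spreads every distributed table over 60 distributions
NUM_DISTRIBUTIONS = 60


def distribution_of(key):
    """Map a distribution-key tuple to a distribution number (0 to 59).

    Assumption: zlib.crc32 is only a stand-in for Synapse's own hash function.
    """
    return zlib.crc32(repr(key).encode("utf-8")) % NUM_DISTRIBUTIONS


def synthetic_lineitem_keys(num_orders):
    """Yield hypothetical (L_ORDERKEY, L_LINENUMBER) pairs, 1 to 7 lines per order."""
    for orderkey in range(1, num_orders + 1):
        for linenumber in range(1, orderkey % 7 + 2):
            yield orderkey, linenumber


keys = list(synthetic_lineitem_keys(10_000))
single = Counter(distribution_of((linenumber,)) for _, linenumber in keys)
multi = Counter(distribution_of(key) for key in keys)

print("rows:", len(keys))
print("HASH (L_LINENUMBER) uses", len(single), "distributions")
print("HASH (L_ORDERKEY, L_LINENUMBER) uses", len(multi), "distributions")
print("largest share on one distribution (single column):", max(single.values()) / len(keys))
```

Whatever the real hash function is, a key with 7 distinct values can populate at most 7 distributions, so most of the 60 would stay empty for that table.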

Create a round-robin distributed table
ddl = """
-- Round-robin table
IF OBJECT_ID (N'dbo.LINEITEM', N'U') IS NOT NULL  
DROP TABLE dbo.LINEITEM;
CREATE TABLE dbo.LINEITEM
(
    L_ORDERKEY      integer
   ,L_PARTKEY       integer
   ,L_SUPPKEY       integer
   ,L_LINENUMBER    integer
   ,L_QUANTITY      NUMERIC(15,2)
   ,L_EXTENDEDPRICE NUMERIC(15,2)
   ,L_DISCOUNT      NUMERIC(15,2)
   ,L_TAX           NUMERIC(15,2)
   ,L_RETURNFLAG    varchar(1)
   ,L_LINESTATUS    varchar(1)
   ,L_SHIPDATE      date
   ,L_COMMITDATE    date
   ,L_RECEIPTDATE   date
   ,L_SHIPINSTRUCT  varchar(25)
   ,L_SHIPMODE      varchar(10)
   ,L_COMMENT       varchar(44)
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
)  
;
"""
 
print(ddl)

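The three CREATE TABLE statements differ only in their DISTRIBUTION clause. A hedged sketch of a helper that generates all three from a single column list; `LINEITEM_COLUMN_TYPES`, `DISTRIBUTIONS`, and `lineitem_ddl` are hypothetical names. It does not emit the `ALTER DATABASE SCOPED CONFIGURATION` line that precedes the multi-column version above, so that line still has to be prepended where it is used.

```python
LINEITEM_COLUMN_TYPES = [
    ("L_ORDERKEY", "integer"),
    ("L_PARTKEY", "integer"),
    ("L_SUPPKEY", "integer"),
    ("L_LINENUMBER", "integer"),
    ("L_QUANTITY", "NUMERIC(15,2)"),
    ("L_EXTENDEDPRICE", "NUMERIC(15,2)"),
    ("L_DISCOUNT", "NUMERIC(15,2)"),
    ("L_TAX", "NUMERIC(15,2)"),
    ("L_RETURNFLAG", "varchar(1)"),
    ("L_LINESTATUS", "varchar(1)"),
    ("L_SHIPDATE", "date"),
    ("L_COMMITDATE", "date"),
    ("L_RECEIPTDATE", "date"),
    ("L_SHIPINSTRUCT", "varchar(25)"),
    ("L_SHIPMODE", "varchar(10)"),
    ("L_COMMENT", "varchar(44)"),
]

# The three distribution options compared in this article
DISTRIBUTIONS = {
    "hash_multi": "HASH (L_ORDERKEY, L_LINENUMBER)",
    "hash_single": "HASH (L_LINENUMBER)",
    "round_robin": "ROUND_ROBIN",
}


def lineitem_ddl(distribution, table="dbo.LINEITEM"):
    """Return DROP/CREATE TABLE statements for LINEITEM with the given DISTRIBUTION."""
    # Same leading-comma layout as the hand-written DDL above
    columns = "\n   ,".join(f"{name:<15} {dtype}" for name, dtype in LINEITEM_COLUMN_TYPES)
    return f"""IF OBJECT_ID (N'{table}', N'U') IS NOT NULL
DROP TABLE {table};

CREATE TABLE {table}
(
    {columns}
)
WITH
(
    DISTRIBUTION = {distribution},
    HEAP
)
;
"""


print(lineitem_ddl(DISTRIBUTIONS["hash_single"]))
```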

Loading the initial data

sql = f"""
-- 44,991,725 records
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
 
TRUNCATE TABLE dbo.LINEITEM;
 
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{base_path}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;
 
INSERT INTO dbo.LINEITEM
SELECT * FROM #LINEITEM
 
select count(*) FROM dbo.LINEITEM;
"""
 
print(sql)


Merge processing

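The MERGE statements below match on `(L_ORDERKEY, L_LINENUMBER)`, update every matched row, and insert every unmatched one. As a mental model only, the same upsert semantics can be sketched in Python with a dictionary keyed on those two columns; `merge_upsert` is a hypothetical helper, not a Synapse feature. One difference: this sketch silently keeps the last source row for a duplicated key, whereas T-SQL MERGE raises an error when a target row matches more than one source row, so source keys are assumed to be unique.

```python
from typing import Dict, Iterable, Tuple

# Match key used by the MERGE statements: (L_ORDERKEY, L_LINENUMBER)
Key = Tuple[int, int]


def merge_upsert(target: Dict[Key, dict], source: Iterable[dict]) -> Tuple[int, int]:
    """Upsert source rows into target in place; return (updated, inserted) counts.

    Mirrors WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT.
    Assumes source keys are unique.
    """
    updated = inserted = 0
    for row in source:
        key = (row["L_ORDERKEY"], row["L_LINENUMBER"])
        if key in target:
            updated += 1   # WHEN MATCHED: overwrite the row's columns
        else:
            inserted += 1  # WHEN NOT MATCHED: add a new row
        target[key] = dict(row)
    return updated, inserted


target = {
    (1, 1): {"L_ORDERKEY": 1, "L_LINENUMBER": 1, "L_QUANTITY": 17},
    (1, 2): {"L_ORDERKEY": 1, "L_LINENUMBER": 2, "L_QUANTITY": 36},
}
source = [
    {"L_ORDERKEY": 1, "L_LINENUMBER": 2, "L_QUANTITY": 40},  # existing key: update
    {"L_ORDERKEY": 2, "L_LINENUMBER": 1, "L_QUANTITY": 38},  # new key: insert
]
print(merge_upsert(target, source))  # (1, 1)
print(len(target))                   # 3
```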
Updating with the 1st batch of data
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
 
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_01}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;
 
MERGE INTO 
    dbo.LINEITEM AS tgt  
USING
    #LINEITEM AS src  
ON 
(
    tgt.L_ORDERKEY = src.L_ORDERKEY
    and tgt.L_LINENUMBER = src.L_LINENUMBER
)
 WHEN MATCHED THEN
    UPDATE 
        SET
        tgt.L_ORDERKEY = src.L_ORDERKEY
        ,tgt.L_PARTKEY = src.L_PARTKEY
        ,tgt.L_SUPPKEY = src.L_SUPPKEY
        ,tgt.L_LINENUMBER = src.L_LINENUMBER
        ,tgt.L_QUANTITY = src.L_QUANTITY
        ,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
        ,tgt.L_DISCOUNT = src.L_DISCOUNT
        ,tgt.L_TAX = src.L_TAX
        ,tgt.L_RETURNFLAG = src.L_RETURNFLAG
        ,tgt.L_LINESTATUS = src.L_LINESTATUS
        ,tgt.L_SHIPDATE = src.L_SHIPDATE
        ,tgt.L_COMMITDATE = src.L_COMMITDATE
        ,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
        ,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
        ,tgt.L_SHIPMODE = src.L_SHIPMODE
        ,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
    INSERT
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
    VALUES 
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
;
"""
 
print(sql)


Updating with the 2nd batch of data
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
 
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_02}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;
 
MERGE INTO 
    dbo.LINEITEM AS tgt  
USING
    #LINEITEM AS src  
ON 
(
    tgt.L_ORDERKEY = src.L_ORDERKEY
    and tgt.L_LINENUMBER = src.L_LINENUMBER
)
 WHEN MATCHED THEN
    UPDATE 
        SET
        tgt.L_ORDERKEY = src.L_ORDERKEY
        ,tgt.L_PARTKEY = src.L_PARTKEY
        ,tgt.L_SUPPKEY = src.L_SUPPKEY
        ,tgt.L_LINENUMBER = src.L_LINENUMBER
        ,tgt.L_QUANTITY = src.L_QUANTITY
        ,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
        ,tgt.L_DISCOUNT = src.L_DISCOUNT
        ,tgt.L_TAX = src.L_TAX
        ,tgt.L_RETURNFLAG = src.L_RETURNFLAG
        ,tgt.L_LINESTATUS = src.L_LINESTATUS
        ,tgt.L_SHIPDATE = src.L_SHIPDATE
        ,tgt.L_COMMITDATE = src.L_COMMITDATE
        ,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
        ,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
        ,tgt.L_SHIPMODE = src.L_SHIPMODE
        ,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
    INSERT
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
    VALUES 
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
;
"""
 
print(sql)


Updating with the 3rd batch of data
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
 
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_03}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;
 
MERGE INTO 
    dbo.LINEITEM AS tgt  
USING
    #LINEITEM AS src  
ON 
(
    tgt.L_ORDERKEY = src.L_ORDERKEY
    and tgt.L_LINENUMBER = src.L_LINENUMBER
)
 WHEN MATCHED THEN
    UPDATE 
        SET
        tgt.L_ORDERKEY = src.L_ORDERKEY
        ,tgt.L_PARTKEY = src.L_PARTKEY
        ,tgt.L_SUPPKEY = src.L_SUPPKEY
        ,tgt.L_LINENUMBER = src.L_LINENUMBER
        ,tgt.L_QUANTITY = src.L_QUANTITY
        ,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
        ,tgt.L_DISCOUNT = src.L_DISCOUNT
        ,tgt.L_TAX = src.L_TAX
        ,tgt.L_RETURNFLAG = src.L_RETURNFLAG
        ,tgt.L_LINESTATUS = src.L_LINESTATUS
        ,tgt.L_SHIPDATE = src.L_SHIPDATE
        ,tgt.L_COMMITDATE = src.L_COMMITDATE
        ,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
        ,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
        ,tgt.L_SHIPMODE = src.L_SHIPMODE
        ,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
    INSERT
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
    VALUES 
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
;
"""
 
print(sql)


Updating with the 4th batch of data
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
 
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_04}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;
 
MERGE INTO 
    dbo.LINEITEM AS tgt  
USING
    #LINEITEM AS src  
ON 
(
    tgt.L_ORDERKEY = src.L_ORDERKEY
    and tgt.L_LINENUMBER = src.L_LINENUMBER
)
 WHEN MATCHED THEN
    UPDATE 
        SET
        tgt.L_ORDERKEY = src.L_ORDERKEY
        ,tgt.L_PARTKEY = src.L_PARTKEY
        ,tgt.L_SUPPKEY = src.L_SUPPKEY
        ,tgt.L_LINENUMBER = src.L_LINENUMBER
        ,tgt.L_QUANTITY = src.L_QUANTITY
        ,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
        ,tgt.L_DISCOUNT = src.L_DISCOUNT
        ,tgt.L_TAX = src.L_TAX
        ,tgt.L_RETURNFLAG = src.L_RETURNFLAG
        ,tgt.L_LINESTATUS = src.L_LINESTATUS
        ,tgt.L_SHIPDATE = src.L_SHIPDATE
        ,tgt.L_COMMITDATE = src.L_COMMITDATE
        ,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
        ,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
        ,tgt.L_SHIPMODE = src.L_SHIPMODE
        ,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
    INSERT
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
    VALUES 
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
;
"""
 
print(sql)


Updating with the 5th batch of data
sql = f"""
IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;
 
COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{wiret_05}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY= 'Storage Account Key', SECRET='{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;
 
MERGE INTO 
    dbo.LINEITEM AS tgt  
USING
    #LINEITEM AS src  
ON 
(
    tgt.L_ORDERKEY = src.L_ORDERKEY
    and tgt.L_LINENUMBER = src.L_LINENUMBER
)
 WHEN MATCHED THEN
    UPDATE 
        SET
        tgt.L_ORDERKEY = src.L_ORDERKEY
        ,tgt.L_PARTKEY = src.L_PARTKEY
        ,tgt.L_SUPPKEY = src.L_SUPPKEY
        ,tgt.L_LINENUMBER = src.L_LINENUMBER
        ,tgt.L_QUANTITY = src.L_QUANTITY
        ,tgt.L_EXTENDEDPRICE = src.L_EXTENDEDPRICE
        ,tgt.L_DISCOUNT = src.L_DISCOUNT
        ,tgt.L_TAX = src.L_TAX
        ,tgt.L_RETURNFLAG = src.L_RETURNFLAG
        ,tgt.L_LINESTATUS = src.L_LINESTATUS
        ,tgt.L_SHIPDATE = src.L_SHIPDATE
        ,tgt.L_COMMITDATE = src.L_COMMITDATE
        ,tgt.L_RECEIPTDATE = src.L_RECEIPTDATE
        ,tgt.L_SHIPINSTRUCT = src.L_SHIPINSTRUCT
        ,tgt.L_SHIPMODE = src.L_SHIPMODE
        ,tgt.L_COMMENT = src.L_COMMENT
WHEN NOT MATCHED THEN
    INSERT
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
    VALUES 
    (
        L_ORDERKEY
        ,L_PARTKEY
        ,L_SUPPKEY
        ,L_LINENUMBER
        ,L_QUANTITY
        ,L_EXTENDEDPRICE
        ,L_DISCOUNT
        ,L_TAX
        ,L_RETURNFLAG
        ,L_LINESTATUS
        ,L_SHIPDATE
        ,L_COMMITDATE
        ,L_RECEIPTDATE
        ,L_SHIPINSTRUCT
        ,L_SHIPMODE
        ,L_COMMENT
    )
;
"""
 
print(sql)

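The five Merge cells above are identical except for the source path. A minimal sketch of a generator that produces all five from one template; `merge_sql`, `LINEITEM_COLUMNS`, `KEY_COLUMNS`, and the placeholder credentials are hypothetical. It differs from the statements above in two small ways: the join columns are left out of the SET list (they are already equal under the ON condition, so assigning them is redundant), and the INSERT values are qualified with `src.`.

```python
LINEITEM_COLUMNS = [
    "L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY", "L_LINENUMBER",
    "L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_TAX",
    "L_RETURNFLAG", "L_LINESTATUS", "L_SHIPDATE", "L_COMMITDATE",
    "L_RECEIPTDATE", "L_SHIPINSTRUCT", "L_SHIPMODE", "L_COMMENT",
]
KEY_COLUMNS = ["L_ORDERKEY", "L_LINENUMBER"]


def merge_sql(storage_account, container_name, source_path, access_key,
              target="dbo.LINEITEM"):
    """Return the COPY INTO + MERGE script for one source batch."""
    on_clause = "\n    AND ".join(f"tgt.{c} = src.{c}" for c in KEY_COLUMNS)
    # Key columns are equal by the ON condition, so they are not re-assigned
    set_clause = "\n        ,".join(
        f"tgt.{c} = src.{c}" for c in LINEITEM_COLUMNS if c not in KEY_COLUMNS
    )
    insert_cols = "\n        ,".join(LINEITEM_COLUMNS)
    insert_vals = "\n        ,".join(f"src.{c}" for c in LINEITEM_COLUMNS)
    return f"""IF OBJECT_ID(N'tempdb..#LINEITEM', N'U') IS NOT NULL
DROP TABLE #LINEITEM;

COPY INTO #LINEITEM
FROM 'https://{storage_account}.blob.core.windows.net/{container_name}/{source_path}'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY = 'Storage Account Key', SECRET = '{access_key}'),
    AUTO_CREATE_TABLE = 'ON'
)
;

MERGE INTO
    {target} AS tgt
USING
    #LINEITEM AS src
ON
(
    {on_clause}
)
WHEN MATCHED THEN
    UPDATE
        SET
        {set_clause}
WHEN NOT MATCHED THEN
    INSERT
    (
        {insert_cols}
    )
    VALUES
    (
        {insert_vals}
    )
;
"""


# Placeholder values: replace with your own storage account, container, and key
for i in range(1, 6):
    print(merge_sql("<storage-account>", "<container>",
                    f"tpch/wiret_{i:02d}/*.parquet", "<access-key>"))
```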