■本記事の目的
Snowflakeに、構造化データで記載されたExcelファイルを直接テーブルにロードする検証を行うため。
SnowflakeのPythonワークシートにて、SnowPark、PandasなどのPythonライブラリによるプログラム検証を行うため。
■前回の記事
Snowflake Excelデータロード:Pythonによるロード(単一ファイル)
■概要
Snowflake S3データロード:③外部ステージ から派生して検証する。
ストレージ統合、外部ステージはそのまま流用で、検証を行う。
今回は、特定フォルダ内の複数ファイルをロードすることを行う。
※ブック内の複数シートは考慮していません。
■Pythonライブラリ
Pythonのコマンドは、以下をインストールします。
・snowpark
・pandas
・openpyxl
・sys
・io
■検証テストデータ
Excel内のテストデータは、以下のようにA1セルから始まる構造化データの状態で、
xlsxの拡張子でS3に配置します。
※1行目はヘッダー
件数は、
ブック1は、ヘッダー除きの17974件です。
ブック2は、ヘッダー除きの17968件です。
ロード後の合計は、35942件の想定です。
※2ファイル処理できたことが判別できるように件数に差をつけています。
■ストアドプロシージャ(Pythonプログラム)
ストアドプロシージャで定義します。※任意の名前で良いです。
ロードするテーブル名は、プログラム内で決定しておく必要があります。
ストアドプロシージャとテーブルの所属のデータベース・スキーマは同一である必要があります。
引数に、外部ステージ名とそのパスサフィックスを設定しておきます。
このプログラムは、特定フォルダの複数ファイルをロードする仕様です。
※単一ファイルでも可能です。
CREATE OR REPLACE PROCEDURE EXEC_EXCEL_SP(stage_name VARCHAR,suffix VARCHAR)
RETURNS NUMBER
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','pandas','openpyxl')
HANDLER = 'main'
EXECUTE AS OWNER
AS
$$
#処理に必要なライブラリをインポートしています。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
from snowflake.snowpark.files import SnowflakeFile
import openpyxl as openpyxl
from openpyxl import load_workbook
import pandas as pd
import sys #エンコード調整のため追加
import io #エンコード調整のため追加
#エンコード調整のため追加
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
#MAIN定義で、後続で定義する関数を順次実行するように定義する。
def main(session: snowpark.Session,stage_name:str,suffix:str):
truncate(session)
stage_name_sl=fc_stage_name_sl(stage_name)
suffix_sl=fc_suffix_sl(suffix)
wild_suffix=fc_wild_suffix(suffix)
replace_suffix=fc_replace_suffix(suffix)
pandas_file_name_list=fc_pandas_file_name_list(session,stage_name_sl,suffix_sl,wild_suffix,replace_suffix)
fc_excel_import(session,stage_name,pandas_file_name_list)
result=fc_count(session)
return result #ロード件数を返す。
#以下、各処理を定義していく。
#洗替えるために、TRUNCATE処理を定義する。:更新方式によって適宜調整する。
def truncate(session: snowpark.Session):
session.sql("TRUNCATE TEST_TB").collect()
#引数への文字列結合を定義する。:コード冗長防止のため
def fc_stage_name_sl(stage_name:str):
return stage_name+"/"
def fc_suffix_sl(suffix:str):
return suffix+"/"
def fc_wild_suffix(suffix:str):
return ".*/"+suffix+"/"
def fc_replace_suffix(suffix:str):
return "/"+suffix+"/"
#S3に配置されているxlsxファイルのパスリストを照会する。
#snowparkデータフレームをpandasデータフレームに変換
#ファイルパスをファイル名リストに変換する。
#ファイル名リストをlist形式に変換する。
def fc_pandas_file_name_list(session:snowpark.Session,stage_name_sl:str,suffix_sl:str,wild_suffix:str,replace_suffix:str):
query_file_list="LIST "+stage_name_sl+suffix_sl #LIST構文を作成する。
file_list=session.sql(query_file_list).collect() #LIST構文を実行する。
file_name=session.sql("select $1 AS FILE_NAME from table(result_scan(LAST_QUERY_ID()))") #LIST結果からファイルパスを取得する。
change_pandas_file_name = file_name.to_pandas() #pandas変換
pandas_file_name = change_pandas_file_name.replace(wild_suffix,replace_suffix,regex=True) #ファイル名のみ抽出する。
pandas_file_name_list = pandas_file_name["FILE_NAME"].tolist() #list形式にする。
return pandas_file_name_list
#file_urlからexcelファイルを展開する。
#excelファイルデータをデータフレームにする。
#データフレームをテーブルにINSERTする。
def table_append(session, file_url_use):
with SnowflakeFile.open(file_url_use, 'rb') as f:
workbook = load_workbook(f)
sheet = workbook.active
data = sheet.values
# 最初の行をヘッダーとして設定
columns = next(data)[0:]
# 2行目以降のデータに基づいてDataFrameを作成する
df = pd.DataFrame(data, columns=columns)
df2 = session.create_dataframe(df)
df2.write.mode("append").save_as_table("<テーブル>")
#本関数内部に、前段で定義して、INSERT処理を内包する。
#list形式のファイル名リストを再度データフレーム化する。
#データフレームのファイル名を1行目から順に取得する。
#取得したファイル名とステージ名を元に、URL変換する。
#URLをtable_append()関数に代入して、処理する。
#次の行があれば繰り返し処理が発生する。最大行まで行けば処理終了。
def fc_excel_import(session:snowpark.Session,stage_name:str,pandas_file_name_list):
df_pandas_file_name=pd.DataFrame(pandas_file_name_list)
max_num=len(df_pandas_file_name)-1 #行カウントから行番を取る
row_num=0 #処理行開始を変数に代入
while (row_num <= max_num): #繰り返し処理開始(最大行まで繰り返し処理をする。)
file_name=df_pandas_file_name.iloc[row_num,0] #行番からファイル名を取得する。
query_url="select build_scoped_file_url("+stage_name+",'"+file_name+"') AS URL" #ファイル名とステージ名からファイルパスURLを生成する。
file_url_get=session.sql(query_url) #ファイルパスURLの値を取得する。
pandas_file_url=file_url_get.to_pandas() #pandas変換
file_url_use=pandas_file_url.iloc[0,0] #値にする。
table_append(session,file_url_use) #URL値を代入する。
row_num += 1 #行番を追加する。
return True
def fc_count(session: snowpark.Session):
count_row=session.sql("SELECT COUNT(*) FROM <テーブル>") #ロード後のテーブル件数をカウントする。
pandas_count_row=count_row.to_pandas()
result=pandas_count_row.iloc[0,0]
return result
if __name__ == "__main__":
main()
$$;
■ストアドプロシージャを実行する。
実行する前にテーブルを0件にしておきます。
TRUNCATE TABLE <テーブル>;
0件確認
SELECT COUNT(*) FROM <テーブル>;
外部ステージ名とフォルダ名(パスサフィックス)をセットして実行します。
CALL EXEC_EXCEL_SP(stage_name => '@<外部ステージ>',suffix => '<フォルダ>');
実行結果は正常でした。
件数も想定通りです。
※今回、戻り値でカウント設定してました。
■考慮点
今回もコスト・パフォーマンス考慮してみてましたが、1ファイルを繰り返し処理するのに10秒くらいのペースになります。
COPY INTOだとCSVを複数ファイル並列で効率的に処理できるので、処理時間はかなりデメリットに感じました。
ブックがシートに分割されているデータなども含めると、繰り返し処理を更にネストさせて開発する必要があったりなど、運用面を考えると本ロード処理については、前提が確立されているケースに限定されると思われます。
■参考文献
今回のロード処理の参考で一番シンプルで分かりやすい参考でした。
https://medium.com/snowflake/simplify-data-ingestion-with-snowpark-python-file-access-f2bc0e4cd887
openpyxlで不明な構文があったための参考です。
https://www.soudegesu.com/en/post/python/pandas-with-openpyxl/
■あとがき
複数ファイルも繰り返し処理によりロードできることも判明しました。
※ただし、運用としてお勧めはできません。
筆者がPython初心者のため、拙いコードと定義構成で申し訳ないですが、
Pythonの構造を理解するのにも今回大変役に立った検証だと思います。
引数をもう少し考慮すれば、他のテーブルへのExcelロードを流用できるかもしれません。