概要
Snowflakeでロール毎のお手製データカタログを作成する方法で紹介したデータカタログについて,テーブルのメタ情報も追加した.また,エラー処理や試しでスキーマ,テーブル,カラムの1つに対して試用できるようにした.
サンプルコード
import snowflake.snowpark as snowpark
import pandas as pd
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
def main(session: snowpark.Session):
# 設定: 以下のフラグで出力範囲を切り替える
process_all_schemas = False # True: 全スキーマ, False: 1つ目のスキーマのみ
process_all_tables = False # True: 全テーブル, False: 1つ目のテーブルのみ
process_all_columns = False # True: 全カラム, False: 1つ目のカラムのみ
session.use_warehouse("<WAREHOUSE_NAME>")
database = "<DATABASE_NAME>" # データベースを固定
role_list = ["<ROLE_NAME_1>", "<ROLENAME_2>"]
# データカタログのカラム定義
data_catalog = pd.DataFrame(columns=[
"role", "database", "schema", "table", "table_metadata",
"column", "count_r","count_u","count_null","dtype","column_metadata"
])
for role in role_list:
session.use_role(role)
session.use_database(database)
schemas = session.sql("SHOW SCHEMAS").collect()
# スキーマ処理: 全スキーマ or 1つ目のスキーマのみ
schemas_to_process = schemas if process_all_schemas else schemas[:1]
for schema_row in schemas_to_process:
schema = schema_row["name"]
# INFORMATION_SCHEMA をスキップ
if schema.upper() == "INFORMATION_SCHEMA":
continue
session.use_schema(schema)
tables = session.sql("SHOW TABLES").collect()
# テーブル処理: 全テーブル or 1つ目のテーブルのみ
tables_to_process = tables if process_all_tables else tables[:1]
for table_row in tables_to_process:
table = table_row["name"]
full_table_name = f"{database}.{schema}.{table}"
# テーブル存在確認
try:
session.sql(f"SELECT 1 FROM {full_table_name} LIMIT 1").collect()
except Exception as e:
print(f"テーブル {full_table_name} にアクセスできません: {e}")
continue
# テーブルの説明 (table_meta)
try:
table_meta_d = f"テーブル名は{table}"
table_meta_sample_sql = session.sql(f"SELECT * FROM {full_table_name} LIMIT 10")
table_meta_sample_df = table_meta_sample_sql.to_pandas()
table_meta_d += table_meta_sample_df.to_string(index=False)
table_metadata_sql = f"SELECT snowflake.cortex.complete('mistral-7b', '{table_meta_d}に含まれるテーブルの情報を端的に20文字以内で日本語で説明し、このとき{table_meta_d}に含まれる具体的なデータの中身は出力分に含めないようにしてください') AS response"
table_metadata_result = session.sql(table_metadata_sql)
try:
table_metadata_result_df = table_metadata_result.to_pandas()
table_meta = table_metadata_result_df.iloc[0][0]
except:
pass
except Exception as e:
print(f"テーブル説明取得エラー: {e}")
table_meta = "不明"
# レコード数取得
try:
df_table = session.table(full_table_name)
record_count = df_table.count()
except Exception as e:
print(f"レコード数取得エラー: {e}")
record_count = None
# カラム情報の取得
try:
columns_info = session.sql(f"SHOW COLUMNS IN TABLE {full_table_name}").collect()
if not columns_info:
print(f"テーブル {full_table_name} にカラム情報がありません。")
continue
# カラム処理: 全カラム or 1つ目のカラムのみ
columns_to_process = columns_info if process_all_columns else columns_info[:1]
for column_row in columns_to_process:
column = column_row["column_name"]
data_type = column_row["data_type"]
# ユニーク数
try:
unique_count = df_table.select(column).distinct().count()
except Exception as e:
print(f"ユニーク数取得エラー: {e}")
unique_count = None
# NULL 値の数
try:
null_count_sql = f"SELECT COUNT(*) - COUNT({column}) AS null_count FROM {full_table_name}"
null_count_result = session.sql(null_count_sql).collect()
null_count = null_count_result[0]["NULL_COUNT"] if null_count_result else 0
except Exception as e:
print(f"NULL数取得エラー: {e}")
null_count = None
# 型情報の取得
try:
sample_df = df_table.limit(100).to_pandas()
dtype = str(sample_df[column].dtype)
except Exception as e:
print(f"型情報取得エラー: {e}")
dtype = "不明"
# カラム説明 (column_meta)
try:
meta_d = f"カラム名は{column}"
meta_sample_sql = session.sql(f"SELECT {column} FROM {full_table_name} LIMIT 100")
meta_sample_df = meta_sample_sql.to_pandas()
for _, row in meta_sample_df.iterrows():
meta_d += str(row[column])
metadata_sql = f"SELECT snowflake.cortex.complete('mistral-7b', '{meta_d}に含まれるカラムの情報を端的に20文字以内で日本語で説明し、このとき{meta_d}に含まれる具体的なデータの中身は出力分に含めないようにしてください') AS response"
metadata_result = session.sql(metadata_sql)
try:
metadata_result_df = metadata_result.to_pandas()
column_meta = metadata_result_df.iloc[0][0]
except:
pass
except Exception as e:
print(f"カラム説明取得エラー: {e}")
column_meta = "不明"
# データフレームに追加
data_catalog.loc[len(data_catalog)] = [
role, database, schema, table, table_meta,
column, record_count, unique_count, null_count, dtype, column_meta
]
#適宜テーブルへの書き込みを行いたいとき
try:
session.use_role("SYSADMIN")
session.use_database("<T_DATABASE_NAME>")
session.use_schema("<T_SCHEMA_NAME>")
snowflake_df = session.create_dataframe(data_catalog)
snowflake_df.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
session.use_role(role)
session.use_database(database)
session.use_schema(schema)
except Exception as e:
print(f"データ保存エラー: {e}")
except Exception as e:
print(f"カラム情報取得エラー: {e}")
continue
# データを Snowflake テーブルに保存
try:
session.use_role("SYSADMIN")
session.use_database("<T_DATABASE_NAME>")
session.use_schema("<T_SCHEMA_NAME>")
snowflake_df = session.create_dataframe(data_catalog)
snowflake_df.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
except Exception as e:
print(f"データ保存エラー: {e}")
return snowflake_df
修正
適宜テーブルへの書き込みを行いたいときように以下を追加
try:
session.use_role("SYSADMIN")
session.use_database("<T_DATABASE_NAME>")
session.use_schema("<T_SCHEMA_NAME>")
snowflake_df = session.create_dataframe(data_catalog)
snowflake_df.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
session.use_role(role)
session.use_database(database)
session.use_schema(schema)
except Exception as e:
print(f"データ保存エラー: {e}")
また,データカタログの情報をテーブルに書き込む際,クローリング対象のデータベース,スキーマと異なるテーブルに出力したい場合,データベースとスキーマを指定しないと書き込みがnullになるため,以下を追加
session.use_role("SYSADMIN")
session.use_database("<T_DATABASE_NAME>")
session.use_schema("<T_SCHEMA_NAME>")