データカタログとは
簡単にいうと,データ利活用を促進するために必要不可欠となる,データベースのメタ情報をまとめたもので,抽象的な概念である.そのため,何の情報があればデータカタログと言えるかはケースバイケースとなる.データカタログの必要性については,DMBOKで語られているため,この場では詳細については割愛する.
今回対象とするデータカタログの中身
大前提として,Snowflakeのデータベースのデータカタログを作成する.INFORMATION_SCHEMA で取得できる情報もあるが,そこから得られる情報は少なく,また,ダイナミックデータマスキングを用いると,ロールごとに見える情報も異なるため,Snowflakeでロール毎のお手製データカタログを作成することを考える.今回対象とするデータカタログの中身は以下の通りとなる.
- ロール名,データベース名,スキーマ名,テーブル名,カラム名
- レコード数
- ユニークレコード数
- Null レコード数
- 型
- 簡単なカラムの説明(テーブルやスキーマ,データベースの説明も作成できるが今回は割愛)
サンプルコード(修正中)
注意点
- エラー処理は入れてません
- 最低限の動作を想定して,簡易なコードとなっています
- 事前に適切な権限が付与してください
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import sys
import io
import pandas as pd
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def main(session: snowpark.Session):
session.use_warehouse("<WAREHOUSE_NAME>")
role_list = ["<ROLE_NAME_1>", "<ROLENAME_2>"]
column_meta = pd.DataFrame(columns=["role","database_name","schema_name","table_name","column_name","count_r","count_u","count_null","dtype","metadata"])
for x in range(0,len(role_list)):
df_database_name = "<DATABASE_NAME>"
df_role = role_list[x]
session.use_database(df_database_name)
session.use_role(df_role)
df_schemas = session.sql("show terse schemas")
df_schema = df_schemas.select(col('"name"'))
pandas_df_schema = df_schema.to_pandas()
for i in range(0,len(pandas_df_schema)):
df_schema_name = pandas_df_schema.name[i]
session.use_schema(df_schema_name)
pandas_df_table_name = session.sql("show terse views").select(col('"name"')).to_pandas()
for j in range(0,len(pandas_df_table_name)):
df_table_name = pandas_df_table_name.name[j]
table_name_d = f"{df_database_name}.{df_schema_name}.{df_table_name}"
df_table = session.table(table_name_d)
df_table_columns = df_table.columns
count_r = df_table.count()
for k in range(0,len(df_table_columns)):
df_table_columns_d = df_table_columns[k]
table_sql = f"select {df_table_columns_d},count(*) from {table_name_d} group by {df_table_columns_d} order by count(*)"
table_d = session.sql(table_sql)
count_u = table_d.count()
pd_table_d = table_d.select(col(df_table_columns_d)).to_pandas()
table_sqlnl = f"select count(CASE WHEN {df_table_columns_d} IS NULL THEN 1 END) AS NL from {table_name_d}"
table_null = session.sql(table_sqlnl)
table_null_pd = table_null.to_pandas()
count_null = table_null_pd.NL[0]
dtype = str(pd_table_d[df_table_columns_d].dtype)
meta_d = ""
meta_d = meta_d + f"カラム名は{df_table_columns_d}"
meta_d_sql = session.sql(f"select {df_table_columns_d} from {df_database_name}.{df_schema_name}.{df_table_name} limit 100")
meta_d_sql_pd = meta_d_sql.to_pandas()
for l in range(0,meta_d_sql_pd.shape[0]):
meta_d = meta_d + str(meta_d_sql_pd.iloc[j,0])
print(meta_d)
metadata_sql = f"select snowflake.cortex.complete('mistral-7b','{meta_d}に含まれるカラムの情報を端的に20文字以内で日本語で説明し、このとき{meta_d}に含まれる具体的なデータの中身は出力分に含めないようにしてください') as response;"
metadata = session.sql(metadata_sql)
metadata_pd = metadata.to_pandas()
column_meta.loc[len(column_meta)] = [df_role,df_database_name,df_schema_name,df_table_name,df_table_columns_d,count_r,count_u,count_null,dtype,metadata_pd.iloc[0][0]]
snowflake_column_meta = session.create_dataframe(column_meta)
snowflake_column_meta.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
return snowflake_column_meta