0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Snowflakeでロール毎のお手製データカタログを作成する方法2

Last updated at Posted at 2025-01-19

概要

Snowflakeでロール毎のお手製データカタログを作成する方法で紹介したデータカタログについて,テーブルのメタ情報も追加した.また,エラー処理や試しでスキーマ,テーブル,カラムの1つに対して試用できるようにした.

サンプルコード

import snowflake.snowpark as snowpark
import pandas as pd
import sys
import io

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

def main(session: snowpark.Session):
    # 設定: 以下のフラグで出力範囲を切り替える
    process_all_schemas = False  # True: 全スキーマ, False: 1つ目のスキーマのみ
    process_all_tables = False   # True: 全テーブル, False: 1つ目のテーブルのみ
    process_all_columns = False  # True: 全カラム, False: 1つ目のカラムのみ

    session.use_warehouse("<WAREHOUSE_NAME>")
    database = "<DATABASE_NAME>"  # データベースを固定
    role_list = ["<ROLE_NAME_1>", "<ROLENAME_2>"]

    # データカタログのカラム定義
    data_catalog = pd.DataFrame(columns=[
        "role", "database", "schema", "table", "table_metadata",
        "column", "count_r","count_u","count_null","dtype","column_metadata"
    ])

    for role in role_list:
        session.use_role(role)
        session.use_database(database)
        schemas = session.sql("SHOW SCHEMAS").collect()

        # スキーマ処理: 全スキーマ or 1つ目のスキーマのみ
        schemas_to_process = schemas if process_all_schemas else schemas[:1]

        for schema_row in schemas_to_process:
            schema = schema_row["name"]
            # INFORMATION_SCHEMA をスキップ
            if schema.upper() == "INFORMATION_SCHEMA":
                continue
            
            session.use_schema(schema)
            tables = session.sql("SHOW TABLES").collect()

            # テーブル処理: 全テーブル or 1つ目のテーブルのみ
            tables_to_process = tables if process_all_tables else tables[:1]

            for table_row in tables_to_process:
                table = table_row["name"]
                full_table_name = f"{database}.{schema}.{table}"

                # テーブル存在確認
                try:
                    session.sql(f"SELECT 1 FROM {full_table_name} LIMIT 1").collect()
                except Exception as e:
                    print(f"テーブル {full_table_name} にアクセスできません: {e}")
                    continue

                # テーブルの説明 (table_meta)
                try:
                    table_meta_d = f"テーブル名は{table}"
                    table_meta_sample_sql = session.sql(f"SELECT * FROM {full_table_name} LIMIT 10")
                    table_meta_sample_df = table_meta_sample_sql.to_pandas()
                    table_meta_d += table_meta_sample_df.to_string(index=False)
                    
                    table_metadata_sql = f"SELECT snowflake.cortex.complete('mistral-7b', '{table_meta_d}に含まれるテーブルの情報を端的に20文字以内で日本語で説明し、このとき{table_meta_d}に含まれる具体的なデータの中身は出力分に含めないようにしてください') AS response"
                    table_metadata_result = session.sql(table_metadata_sql)
                    try:
                        table_metadata_result_df = table_metadata_result.to_pandas()
                        table_meta = table_metadata_result_df.iloc[0][0]
                    except:
                        pass
                    
                except Exception as e:
                    print(f"テーブル説明取得エラー: {e}")
                    table_meta = "不明"

                # レコード数取得
                try:
                    df_table = session.table(full_table_name)
                    record_count = df_table.count()
                except Exception as e:
                    print(f"レコード数取得エラー: {e}")
                    record_count = None

                # カラム情報の取得
                try:
                    columns_info = session.sql(f"SHOW COLUMNS IN TABLE {full_table_name}").collect()
                    if not columns_info:
                        print(f"テーブル {full_table_name} にカラム情報がありません。")
                        continue

                    # カラム処理: 全カラム or 1つ目のカラムのみ
                    columns_to_process = columns_info if process_all_columns else columns_info[:1]

                    for column_row in columns_to_process:
                        column = column_row["column_name"]
                        data_type = column_row["data_type"]

                        # ユニーク数
                        try:
                            unique_count = df_table.select(column).distinct().count()
                        except Exception as e:
                            print(f"ユニーク数取得エラー: {e}")
                            unique_count = None

                        # NULL 値の数
                        try:
                            null_count_sql = f"SELECT COUNT(*) - COUNT({column}) AS null_count FROM {full_table_name}"
                            null_count_result = session.sql(null_count_sql).collect()
                            null_count = null_count_result[0]["NULL_COUNT"] if null_count_result else 0
                        except Exception as e:
                            print(f"NULL数取得エラー: {e}")
                            null_count = None

                        # 型情報の取得
                        try:
                            sample_df = df_table.limit(100).to_pandas()
                            dtype = str(sample_df[column].dtype)
                        except Exception as e:
                            print(f"型情報取得エラー: {e}")
                            dtype = "不明"

                        # カラム説明 (column_meta)
                        try:
                            meta_d = f"カラム名は{column}"
                            meta_sample_sql = session.sql(f"SELECT {column} FROM {full_table_name} LIMIT 100")
                            meta_sample_df = meta_sample_sql.to_pandas()
                            for _, row in meta_sample_df.iterrows():
                                meta_d += str(row[column])

                            metadata_sql = f"SELECT snowflake.cortex.complete('mistral-7b', '{meta_d}に含まれるカラムの情報を端的に20文字以内で日本語で説明し、このとき{meta_d}に含まれる具体的なデータの中身は出力分に含めないようにしてください') AS response"
                            metadata_result = session.sql(metadata_sql)
                            try:
                                metadata_result_df = metadata_result.to_pandas()
                                column_meta = metadata_result_df.iloc[0][0]
                            except:
                                pass

                        except Exception as e:
                            print(f"カラム説明取得エラー: {e}")
                            column_meta = "不明"

                        # データフレームに追加
                        data_catalog.loc[len(data_catalog)] = [
                            role, database, schema, table, table_meta,
                            column, record_count, unique_count, null_count, dtype, column_meta
                        ]
                        #適宜テーブルへの書き込みを行いたいとき
                        try:
                            session.use_role("SYSADMIN")
                            session.use_database("<T_DATABASE_NAME>")
                            session.use_schema("<T_SCHEMA_NAME>")
                            snowflake_df = session.create_dataframe(data_catalog)
                                snowflake_df.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
                            session.use_role(role)
                            session.use_database(database)
                            session.use_schema(schema)
                        except Exception as e:
                            print(f"データ保存エラー: {e}")
                                
                except Exception as e:
                    print(f"カラム情報取得エラー: {e}")
                    continue

    # データを Snowflake テーブルに保存
    try:
        session.use_role("SYSADMIN")
        session.use_database("<T_DATABASE_NAME>")
        session.use_schema("<T_SCHEMA_NAME>")
        snowflake_df = session.create_dataframe(data_catalog)
        snowflake_df.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
    except Exception as e:
        print(f"データ保存エラー: {e}")

    return snowflake_df

修正

適宜テーブルへの書き込みを行いたいときように以下を追加

try:
    session.use_role("SYSADMIN")
    session.use_database("<T_DATABASE_NAME>")
    session.use_schema("<T_SCHEMA_NAME>")
    snowflake_df = session.create_dataframe(data_catalog)
    snowflake_df.write.mode("overwrite").save_as_table(f"{<T_DATABASE_NAME>}.{<T_SCHEMA_NAME>}.{<T_TABLE_NAME>}")
    session.use_role(role)
    session.use_database(database)
    session.use_schema(schema)
except Exception as e:
    print(f"データ保存エラー: {e}")

また,データカタログの情報をテーブルに書き込む際,クローリング対象のデータベース,スキーマと異なるテーブルに出力したい場合,データベースとスキーマを指定しないと書き込みがnullになるため,以下を追加

session.use_role("SYSADMIN")
session.use_database("<T_DATABASE_NAME>")
session.use_schema("<T_SCHEMA_NAME>")
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?