Snowflakeでユーザ個別の利用状況を可視化する方法 の続き
Snowflakeでユーザ個別の利用状況を可視化する方法の続き.
サンプルコード
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import sys
import io
import pandas as pd
import re
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def main(session: snowpark.Session): 
    query_price = <QUERY_PRICE>
    df_warehouse_use = "<WAREHOUSE_NAME>"
    df_role_use = "<ROLE_NAME>"
    df_database_name = "<DATABASE_NAME>"
    df_schema_name = "<SCHEMA_NAME>"
    session.use_warehouse(df_warehouse_use)
    session.use_role(df_role_use)
    session.use_database(df_database_name)
    session.use_schema(df_schema_name)
    df_warehouse_list = "WAREHOUSE_LIST"          #warehouse一覧の格納テーブル名
    df_QUERY_HISTORY_LOG = "QUERY_HISTORY_LOG"    #クエリログにコストを追加したテーブル名
    df_USER_QUERY = "USER_QUERY"                  #ユーザ別月別利用料集計テーブル名
    df_USER_LOGIN_NUM = "USER_LOGIN_NUM"          #ユーザ別月別ログイン回数テーブル名
    
    #Warehouse一覧
    warehouses_list = session.sql("show warehouses")
    warehouses_list.write.mode('overwrite').save_as_table(f"{df_database_name}.{df_schema_name}.{df_warehouse_list}")
    #query_history にwarehouseの情報結合
    sql1 = f"select *, {df_warehouse_list}.\"size\" as SIZE from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY inner join {df_warehouse_list} on SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY.WAREHOUSE_NAME = {df_warehouse_list}.\"name\""
    query_history_log = session.sql(sql1)
    query_history_log_pd = query_history_log.to_pandas()
    #COSTの追加
    query_history_log_pd['COST'] = 0
    for i in range(0,len(query_history_log_pd)):
        if query_history_log_pd.SIZE[i] == "Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 8 
        elif query_history_log_pd.SIZE[i] == "X-Small":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] 
        elif query_history_log_pd.SIZE[i] == "Small":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 2
        elif query_history_log_pd.SIZE[i] == "Medium":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 4
        elif query_history_log_pd.SIZE[i] == "X-Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 16
        elif query_history_log_pd.SIZE[i] == "2X-Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 32
        elif query_history_log_pd.SIZE[i] == "3X-Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 64
        elif query_history_log_pd.SIZE[i] == "4X-Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 128
        elif query_history_log_pd.SIZE[i] == "5X-Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 256
        elif query_history_log_pd.SIZE[i] == "6X-Large":
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i] * 512
        else:
            query_history_log_pd.COST[i] = query_history_log_pd.TOTAL_ELAPSED_TIME[i]
    query_history_log_pd['COST'] = query_history_log_pd['COST'] * query_price / 1000 / 3600
    
    query_history_log = session.create_dataframe(query_history_log_pd)
    query_history_log.write.mode('overwrite').save_as_table(f"{df_database_name}.{df_schema_name}.{df_QUERY_HISTORY_LOG}")
    #ユーザ\別に月別集計
    sql2 = f"select USER_NAME as USER, TO_CHAR(TO_DATE(START_TIME),'YYYY-MM') as MONTH, count(*) as QUERY_NUM, sum(TOTAL_ELAPSED_TIME) as QUERY_TIME, sum(COST) as COST from {df_database_name}.{df_schema_name}.{df_QUERY_HISTORY_LOG} group by USER_NAME, MONTH order by MONTH"
    user_query = session.sql(sql2)
    user_query.write.mode('overwrite').save_as_table(f"{df_database_name}.{df_schema_name}.{df_USER_QUERY}")
    #ユーザログイン回数
    sql3 = "select user_name as USER_NAME, to_char(to_date(event_timestamp),'yyyy-mm') as MONTH, count(*) as LOGIN_NUM from snowflake.account_usage.login_history group by USER_NAME, MONTH order by MONTH"
    user_login = session.sql(sql3)
    user_login.write.mode('overwrite').save_as_table(f"{df_database_name}.{df_schema_name}.{df_USER_LOGIN_NUM}")
    
    return user_query
