Snowflakeでユーザ個別の利用状況を可視化する方法 の続き
Snowflakeでユーザ個別の利用状況を可視化する方法の続きです。
サンプルコード
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import sys
import io
import pandas as pd
import re
# Re-wrap the standard streams so that text written to them is encoded as UTF-8.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# stderr wraps its *own* buffer; wrapping stdout's buffer here would route
# error output into the stdout stream.
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# Weight applied to a query's elapsed time for each warehouse size, keyed by
# the "size" value stored in the warehouse list table. X-Small is the
# baseline (1) and every step up doubles the weight.
# NOTE(review): presumably these are credits per hour per size - confirm
# against the Snowflake warehouse documentation.
_WAREHOUSE_SIZE_MULTIPLIER = {
    "X-Small": 1,
    "Small": 2,
    "Medium": 4,
    "Large": 8,
    "X-Large": 16,
    "2X-Large": 32,
    "3X-Large": 64,
    "4X-Large": 128,
    "5X-Large": 256,
    "6X-Large": 512,
}


def _add_cost_column(query_log, query_price):
    """Add a COST column to the pandas DataFrame *query_log* and return it.

    COST = TOTAL_ELAPSED_TIME * size multiplier * query_price / 1000 / 3600.
    Rows whose SIZE is not in the multiplier table are weighted by 1.
    """
    # Vectorised lookup: avoids per-row chained assignment, which is slow and
    # does not reliably write back into the DataFrame.
    multiplier = query_log["SIZE"].map(_WAREHOUSE_SIZE_MULTIPLIER).fillna(1)
    weighted_elapsed = query_log["TOTAL_ELAPSED_TIME"] * multiplier
    query_log["COST"] = weighted_elapsed * query_price / 1000 / 3600
    return query_log


def main(session: snowpark.Session):
    """Build per-user usage tables from Snowflake account-usage views.

    Writes four tables (overwriting any existing ones) into the configured
    database and schema:

    * WAREHOUSE_LIST    - the result of ``show warehouses``
    * QUERY_HISTORY_LOG - query history joined with warehouse size, plus COST
    * USER_QUERY        - per-user, per-month query count, time and cost
    * USER_LOGIN_NUM    - per-user, per-month login count

    Args:
        session: Snowpark session used to run every statement.

    Returns:
        The Snowpark DataFrame of the per-user monthly query aggregation.

    Raises:
        ValueError: if the ``<QUERY_PRICE>`` placeholder has not been
            replaced with a number.
    """
    # Replace the placeholders below before running. The price placeholder is
    # quoted like the others so the file parses; float() rejects it until it
    # has been substituted with a numeric value.
    query_price = float("<QUERY_PRICE>")
    warehouse_name = "<WAREHOUSE_NAME>"
    role_name = "<ROLE_NAME>"
    database_name = "<DATABASE_NAME>"
    schema_name = "<SCHEMA_NAME>"

    session.use_warehouse(warehouse_name)
    session.use_role(role_name)
    session.use_database(database_name)
    session.use_schema(schema_name)

    warehouse_list_table = "WAREHOUSE_LIST"  # table holding the warehouse list
    query_log_table_name = "QUERY_HISTORY_LOG"  # query log with COST added
    user_query_table_name = "USER_QUERY"  # per-user monthly usage totals
    user_login_table_name = "USER_LOGIN_NUM"  # per-user monthly login counts

    table_prefix = f"{database_name}.{schema_name}"
    query_log_table = f"{table_prefix}.{query_log_table_name}"

    # Snapshot the warehouse list so it can be joined in SQL.
    warehouses = session.sql("show warehouses")
    warehouses.write.mode('overwrite').save_as_table(
        f"{table_prefix}.{warehouse_list_table}"
    )

    # Attach each query's warehouse size to the query history.
    join_sql = (
        f'select *, {warehouse_list_table}."size" as SIZE '
        "from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY "
        f"inner join {warehouse_list_table} "
        "on SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY.WAREHOUSE_NAME "
        f'= {warehouse_list_table}."name"'
    )
    query_log_pd = session.sql(join_sql).to_pandas()

    # Add the COST column and persist the enriched log.
    query_log_pd = _add_cost_column(query_log_pd, query_price)
    query_log = session.create_dataframe(query_log_pd)
    query_log.write.mode('overwrite').save_as_table(query_log_table)

    # Aggregate queries per user and month.
    monthly_sql = (
        "select USER_NAME as USER, "
        "TO_CHAR(TO_DATE(START_TIME),'YYYY-MM') as MONTH, "
        "count(*) as QUERY_NUM, "
        "sum(TOTAL_ELAPSED_TIME) as QUERY_TIME, "
        "sum(COST) as COST "
        f"from {query_log_table} "
        "group by USER_NAME, MONTH order by MONTH"
    )
    user_query = session.sql(monthly_sql)
    user_query.write.mode('overwrite').save_as_table(
        f"{table_prefix}.{user_query_table_name}"
    )

    # Count logins per user and month.
    login_sql = (
        "select user_name as USER_NAME, "
        "to_char(to_date(event_timestamp),'yyyy-mm') as MONTH, "
        "count(*) as LOGIN_NUM "
        "from snowflake.account_usage.login_history "
        "group by USER_NAME, MONTH order by MONTH"
    )
    user_login = session.sql(login_sql)
    user_login.write.mode('overwrite').save_as_table(
        f"{table_prefix}.{user_login_table_name}"
    )

    return user_query