はじめに
タイトルの通りですが、Snowpark for Python DataFrame を Streamlit でキャッシュする方法について質問を受けたので、改めて整理して考えてみました。
今回主に紹介する機能は、Snowpark DataFrame の cache_result()
という関数です。2024年5月の Snowpark リリースで追加された機能のようです。
なお、小規模なデータ(~1万行)に対しては、to_pandas()
&st.cache_data
も有効な手段だと思います。うまく使い分けられると良いですね。
結論
Snowpark DataFrame の cache_result()
を簡単に使えるデコレータを作成しておくよ。
全体のコード
準備
本検証においてリザルトキャッシュの影響を受けないよう、パラメータ設定を FALSE
に設定します。
USE ROLE ACCOUNTADMIN;
ALTER ACCOUNT SET USE_CACHED_RESULT = FALSE;
このパラメータ設定を TRUE
に戻し忘れると無駄なコストがかかる恐れがあるため、トライアルアカウントなどの環境で行うか、本記事の内容を見て満足するようにしてください。
なお、リザルトキャッシュの無効化は、セッションパラメータにより設定することもできますが、今回の Streamlit in Snowflake の環境からは設定できないためこのようにアカウントパラメータにより設定しています。
検証コードの全体(Streamlit in Snowflake)
# Import python packages
import streamlit as st
from snowflake.snowpark.context import get_active_session
# Write directly to the app
st.title("Example Streamlit App :balloon:")
# Get the current credentials
session = get_active_session()
df = session.table("snowflake_sample_data.tpch_sf100.orders")
st.write(f"レコード数:{df.count()}")
st.dataframe(df.limit(10))
st.subheader("Streamlit セッションステート")
if st.toggle("セッションステートの検証を行う"):
if st.toggle("セッションステートをキャッシュとして使ってみる"):
if "df2" not in st.session_state:
st.session_state.df2 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(st.session_state.df2.limit(100))
else:
df2 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(df2.limit(100))
st.subheader("Snowpark キャッシュ")
if st.toggle("キャッシュの検証を行う"):
if st.toggle("Snowpark のキャッシュ機能を使う"):
if "df3" not in st.session_state:
st.session_state.df3 = df.group_by("o_custkey").sum("o_totalprice").cache_result()
st.dataframe(st.session_state.df3.limit(100))
else:
df3 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(df3.limit(100))
st.button("更新")
if st.button("キャッシュリセット"):
st.session_state.clear()
st.write("キュッシュクリアに成功しました。")
なお、上記の検証コードでは、各検証を排他的に実行できるよう st.toggle
で検証するかどうかを選択できるようにしています。
また、処理結果がキャッシュされているかどうかの確認のために、"更新" ボタンを用意しています。
検証
それでは、各検証について確認していきましょう。
前提:st.cache_data
まず、上記コードには含んでおりませんが、st.cache_data
デコレータでキャッシュできるんでないの?と思われた方もいるかもしれません。
しかし Snowpark DataFrame は、そのオブジェクト自体がデータを持っている訳ではなく処理の記述の履歴を保持しているだけですので、特にキャッシュできるデータの実体がある訳ではありません。
実際に試してみると、次のようなエラーが生じてしまいます。これは、未評価のデータフレームがキャッシュされていることを表します。先述の通り、特にキャッシュできるデータの実体がないということですね。
コード
@st.cache_data
def get_grouped_data():
return df.group_by("o_custkey").sum("o_totalprice").cache_result()
df_cached = get_grouped_data()
st.dataframe(df_cached.limit(100))
そのため、本記事で述べるような一工夫が必要となるわけです。
Streamlit の状態管理変数 st.session_state
でやってみる
st.cache_data
がダメとなれば、まず気になるのが st.session_state
です。結論としては、こちらの方法だけでは不適切なのですが、確認していきましょう。
st.subheader("Streamlit セッションステート")
if st.toggle("セッションステートの検証を行う"):
if st.toggle("セッションステートをキャッシュとして使ってみる"):
if "df2" not in st.session_state:
st.session_state.df2 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(st.session_state.df2.limit(100))
else:
df2 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(df2.limit(100))
動画の通り、セッションステートを使っても使わなくても、実行にかかる時間は同じで適切にキャッシュができていないことが分かります。
なお、このときの Snowpark DataFrame によって生成された裏側のクエリは下記のようになっていました。すべての DataFrame 処理が一括で SQL に変換されていることが分かりますね。
SELECT *
FROM (
SELECT "O_CUSTKEY", sum("O_TOTALPRICE") AS "SUM(O_TOTALPRICE)"
FROM (
SELECT *
FROM snowflake_sample_data.tpch_sf100.orders
)
GROUP BY "O_CUSTKEY"
) LIMIT 100
Snowpark の キャッシュ機能 cache_result()
でやってみる
そうなると、アプリ側で Snowpark DataFrame の処理をキャッシュすることは適切でないのだと理解できました。そこで、Snowpark の機能を使ってキャッシュすることを検討してみましょう。
このために、Snowpark は便利なメソッド cache_result()
を用意してくれています。同様に、ソースコードを見てみましょう。
st.subheader("Snowpark キャッシュ")
if st.toggle("キャッシュの検証を行う"):
if st.toggle("Snowpark のキャッシュ機能を使う"):
if "df3" not in st.session_state:
st.session_state.df3 = df.group_by("o_custkey").sum("o_totalprice").cache_result()
st.dataframe(st.session_state.df3.limit(100))
else:
df3 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(df3.limit(100))
このコードの実行の様子です。最初はキャッシュ機能を使用していないため依然として実行は遅いです。しかし、「Snowpark のキャッシュ機能を使う」トグルスイッチを ON にしてから、二回目以降の実行は高速に表示できていることが分かると思います。
こちらも、分かりやすいクエリヒストリーが残っていました。初回は下記のように一時テーブルを作成し、結果を挿入しています。
CREATE SCOPED TEMPORARY TABLE "SNOWPARK_TEMP_TABLE_H7RT84XGCV"(
"O_CUSTKEY" BIGINT NOT NULL ,
"SUM(O_TOTALPRICE)" NUMBER(24, 2)
);
INSERT INTO "SNOWPARK_TEMP_TABLE_H7RT84XGCV"
SELECT "O_CUSTKEY", sum("O_TOTALPRICE") AS "SUM(O_TOTALPRICE)"
FROM (
SELECT *
FROM snowflake_sample_data.tpch_sf100.orders
) GROUP BY "O_CUSTKEY";
しかし、二回目以降は下記のように、先ほど作成した一時テーブルから結果を取得しています。後続に処理があった場合も、この一時テーブルをもとに処理が行われていきます。
SELECT *
FROM "SNOWPARK_TEMP_TABLE_H7RT84XGCV"
LIMIT 100
キャッシュのために毎回セッションステートを張るのが大変
分かります。
そこで、流用可能なデコレータ関数を用意しておこうと思います。このデコレータを、cache_result()
したデータフレームに適用すれば良いだけにしておきましょう。
下記のコードを追加します。
st.subheader("Snowpark キャッシュデコレータ")
def cache_sp_data(state_name):
def decorator(func):
def wrapper(*args, **kwargs):
if state_name not in st.session_state:
st.session_state[state_name] = func(*args, **kwargs)
return st.session_state[state_name]
return wrapper
return decorator
@cache_sp_data(state_name="df4")
def get_grouped_data_decorator():
return df.group_by("o_custkey").sum("o_totalprice").cache_result()
if st.toggle("Snowpark キャッシュデコレータ の検証を行う"):
if st.toggle("Snowpark キャッシュデコレータ を使う"):
df4 = get_grouped_data_decorator()
st.dataframe(df4.limit(100))
else:
df4 = df.group_by("o_custkey").sum("o_totalprice")
st.dataframe(df4.limit(100))
少し小難しいですが、やることは単純です。cache_sp_data
関数を予め定義しておき、それを下記のように関数定義に被せるだけです。
@cache_sp_data(state_name="df4")
def get_grouped_data_decorator():
return df.group_by("o_custkey").sum("o_totalprice").cache_result()
あとは、df4 = get_grouped_data_decorator()
のように呼び出すだけで、キャッシュ化されたデータフレームを取得することができます。
Snowpark DataFrame におけるキャッシュ機能のまとめ
Snowpark DataFrame には、便利なキャッシュ機能 cache_result()
があることをご理解いただけたでしょうか?
特に、Streamlit では、操作のたびに上から下まで再実行するという特徴があり、その度に重たいクエリが走るようではユーザーエクスペリエンスが低下すること間違いなしです。
今回ご紹介した機能以外でも、様々な工夫により快適な Streamlit ライフを楽しみましょう!
ちなみに:Snowflake のリザルトキャッシュを ON にすると・・・?
今回の検証パターンでは、すべてリザルトキャッシュ側がキャッシュをしてくれるようになります。
と言っても、Snowpark 側の cache_result()
が刺さるケースも多々あります。特に、後続に処理が続くケースで、処理のベースとして DataFrame 処理を一時保存しておきたいケースなどでしょうか。
また、Snowflake のリザルトキャッシュはクエリが同じでないといけないという特徴がありますが、Snowpark DataFrame は自動生成クエリのためリザルトキャッシュをうまく活用できないケースもあるかもしれません。そうした場合にも非常に有効になりえますね。
このように cache_result()
メソッドは Snowpark DataFrame オブジェクトを返してくれ、非常に簡単に活用できます。ぜひ頭の片隅に入れておいていただき、活用できそうな所にぜひ使ってみていただければと思います!
そして、Snowflake の最強リザルトキャッシュにも感謝しましょう!🙇
仲間募集
NTTデータ テクノロジーコンサルティング事業本部 では、以下の職種を募集しています。
1. クラウド技術を活用したデータ分析プラットフォームの開発・構築(ITアーキテクト/クラウドエンジニア)
クラウド/プラットフォーム技術の知見に基づき、DWH、BI、ETL領域におけるソリューション開発を推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/cloud_engineer
2. データサイエンス領域(データサイエンティスト/データアナリスト)
データ活用/情報処理/AI/BI/統計学などの情報科学を活用し、よりデータサイエンスの観点から、データ分析プロジェクトのリーダーとしてお客様のDX/デジタルサクセスを推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/datascientist
3.お客様のAI活用の成功を推進するAIサクセスマネージャー
DataRobotをはじめとしたAIソリューションやサービスを使って、
お客様のAIプロジェクトを成功させ、ビジネス価値を創出するための活動を実施し、
お客様内でのAI活用を拡大、NTTデータが提供するAIソリューションの利用継続を推進していただく人材を募集しています。
https://nttdata.jposting.net/u/job.phtml?job_code=804
4.DX/デジタルサクセスを推進するデータサイエンティスト《管理職/管理職候補》
データ分析プロジェクトのリーダとして、正確な課題の把握、適切な評価指標の設定、分析計画策定や適切な分析手法や技術の評価・選定といったデータ活用の具現化、高度化を行い分析結果の見える化・お客様の納得感醸成を行うことで、ビジネス成果・価値を出すアクションへとつなげることができるデータサイエンティスト人材を募集しています。ソリューション紹介
Trusted Data Foundationについて
~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
https://www.nttdata.com/jp/ja/lineup/tdf/
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。
TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について
~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~
https://www.nttdata.com/jp/ja/lineup/tdf_am/
TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。
NTTデータとDatabricksについて
NTTデータは、お客様企業のデジタル変革・DXの成功に向けて、「databricks」のソリューションの提供に加え、情報活用戦略の立案から、AI技術の活用も含めたアナリティクス、分析基盤構築・運用、分析業務のアウトソースまで、ワンストップの支援を提供いたします。NTTデータとTableauについて
ビジュアル分析プラットフォームのTableauと2014年にパートナー契約を締結し、自社の経営ダッシュボード基盤への採用や独自のコンピテンシーセンターの設置などの取り組みを進めてきました。さらに2019年度にはSalesforceとワンストップでのサービスを提供開始するなど、積極的にビジネスを展開しています。
これまでPartner of the Year, Japanを4年連続で受賞しており、2021年にはアジア太平洋地域で最もビジネスに貢献したパートナーとして表彰されました。
また、2020年度からは、Tableauを活用したデータ活用促進のコンサルティングや導入サービスの他、AI活用やデータマネジメント整備など、お客さまの企業全体のデータ活用民主化を成功させるためのノウハウ・方法論を体系化した「デジタルサクセス」プログラムを提供開始しています。
https://www.nttdata.com/jp/ja/lineup/tableau/
NTTデータとAlteryxについて
Alteryx導入の豊富な実績を持つNTTデータは、最高位にあたるAlteryx Premiumパートナーとしてお客さまをご支援します。
導入時のプロフェッショナル支援など独自メニューを整備し、特定の業種によらない多くのお客さまに、Alteryxを活用したサービスの強化・拡充を提供します。
NTTデータとDataRobotについて
NTTデータはDataRobot社と戦略的資本業務提携を行い、経験豊富なデータサイエンティストがAI・データ活用を起点にお客様のビジネスにおける価値創出をご支援します。
NTTデータとInformaticaについて
データ連携や処理方式を専門領域として10年以上取り組んできたプロ集団であるNTTデータは、データマネジメント領域でグローバルでの高い評価を得ているInformatica社とパートナーシップを結び、サービス強化を推進しています。
https://www.nttdata.com/jp/ja/lineup/informatica/
NTTデータとSnowflakeについて
NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Snowflakeは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。