はじめに
こちらの記事は、初見では何がなんだか分かりづらい、Snowflake における Streamlit での DataFrame の種類と特徴を把握しておこう!という趣旨の記事になります。
本記事では、その特徴を触れるに留め、詳細なメソッドの比較などは行いません。それらについては、先人の記事を参照するようにしてみてください。非常に分かりやすく、利用者の視点で記載いただいております。
Streamlit×Snowflake×DataFrame の種類
まずは早速、どのような DataFrame 形式があるのか確認していきましょう。
DataFrameの種類 | 概要 | 特徴 |
---|---|---|
通常の Pandas DataFrame | Python で広く使われている DataFrame。 | ● ローカルメモリ上で比較的高速に処理できる ● 大規模データにはメモリ制限がある ● ローカルへのデータ転送に時間を要する |
Snowpark DataFrame | DataFrame 処理を Snowpark が自動的に SQL に変換し、スケーラブルに実行できる。 | ● ローカルの性能に依存しない ● 大規模データの処理が可能 ● 操作感が Pandas と異なる |
Snowpark Pandas DataFrame | Pandas と同等の記法で、Snowpark DataFrame と同じようなメリットを享受できる DataFrame。 | ● Pandas の操作感のままに Snowflake の計算リソースを活用できる ● Pandas と比較して処理速度が速い上、データ転送コストが削減できる ● Pandas のすべてのメソッドに対応している訳ではない(およそ60%程度?) |
続いて、各データフレームの使い始め方を見てみましょう。
- 通常の Pandas DataFrame:
df = session.table("<table-name>").to_pandas()
- Snowpark DataFrame:
df = session.table("<table-name>")
- Snowpark Pandas DataFrame:
df = session.table("<table-name>").to_snowpark_pandas()
なお、Snowpark Pandas DataFrame については、こちらの記事で詳細を解説しています。あわせてチェックしてみてください。非常に便利で、オススメな DataFrame 形式です!
ただし、記事執筆時点では、Streamlit in Snowflake で Snowpark Pandas DataFrame を使用することはできません。恐らく、近日中には使えるようになると思われますので、楽しみにしておきましょう!
Streamlit×Snowflake×DataFrame におけるあるある
基本的に Snowpark DataFrame を使っていれば下記のような問題に直面することはないのですが、知らないとつまづきやすいポイントを挙げてみようと思います。
集計に時間がかかる
Pandas データフレームを使っている場合、データをアプリのウェアハウスにダウンロードする必要があるため時間がかかったり、処理自体も1プロセスでの処理となるため、余計に時間がかかってしまいます。
例えば、Pandas DataFrame と Snowpark DataFrame で、1500万行のテーブルの集計を行ったときの処理時間を下記に示します。(縦軸 時間[秒] です。)
検証用ソースコード(Streamlit in Snowflake で実行)
import time
import streamlit as st
from snowflake.snowpark.context import get_active_session
st.title("Example Streamlit App :balloon:")
session = get_active_session()
session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE")
st.write(f"レコード数:{session.sql('select count(*) from snowflake_sample_data.tpch_sf10.orders').collect()[0][0]/10000}万件")
st.subheader("Pandas DataFrame による処理")
start_time_entire = time.time()
df = session.table("snowflake_sample_data.tpch_sf10.orders").to_pandas()
start_time_process = time.time()
df_grouped = df.groupby("O_CUSTKEY").sum("O_TOTALPRICE")
st.dataframe(df_grouped.head(10)[["O_TOTALPRICE"]])
end_time = time.time()
elapsed_time_pdf_entire = end_time - start_time_entire
elapsed_time_pdf_process = end_time - start_time_process
st.write(f"elapsed time (entire) {elapsed_time_pdf_entire}")
st.write(f"elapsed time (process) {elapsed_time_pdf_process}")
st.subheader("Snowpark DataFrame による処理")
start_time_entire = time.time()
df = session.table("snowflake_sample_data.tpch_sf10.orders")
start_time_process = time.time()
df_grouped = df.group_by("O_CUSTKEY").sum("O_TOTALPRICE")
df_grouped.show()
end_time = time.time()
elapsed_time_sdf_entire = end_time - start_time_entire
elapsed_time_sdf_process = end_time - start_time_process
st.write(f"elapsed time (entire) {elapsed_time_sdf_entire}")
st.write(f"elapsed time (process) {elapsed_time_sdf_process}")
col1, col2 = st.columns(2)
with col1:
st.write(f"【全体】Snowpark DataFrameのほうが、{elapsed_time_pdf_entire/elapsed_time_sdf_entire :.1f}倍高速でした。(読み込み時間を含む)")
st.bar_chart({
"Pandas": elapsed_time_pdf_entire,
"Snowpark": elapsed_time_sdf_entire,
})
with col2:
st.write(f"【集計処理】Snowpark DataFrameのほうが、{elapsed_time_pdf_process/elapsed_time_sdf_process :.1f}倍高速でした。(読み込み時間を含まない)")
st.bar_chart({
"Pandas": elapsed_time_pdf_process,
"Snowpark": elapsed_time_sdf_process,
})
そもそもメモリに乗り切らない
先ほどは 1500万件でしたが、同テーブルの 1.5億件を Pandas DataFrame に対して使ってみると、アプリが完全に固まってしまいました。(再起動ループのようなものを繰り返してしまいます)
検証用ソースコード(Streamlit in Snowflake で実行)
import streamlit as st
from snowflake.snowpark.context import get_active_session
st.title("Example Streamlit App :balloon:")
session = get_active_session()
session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE")
st.write(f"レコード数:{session.sql('select count(*) from snowflake_sample_data.tpch_sf100.orders').collect()[0][0]/10000}万件")
st.subheader("Pandas DataFrame による処理")
df = session.table("snowflake_sample_data.tpch_sf100.orders").to_pandas()
st.dataframe(df.sample(100))
なお、Snowpark DataFrame の場合は、15億件のテーブルの場合(最右)はさすがに劣化が見られましたが、実行できないということはありませんでした。
検証用ソースコード(Streamlit in Snowflake で実行)
import time
import streamlit as st
from snowflake.snowpark.context import get_active_session
st.title("Example Streamlit App :balloon:")
session = get_active_session()
session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE")
st.write(f"レコード数(tpch10):{session.sql('select count(*) from snowflake_sample_data.tpch_sf10.orders').collect()[0][0]/10000}万件")
st.write(f"レコード数(tpch100):{session.sql('select count(*) from snowflake_sample_data.tpch_sf100.orders').collect()[0][0]/10000}万件")
st.write(f"レコード数(tpch1000):{session.sql('select count(*) from snowflake_sample_data.tpch_sf1000.orders').collect()[0][0]/10000}万件")
st.subheader("Snowpark DataFrame による処理(tpch10)")
start_time_entire = time.time()
df = session.table("snowflake_sample_data.tpch_sf10.orders")
df_grouped = df.group_by("O_CUSTKEY").sum("O_TOTALPRICE")
df_grouped.show()
end_time = time.time()
elapsed_time_sdf_entire_10 = end_time - start_time_entire
st.write(f"elapsed time (entire, tpch10) {elapsed_time_sdf_entire_10}")
st.subheader("Snowpark DataFrame による処理(tpch100)")
start_time_entire = time.time()
df = session.table("snowflake_sample_data.tpch_sf100.orders")
df_grouped = df.group_by("O_CUSTKEY").sum("O_TOTALPRICE")
df_grouped.show()
end_time = time.time()
elapsed_time_sdf_entire_100 = end_time - start_time_entire
st.write(f"elapsed time (entire, tpch010) {elapsed_time_sdf_entire_100}")
st.subheader("Snowpark DataFrame による処理(tpch1000)")
start_time_entire = time.time()
df = session.table("snowflake_sample_data.tpch_sf1000.orders")
df_grouped = df.group_by("O_CUSTKEY").sum("O_TOTALPRICE")
df_grouped.show()
end_time = time.time()
elapsed_time_sdf_entire_1000 = end_time - start_time_entire
st.write(f"elapsed time (entire, tpch1000) {elapsed_time_sdf_entire_1000}")
st.bar_chart({
"Snowpark tpch10": elapsed_time_sdf_entire_10,
"Snowpark tpch100": elapsed_time_sdf_entire_100,
"Snowpark tpch1000": elapsed_time_sdf_entire_1000,
})
構文や適切な使い方がよく分からない
こちらは、Snowpark DataFrame でよくある事例だと思います。その結果、使い慣れていたり情報の多かったりする Pandas DataFrame を使わざるを得なくなってしまっているのかなと思います。
ただ、Snowpark DataFrame と Pandas DataFrame の違いを知らないだけだったり、意識していないだけだったりする場合は、ぜひ Snowpark DataFrame を知って、習得してほしいな、と思います。(慣れるとだいぶ直感的になります)
もちろん、Snowpark DataFrame が難しければ、Snowpark Pandas DataFrame を使うことも良い選択肢です。使い慣れた DataFrame 形式で、Snowflake のリソースを使い倒すことができます!
よく分からない!となりがちな例として、いくつか確認してみましょう。
そもそもどうしたら Snowpark DataFrame になるの?
df = session.table("<table-name>")
としたら、それがそのまま Snowpark DataFrame 形式のデータフレームとなります。以降は、df.group_by("<agg_col_name>").sum("<value_col_name>").show()
などとすることで、処理を行えます。
Streamlit のメソッドで使う方法
実は、Streamlit の大部分のメソッドは、Snowpark DataFrame 及び Snowpark Pandas DataFrame に対応しており、そのまま入力することができます。
例えば、下記の様なものです。
df = session.table("snowflake_sample_data.tpch_sf1.orders")
st.dataframe(df.limit(10))
st.bar_chart(df.group_by("O_CUSTKEY").sum("O_TOTALPRICE"))
st.selectbox(label="選択", options=df.select("O_ORDERSTATUS").distinct())
ただし、Snowpark DataFrame で扱っているデータは必然的に大容量であることが多くなるため、その点は注意する必要があります。可視化する際は、なるべく limit(10)
や where
などにより、データ量を削減することを意識するようにしましょう。
新規列への代入操作
通常の Pandas DataFrame であれば、下記のように行えます。
df["new_col"] = df["col1"] + df["col2"]
しかし、Snowpark DataFrame では、下記のように with_column
メソッドを使って行う必要があります。
from snowflake.snowpark import functions as F
df = df.with_column("new_col", F.col("col1")+F.col("col2"))
実行タイミング(遅延評価)
Snowpark for Python の Snowpark DataFrame は、遅延評価という仕組みにより効率的なデータ処理を実現しています。
遅延評価とは、実際にデータを処理するコード(SQL クエリ)が実行されるのは「結果を取得する必要があるとき」に限られる評価方法です。
例えば、下記のようなメソッドを実行すると、遅延評価によりクエリが実行されます。
- df.collect()
- df.count()
- df.show()
- df.write.mode("overwrite").save_as_table()
- df.to_pandas()
よく使うのは、df.show()
ですかね。その他、Streamlit に入力すれば、中で to_pandas()
を実行してから可視化してくれます。
注意しておきたいポイントとして、df.collect()
とすると、実行結果をそのままダウンロードしてしまいます。それにより、想定よりも時間がかかってしまうことがあるので意識しておきましょう。
ちなみに、collect
関数がそのような挙動となるのは、返り値が List[Row()]
の形式になっているためです。反対に、show
関数の返り値は None
、count
関数の返り値は int
となっています。
キャッシュのさせ方が分からない
Streamlit では、@cache_data
を使って、データ処理などをキャッシュしておくことができます。しかし、Snowpark DataFrame は、一連の処理を保持するだけなので、通常の使用方法では、キャッシュしておくことができません。
そこで、Snowpark DataFrame の状態でキャッシュする方法を下記の記事にまとめておきました。気になったらぜひ読んでみてください!
おわりに
この記事では、Streamlit in Snowflake や Snowflake Notebooks を扱う際に重要な要素である、DataFrame 形式について紹介いたしました。
通常の Pandas DataFrame だけでなく、Snowpark DataFrame や Snowpark Pandas DataFrame があることをご理解いただけたでしょうか?Snowpark の DataFrame は、パフォーマンスにも優れますし、Streamlit との相性も非常に良いです。ぜひ意識的に活用していけるようにしましょう!
ただ、完全に使いこなすには、達人・玄人級の Snowpark の知識が必要になります。分からない部分は通常の Pandas で誤魔化してしまっても良いと思います。まずは一部だけでも、Snowpark DataFrame を適用していけるようになると良さそうですね。
また、SiS でアプリを開発してもらうチームにこういった DataFrame の種類を理解してもらうことが難しい場合は、通常の Pandas でも問題ないように、そもそも扱えるデータの容量を小さくしておくことや、フィルタしてから使ってもらうことなどを整備しておくのも手なのかもな、とも思いました。
そういうときに、Snowpark Pandas DataFrame が活きてくるんだろうな、と個人的には思います。そのため、Streamlit in Snowflake で使えるようになったタイミングで、ぜひ活用し始めてみてください。なお、Snowflake Notebooks では記事で紹介している通り、既に利用できます。
仲間募集
NTTデータ テクノロジーコンサルティング事業本部 では、以下の職種を募集しています。
1. クラウド技術を活用したデータ分析プラットフォームの開発・構築(ITアーキテクト/クラウドエンジニア)
クラウド/プラットフォーム技術の知見に基づき、DWH、BI、ETL領域におけるソリューション開発を推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/cloud_engineer
2. データサイエンス領域(データサイエンティスト/データアナリスト)
データ活用/情報処理/AI/BI/統計学などの情報科学を活用し、よりデータサイエンスの観点から、データ分析プロジェクトのリーダーとしてお客様のDX/デジタルサクセスを推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/datascientist
3.お客様のAI活用の成功を推進するAIサクセスマネージャー
DataRobotをはじめとしたAIソリューションやサービスを使って、
お客様のAIプロジェクトを成功させ、ビジネス価値を創出するための活動を実施し、
お客様内でのAI活用を拡大、NTTデータが提供するAIソリューションの利用継続を推進していただく人材を募集しています。
https://nttdata.jposting.net/u/job.phtml?job_code=804
4.DX/デジタルサクセスを推進するデータサイエンティスト《管理職/管理職候補》
データ分析プロジェクトのリーダとして、正確な課題の把握、適切な評価指標の設定、分析計画策定や適切な分析手法や技術の評価・選定といったデータ活用の具現化、高度化を行い分析結果の見える化・お客様の納得感醸成を行うことで、ビジネス成果・価値を出すアクションへとつなげることができるデータサイエンティスト人材を募集しています。ソリューション紹介
Trusted Data Foundationについて
~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
https://www.nttdata.com/jp/ja/lineup/tdf/
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。
TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について
~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~
https://www.nttdata.com/jp/ja/lineup/tdf_am/
TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。
NTTデータとDatabricksについて
NTTデータは、お客様企業のデジタル変革・DXの成功に向けて、「databricks」のソリューションの提供に加え、情報活用戦略の立案から、AI技術の活用も含めたアナリティクス、分析基盤構築・運用、分析業務のアウトソースまで、ワンストップの支援を提供いたします。NTTデータとTableauについて
ビジュアル分析プラットフォームのTableauと2014年にパートナー契約を締結し、自社の経営ダッシュボード基盤への採用や独自のコンピテンシーセンターの設置などの取り組みを進めてきました。さらに2019年度にはSalesforceとワンストップでのサービスを提供開始するなど、積極的にビジネスを展開しています。
これまでPartner of the Year, Japanを4年連続で受賞しており、2021年にはアジア太平洋地域で最もビジネスに貢献したパートナーとして表彰されました。
また、2020年度からは、Tableauを活用したデータ活用促進のコンサルティングや導入サービスの他、AI活用やデータマネジメント整備など、お客さまの企業全体のデータ活用民主化を成功させるためのノウハウ・方法論を体系化した「デジタルサクセス」プログラムを提供開始しています。
https://www.nttdata.com/jp/ja/lineup/tableau/
NTTデータとAlteryxについて
Alteryx導入の豊富な実績を持つNTTデータは、最高位にあたるAlteryx Premiumパートナーとしてお客さまをご支援します。
導入時のプロフェッショナル支援など独自メニューを整備し、特定の業種によらない多くのお客さまに、Alteryxを活用したサービスの強化・拡充を提供します。
NTTデータとDataRobotについて
NTTデータはDataRobot社と戦略的資本業務提携を行い、経験豊富なデータサイエンティストがAI・データ活用を起点にお客様のビジネスにおける価値創出をご支援します。
NTTデータとInformaticaについて
データ連携や処理方式を専門領域として10年以上取り組んできたプロ集団であるNTTデータは、データマネジメント領域でグローバルでの高い評価を得ているInformatica社とパートナーシップを結び、サービス強化を推進しています。
https://www.nttdata.com/jp/ja/lineup/informatica/
NTTデータとSnowflakeについて
NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Snowflakeは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。