こんにちは。データエクスペリエンス推進部の @shun_173 です。
この記事は、Supershipグループ Advent Calendar 2023の5日目の記事になります。
TL;DR
Snowparkを気軽に使って痒いところに手を届かせよう!という内容です。
- Snowflakeの管理において困ったこと
- それを手軽に解消できるSnowparkの紹介
- 簡単な使い方とサンプルコード
はじめに
私の所属する部署では、データ分析基盤のガバナンス維持やリソース管理をしています。
私の主な役割は、Snowflakeでのリソース管理やデータパイプラインの構築などです。
今回は、その業務の中でSnowflakeのリソースの棚卸しをする際に抱えていたお困りごとを、Snowparkという概念を用いて楽にしたという話をします。
Snowflakeのリソース棚卸し
我々の定常的な業務の一つに、データ分析基盤のリソースの定期的な棚卸しというものが存在します。
ここでのリソースとは、例えばSnowflakeであれば「ユーザー」「ロール」などが該当します。
ユーザーの棚卸しであれば、分析業務のためにSnowflakeのユーザーを作成している社員が、異動や退職などをした際に、不要となったSnowflakeユーザーを削除するといったことをします。
ロールの棚卸しは、ユーザーに付与されているロールについて、同様に不要となったものを削除します。
弊社のSnowflake環境では、分析業務を行うプロジェクトごとにロールを作成し、それをユーザーに付与することによってデータへのアクセス権限の制御をしています。そのため、この棚卸し作業をしないと、せっかくの権限制御の仕組みが無意味になってしまいます。
棚卸しの作業方法は、
- Snowflakeからユーザーとロールの一覧を取得する
- 社内で作成している社員情報のスプレッドシートと突き合わせる
- その結果、現在の在籍が確認できなかったユーザー/ロールを削除する
という流れになります。
Snowflake棚卸しのお困りごと
この棚卸しの作業の中で、とても困っていたのが「Snowflakeからユーザーとロールの一覧を取得するのがとても面倒」というものでした。
作りたい一覧のイメージ
このような形式のデータにして、最終的にはスプレッドシートにコピーして突合させていました。
| Role | User |
|---|---|
| PROJECT_A_USER | shun.inami |
| PROJECT_B_USER | shun.inami |
| PROJECT_C_USER | shun.inami |
| PROJECT_A_USER | hoge.fuga |
形式としては単純で、ロール名とユーザー名のセットのデータを出すだけです。
これまでのデータ作成アプローチ
直感的にはこのくらいのデータなら簡単に取得できそうですが、実際にやってみるととても面倒でした。
Snowflakeでクエリを用いてユーザーやロールの一覧を取得する場合には、show rolesやshow grants ~といったクエリを発行します。
具体的には、まずはSnowflake上のロールの一覧を取得します。
show roles;
続いて、取得したロール名を順番に以下のクエリに当てはめて実行していきます。
show grants of role {role_name};
これの出力をスプシにコピーしていきます。
しんどいポイント
この方法ですが、例えばshow rolesによって得られたロールの数が100だったとすると、その後のshow grants→スプシにコピペの流れを100回やらなければいけないことになります。
考えただけで身体中をめんどくさいが駆け巡りますね。
show ~という形式のクエリは、各構文の仕様によりますが、出力するカラムの指定やWHEREでの詳細な条件指定といったことができません。他のテーブルとjoinするということももちろん不可能です。そのままでは仕様通りの結果しか出力することができないのです。
(その代わり、show ~系のクエリはウェアハウス不要で実行できる、というトレードオフだったと記憶しています)
このため、仕様通りの出力からSnowflakeの外側でデータの加工といったことをすることになり、そこにめんどくささが生じているということになります。
ただ、クエリの条件指定や出力カラムの指定をするのであればresult_scan()という関数を用いることによって可能だったりします。しかし、その引数にshow ~をした時の query id が必要になってくるので、手動実行でサクッとやりたいこの場面ではあまり相性が良くなさそうです。
https://docs.snowflake.com/ja/sql-reference/functions/result_scan
なんとか、1回のクエリ実行で完成系のデータセットになって出力されて、コピペ一発で終わらせたいところですね。
Snowparkという選択肢
このお困りごとの解決のためのソリューションとして、Snowparkを用いました。
Snowparkは、Snowflakeのウェアハウス上でPythonコードを実行でき、pandasやpysparkなどで用いられるデータフレームのようにSnowflakeのデータをソースコードで加工などができるというものです。
(Python以外にもJavaやScalaでもコードが書けるようです)
開発者ガイド:https://docs.snowflake.com/ja/developer-guide/snowpark/index
これを用いれば、show ~クエリとresult_scan()関数の連続的な実行をPythonによって制御することで、一発で要求するデータセットを作成してくれる処理を開発できそうです。
使ってみた
ということで、SnowparkのPythonバージョンを用いて要求のデータセットを一発で作成するような処理を開発してみました。
今回はサクッと開発→実行を進めるため、Snowflakeの新バージョンのコンソールであるSnowsight上でコードを記述し、そのまま実行する方法で開発しました。
(Snowparkクライアントを導入して手元のPython環境にて動作させることも可能です)
始め方
(詳細は↓の公式ドキュメントを参照ください)
https://docs.snowflake.com/ja/developer-guide/snowpark/python/python-worksheets#prerequisites-for-python-worksheets
Snowsightで開発を始める前の前提条件として、アカウント単位でのAnacondaの利用規約同意をすませておかなければいけません。ACCOUNTADMINの権限での操作が必要ですので、アカウントの管理者へご依頼してください。
開発を始めるには、Worksheetsの画面でNew Worksheetを選択し、続いてPython Worksheetを選択します。すると新しくSnowpark Python用のWorksheetが作成され、すでにテンプレートが埋め込まれた状態となっています。
あとは、そのテンプレートのdef main(session: snowpark.Session):の中にデータ操作の処理を記載していきます。
基本的な書き方
Snowparkの基本的な要素としてsnowpark.sessionがあります。これは、Sparkのsessionと同じようなイメージであり、Snowflakeのデータベースへアクセスする際に使用されます。
もっとも汎用的な使い方は、
session.sql('SELECT * FROM hogehoge')
のように、sql()関数を用いてSQLを実行するものです。
このsql()関数を実行すると、戻り値としてDataFrameが返却されます。
これはクエリの結果などのテーブル形式のデータを仮想的に表現し、プログラムによって条件指定などのデータ操作を可能とするものです。pandasやpysparkなどのDataFrameと同義の立ち位置です。
https://docs.snowflake.com/ja/developer-guide/snowpark/python/working-with-dataframes
他のライブラリのDataFrameと同様に、実際にデータソースに対してクエリを実行するタイミングはsql()関数の実行時ではなく、そのDataFrameの条件指定が完了し実際に結果が出力されるタイミングとなります。(遅延評価と呼びます)
ただ、この遅延評価のための内部的なロジックで前述のresult_scan()関数を用いているようで、そこの仕様の兼ね合いで意図した結果が得られないようなパターンがshow ~まわりであったので、ちょっと注意が必要です。
# この時点ではまだ実行されない
df = session.sql('SELECT id, name FROM employee')
# 実際に結果データが必要になったタイミングでコンパイルされたクエリが実行される
df.filter(col('id') == 1).show
SnowsightのPythonワークシートでは、テンプレートのmain()関数のreturnとして、このDataFrameを返却すると、そのデータセットを実行結果として出力してくれます。
簡単に始められますし、書く内容も既存の類似ライブラリの扱い方を踏襲しているので、サクッと本質的な部分に集中して開発ができて良いと思います。
開発したコード
今回開発したコードはこちらになります。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
# まずはshow roles でロールの一覧を取得し、ユーザーがアサインされているロール名のみに絞り込む
session.sql('show roles').collect()
role_names = session.sql('select "name" from table(result_scan(last_query_id())) where "assigned_to_users" >= 1').collect()
# ロール名をもとにそのロールに属するユーザー名の一覧を取得し、ロールごとの結果を全てマージして出力する
# Snowparkではdataframeのクエリは遅延評価されるため、最後のunion_dfで全ての結果をマージして出力するためには、そのクエリを最後に一発で流すようにしないといけない。
# なので、「show grants -> result_scan -> show grants ...」という流れでの実装ができない。
# そのため、ループを2つに分けて、先にRole全てのぶんのshow grantsを実行してから、result_scanでlast_query_id()で取るクエリを制御している。
for role_name_row in role_names:
role_name = role_name_row["name"]
show_grant_sql = F'show grants of role {role_name}'
session.sql(show_grant_sql).collect()
union_df = None
for i in range(len(role_names), 0, -1):
# snowflakeのlast_query_id()は、引数に負の値を設定することで、数回前のクエリIDを取得できる
# role_namesの配列の長さの分だけ過去に遡ったクエリを取得すれば、そこが一番最初に実行したshow grantsということになる
# そこから-1まで順番に実行すれば、全てのshow grantsを拾ってカラムを絞ってunionできる
grants_sql = F'select "role", "grantee_name" from table(result_scan(last_query_id(-{i})))'
grants_df = session.sql(grants_sql)
if not union_df:
union_df = grants_df
else:
union_df = union_df.union_all(grants_df)
return union_df
処理の流れとしては
- 1人以上ユーザーに権限がGrantされているロール名の一覧を
show rolesで取得する - 取得したロール名を
show grants of role {role_name}に当てはめて全て連続で実行する - 実行結果を順番に
result_scan()し、ロール名とユーザー名を取得してデータセットをUNIONする
という感じになっています。
show ~とresult_scan()を組み合わせ、まあまあの力技で実装しました。
ハマったポイント
開発時にハマったポイントとして、「DataFrameのSQLコンパイルがshowに対応できていない」というものがありました。
最初の処理で「1人以上ユーザーに権限がGrantされているロール名の一覧をshow rolesで取得する」がありますが、最初は以下のようなコードでこれを実現しようとしていました。
roles_df = session.sql('show roles')
roles_df.filter(col("assigned_to_users") >= 1).show()
ですが、実行すると「assigned_to_users というカラムはない(意訳)」というようなメッセージのエラーが発生しました。
エラーログに記載されていたクエリを見ると、
select * from (result_scan('show roles した時のクエリID')) where assigned_to_users >= 1
となっていました。
どうやら、result_scan({show roles})では、出力カラムの指定ができないといったshow ~の特性を引き継いでしまうような挙動のようでした。
そこで、以下のように変更すると意図した挙動になりました。
session.sql('show roles').collect()
role_names = session.sql('select "name" from table(result_scan(last_query_id())) where "assigned_to_users" >= 1').collect()
DataFrameの仕様では意図したクエリにならないので、最初のshow rolesはDataFrameにはせずにすぐさまクエリ実行させるようにしました。
任意のタイミングでクエリを実行したい場合には、collect()を使用します。
https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes#executing-sql-statements
また、show rolesの結果を用いてフィルタリングをするために、table(result_scan(last_query_id()))を用いて一度普通のテーブル形式のデータとしてからWHEREでの条件指定をしています。
last_query_id()は、現在のsession内ですでに発行されたクエリのquery_idを取得します。引数に何回前のidを取得するかを指定できるのですが、それを省略すると前回のidを取得します。
これによって、show rolesの結果をフィルタリングして取得することができました。
このデバッグの中でDataFrameの遅延評価の仕組みや、result_scan()とlast_query_id()の挙動について深く理解できたことで、その後のshow grants of role ~の部分のアルゴリズム開発もスムーズに進めることができました。
これを実行して、求めていたデータセットを一発で取得することができるようになりました!
おわりに
Snowparkの使い方として、今回のお困りごとを解消するために使えた技術というのはまだまだ小さな範囲だなと思っています。
もっと大きな可能性のある領域だと思うので、まだまだ数多く残るSnowflake運用周りのお困りごと解消の切り札として扱っていければなと思います。
最後に宣伝です。
Supershipではプロダクト開発やサービス開発に関わる人を絶賛募集しております。
ご興味がある方は以下リンクよりご確認ください。
Supership 採用サイト
是非ともよろしくお願いします。