LoginSignup
3
4

More than 1 year has passed since last update.

IrisデータセットをSnowparkで操作してみた。(機械学習もしてるよ)

Last updated at Posted at 2022-10-04

Snowflakeが提供するSnowparkについて紹介する。なお、あくまでも個人として記述しております。

(久々に書こうと思った)背景・モチベーション

  1. Snowparkに関する日本語の記事がない
  2. 自分の頭の整理として記事にしておく

Snowparkとは?

Snowflakeが提供するライブラリで、データエンジニアやデータサイエンティストを対象にScala、Python(現在プレビュー)、Javaなどプログラミング言語に対応しながら、DataFramesのような使い慣れたプログラミング構造を使用し、より効果的にデータの操作が可能である。Snowparkは開発に力を入れていることもあり、2022/10月現在ではマイナーバージョンアップが1 or 2ヶ月に一度発生している状況である。

参考ドキュメント
  1. Snowpark API Refrence
  2. snowflake-snowpark-python

Snowflakeのベストプラクティスとは?(2022/10現在)

  1. モデルの学習にはSPROCを、予測/変換にはUDFを使用する
  2. UDFにモデル/パイプラインをロードするときには「cachetool」を使用する
  3. 予測/変換にはベクトル化されたUDFを使用する(データ量が大きい場合は特に)
  4. トレーニング/検証/テストデータセットをSnowflake内にテーブルとして保存する
    ML flow on snowflake.png

本記事で紹介する内容

  • 機械学習のサンプルデータとしてよく使用されるIrisデータセットをSnowparkで加工/MLモデルの作成/推論を実施する

処理の流れ

大まかな流れは下記の通りである

  1. 環境の作成
  2. IrisデータをSnowflake上にテーブルとして登録
  3. 機械学習モデルの作成・内部ステージへの書き込み(SPROC)
  4. 内部ステージからモデルの読み込み推論(UDF)

詳細

こちらのパートでは具体的なコードを示しながら、snowparkの関数やsnowflakeでの機械学習方法を説明していく。

1. 環境の作成

最初にcondaコマンドを使用し、環境の作成を実施する。私の環境は下記の通りである。

  • OSはMac(12.6)を使用
  • Snowparkのバージョンは0.11.0を使用(2022/10現在では0.11.0が最新)
  • Jupyter Notebookを使用
  • Condaはインストール済み

 ディレクトリ構成は下記の通りである。

任意のフォルダ
 ┝ creds.json
 ┝ environment.yaml
 └ predict_iris_species.ipynb

必要なパッケージ群を含んだYamlファイルを示す。こちらを任意のフォルダに保存し、condaコマンドで環境作成とアクティベートを行う。

environment.yml
name: snowpark_ml_test
channels:
  - snowflake
dependencies:
  - python=3.8
  - snowflake-snowpark-python
  - ipykernel
  - pyarrow
  - numpy
  - scikit-learn
  - pandas
  - joblib
  - cachetools

condaコマンドによる環境の作成

 conda env create -f environment.yml 

condaコマンドで環境の切り替え

conda activate snowpark_test

2. IrisデータをSnowflake上にテーブルとして登録

Irisデータをローカル上に取り込んだ後、そこからsnowflakeのテーブルオブジェクトとして登録する。

2-1. Irisデータの取得

 scikit-learnにあるdatasetsからIrisデータをロード後、説明変数と目的変数(species)を結合し、最後にPandasのデータフレームに変換している。

from sklearn import datasets
import pandas as pd

iris = datasets.load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['species'] = [iris.target_names[i] for i in iris.target]
iris_df.sample(10)
2-2. Snowflakeへの接続

 次にSnowflakeに接続するため、Configファイルの作成と接続処理を示す。
下記の通り、creds.jsonというファイルを作成する。<>で括られた部分は自身の環境に併せて変更してほしい。

creds.json
{
   "username": "<YOUR_USERNAME>",
   "password": "<YOUR PASSWORD>",
   "sf_account": "<YOUR SNOWFLAKE ACCOUNT LOCATOR or REGIONLESS URL>",
   "sf_wh": "<YOUR WAREHOUSE>",
   "sf_db": "iris",
   "sf_schema": "public"
}

 sf_account欄では二つのURLが有効である。なお、どちらのパターンでも .snowflakecomputing.com を含んではならない。

  • アカウントロケーターを使用する場合:xyz1234.ap-northeast-1.aws
  • リージョンレスURLを使用する場合:sforg-takadaaccount

 Snowflakeにおけるアカウント識別子を理解したい場合、こちらを参考にしてほしい。
また、Private Linkを使用している場合はURLが異なるので注意する必要がある。

 接続のためのConfigファイルの準備が完了したため、Snowflakeにセッション接続を行う。

import json

with open('creds.json') as f:
    data = json.load(f)
    USERNAME = data['username']
    PASSWORD = data['password']
    SF_ACCOUNT = data['sf_account']
    SF_WH = data['sf_wh']
    SF_DB = data['sf_db']
    SF_SCHEMA = data['sf_schema']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
   "database": SF_DB,
   "schema": SF_SCHEMA,
   "warehouse": SF_WH
}
session = session.builder.configs(CONNECTION_PARAMETERS).create()

 SessionBuliderクラスを使用するsessionオブジェクトが作成される。なお、Snowparkにおけるセッション作成にはSnowflake Connector for Pythonとは別のライブラリを使用している。参考ドキュメントはこちらである。

 現在がポイントされているデータベース/スキーマ/ウェアハウス/ロールなどのオブジェクトの確認方法は下記の通りである。

from snowflake.snowpark import version as v
snowpark_version = v.VERSION
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))
2-3. Snowflake内にデータベース/テーブルオブジェクトの作成

Irisデータを格納するため、Snowflake側にデータベースとテーブルを用意する。

## データベースの作成
query = '''
create or replace database iris;
'''
session.sql(query).collect()

## テーブルの作成
query = ''' 
create or replace table iris(
  sepal_length float,	
  sepal_width float,
  petal_length float,
  petal_width float,
  specie string
);
'''
session.sql(query).collect()

 sqlメソッドを使用すると、直接SQLクエリを実行することが可能である。DataFrame APIが対応していない場合など、こちらのsqlメソッドを通して、直接クエリを発行する場合がある。
 最後のcollect()ではクエリの結果セットをArray型として返してくる。次に説明するshow()とは違い、全ての行を返してくることが違いとなる。

2-4. SnowflakeへIrisデータをロード

ローカルにあるIrisデータをSnowflakeのIrisテーブルにロードする。

iris_df.columns = [x.upper() for  x in ["sepal_length", "sepal_width", "petal_length", "petal_width", "specie"]]
snowpark_df = session.write_pandas(iris_df, "IRIS")
snowpark_df.show()

 具体的にはwrite_pandas()メソッドを使用し、第一引数にローカルにPandas Dataframe、第2引数に対象テーブルを指定する。ここには 2つの注意点 が存在しており、

  1. Pandas Dataframeのカラム名は大文字であること。
  2. write_pandasで指定する第2引数のテーブル名も大文字であること

と両ケースにおいて、Case Sensitiveであることに注意する必要がある。最後のshow()はSQLクエリでLimit 10と同等であり、ランダムに10行ほどデータを表示する関数である。

 また、上記のやり方は、予めDDLを用意後、データをロードしていたが、Pandas Dataframeから直接データをロードする別のやり方も紹介したい。個人的には上記のCase Sensitiveを気にしないのでいいので、こちらがお勧めである。

snowdf = session.createDataFrame(iris_df)
snowdf.write.mode("overwrite").saveAsTable("iris_2") 

 createDataFrame() もしくは create_dataframe()メソッドはSnowflake内にTemporary Tableを作成する。Temporary Tableとは、そのセッションが継続する限り存在する仮テーブルである。テーブルにおける列情報はPandas Dataframeにある列情報を参照している。 
 SnowflakeにTemporary Tableを作成後に、saveAsTable() もしくは save_as_table()を利用し、テーブルに変更している。注意点とすると、こちらのテーブルはPersistant Tableではなく、Transient Tableである。TransientやPersistant Tableの違いについて、こちらを参考にすること。

3. 機械学習モデルの作成・内部ステージへの書き込み(SPROC)

 こちらでは

  • ローカル上でのSnowpark Dataframeを使った機械学習モデルの作成
  • Stored Procedure(SPROC)として特徴量エンジニアリングとML学習ロジックの登録方法

について紹介したい。なお、機械学習としてIrisデータにおける種類(Species)をあてる分類問題とし、MLライブラリにscikit-learnRandomForestClassifierを使用する。

3-1. 学習/検証/テストデータへの分割

Snowparkによるデータの分割方法について示す。今回のケースでは学習:検証:テストを7:3:1の割合で分割をする。

weights = [0.7, 0.2, 0.1]
snowpark_df_train, snowpark_df_verification, snowpark_df_test = snowpark_df.random_split(weights, seed=82)

snowpark_df_train.write.mode("overwrite").saveAsTable("iris_train")
snowpark_df_verification.write.mode("overwrite").saveAsTable("iris_verification")
snowpark_df_test.write.mode("overwrite").saveAsTable("iris_test")

 具体的には2行目にあるSnowpark Dataframeにあるrandom_split()メソッドを使用し、データの分割を行なっている。第1引数にあるweightsという変数はリスト型で分割割合が保存されている。こちらは、最も有名なscikit-learnライブラリのtrain_test_split()と類似した関数である。
 また、Snowflakeのベストプラクティスとして、分割したデータはテーブルオブジェクトとして登録することを勧めているため、saveAsTable()で各種データをテーブルとして保存している。

3-2. ローカル上でのMLモデルの作成/検証

 こちらのパートでは、分割したデータを用いて、特徴量変換とMLモデルの作成を行なっている。先にいってしまうと、Snowparkは機械学習系のライブラリではないため、冒頭以外でSnowparkは出現せず、主にscikit-learnがメインのパートとなっている。

## 必要なライブラリの読み込み
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

## Snowpark DFからPandas DFに変換
df_train = snowpark_df_train.drop("SPECIE").to_pandas() # drop labels for training set
df_train_labels = snowpark_df_train.select("SPECIE").to_pandas()
df_verification = snowpark_df_verification.drop("SPECIE").to_pandas()
df_verification_labels = snowpark_df_verification.select("SPECIE").to_pandas()
df_test = snowpark_df_test.drop("SPECIE").to_pandas()
df_test_labels = snowpark_df_test.select("SPECIE").to_pandas()

## 特徴量エンジニアリング
num_attribs = list(df_train)
num_pipeline = Pipeline([
            ('std_scaler', StandardScaler()),
        ])
preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_attribs)
        ])

## 機械学習モデルの指定 + パイプラインの作成
full_pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', RandomForestClassifier(n_estimators=10, criterion='gini', max_depth=5, random_state=100)),
        ])

## モデルの学習
full_pipeline.fit(df_train, df_train_labels)

## 検証データを用いてモデルの精度評価
predictions = full_pipeline.predict(df_verification)
print('Accuracy: %.2f' % accuracy_score(df_verification_labels, predictions))

 冒頭では機械学習に必要なライブラリ群をインポートしている。
 次にSnowflake DataframeからPandas Dataframeに変換している。変換する理由は、scikit-learnではPandas Dataframeにしか対応していないためである。具体的な箇所として、to_pandas() もしくは toPandas()関数を使用すると、データタイプの変換が可能となる。見た目上ではわからないが、Pandas Dataframeに変換した時にデータの所在が変わる。Snowflake Dataframeの場合、データはSnowflakeのサーバサイドに存在しているが、Pandas Dataframeに変換した瞬間にデータがローカル上に移ってきている。そのため、大量のデータをPandas Dataframeに変換する場合、ローカルでのOOM(Out of Memory)の問題が発生しないことに注意する必要がある。
 特徴量エンジニアリングのパートはsnowpark関連ではないので詳細は割愛するが、対象の4つの説明変数に対してStandardScaler()を適用し、データの標準化を行なっている。StandardScalerは簡単にいうと、平均が0、標準偏差が1なるようにデータを正規化してくれる関数である。これらをPreprocessorというオブジェクトに登録し、最終的にはPipelineメソッドを使い、full_pipelineというオブジェクトに機械学習モデルの学習パートと併せてパイプライン化している。モデルについては、PipelineメソッドでRandomForestClassifierを指定している。
 fit()メソッドでモデルを学習し、最後のパートで検証用データを使用し、モデルの精度検証を行なっている。

3-3. 学習ロジックをStored Procedureとしての登録

 こちらのパートでは、3-1と3-2で示した特徴量エンジニアリングともモデル学習を含むロジックをSnowflakeにStored Procedureとして登録する方法を説明する。

 まず最初に学習モデルを保存するための内部ステージを作成する。内部ステージにはsqlメソッドを使用して作成する。

query = "create or replace stage models" +\
        " directory = (enable = true)" +\
        " copy_options = (on_error='skip_file')"
        
session.sql(query).collect()

 下記コードが示す通り、Stored Procedureとして登録するコードを実行する。今回のStored Procedureはsave_file()train_model()の二つのメソッドから構成されている。save_file()メソッドは学習したモデルをSnowflakeの内部ステージに登録する。一方で、train_model()では3-1、 3-2で示した通り、特徴量変換とモデル学習を行うメソッドとなっている。train_model()については、既に説明していることからこのパートでの説明は割愛する。

## ライブラリの指定
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

import io
import joblib

## リモート側(Snowflake)で必要なパッケージ群を指定する
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib')

## 内部ステージに登録するためのメソッド
def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn._cursor.upload_stream(input_stream, path) ## upload_streamメソッドで学習したモデルを内部ステージに登録している
    return "successfully created file: " + path

## モデル学習するメソッド
def train_model(session: snowflake.snowpark.Session) -> float:
    snowpark_df = session.table("IRIS") 

    ## データの分割
    weights = [0.7, 0.2, 0.1]
    snowpark_df_train, snowpark_df_verification, snowpark_df_test = snowpark_df.random_split(weights, seed=82)  # use seed to make the split repeatable

    # 分割したデータをテーブルとして保存
    snowpark_df_train.write.mode("overwrite").saveAsTable("iris_train")
    snowpark_df_verification.write.mode("overwrite").saveAsTable("iris_verification")
    snowpark_df_test.write.mode("overwrite").saveAsTable("iris_test")
    
    
    df_train = snowpark_df_train.drop("SPECIE").to_pandas() # drop labels for training set
    df_train_labels = snowpark_df_train.select("SPECIE").to_pandas()
    df_verification = snowpark_df_verification.drop("SPECIE").to_pandas()
    df_verification_labels = snowpark_df_verification.select("SPECIE").to_pandas()

    # パイプラインの作成
    num_attribs = list(df_train)
    num_pipeline = Pipeline([
            ('std_scaler', StandardScaler()),
        ])
    preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_attribs)
        ])
    full_pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', RandomForestClassifier(n_estimators=10, criterion='gini', max_depth=5, random_state=100)),
        ])

    # 学習の実行
    full_pipeline.fit(df_train, df_train_labels)

    # モデルを内部ステージに保存
    save_file(session, full_pipeline, "@MODELS/predict_species.joblib")

    # 検証データを使用し、モデルの精度検証
    predictions = full_pipeline.predict(df_verification)
    accuracy = accuracy_score(df_verification_labels, predictions)
    return accuracy

# メソッドをStored Procedureとして登録
train_model_iris = sproc(train_model, replace=True)
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib')

 まず、Stored Procedureで使うパッケージ群をadd_packeges()メソッドを使って指定します。add_packeges()ではリポジトリ内にあるサードパーティライブラリを参照することができる。通常、ライブラリとそのバージョンを指定することができるが、今回のケースではライブラリのみを指定している。バージョン指定がない場合、Snowflakeにあるリポジトリの中で最新のバージョンを自動的に指定する仕様となっている。
 仮にStored Procedureを登録する際にローカル環境とリモート環境でバージョンの違いがあると、下記のような警告メッセージが出力される。

The version of package snowflake-snowpark-python in the local environment is 0.11.0, which does not fit the criteria for the requirement snowflake-snowpark-python. Your UDF might not work when the package version is different between the server and your local environment

def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn._cursor.upload_stream(input_stream, path) ## upload_streamメソッドで学習したモデルを内部ステージに登録している
    return "successfully created file: " + path

 次にsave_file()メソッドについて説明する。こちらは内部ステージに登録するためのメソッドとなっており、第1引数にSnowflakeのSessionBuilderオブジェクト、第2引数に保存すべきモデル(今回のケースだとfull_pipelineオブジェクト)、第3引数に内部ステージ内のパス情報を指定する。io.BytesIOオブジェクトやjoblib.dumpメソッドを使い、作成されたモデルをバイトコードとして出力後、upload_stream()メソッドを使用して、マシン上のメモリにあるバイトコードのモデルをSnowflakeの内部ステージに登録している。upload_stream()の第1引数には保存対象のモデルと第2引数に内部ステージのPath(今回のケースだと"@MODELS/predict_species.joblib")を指定している。

save_file(session, full_pipeline, "@MODELS/predict_species.joblib")

 train_model()については詳細は省くが、学習モデルの作成しているfit()の後にsave_file()メソッドを呼び出し、内部ステージにモデルを保存している

train_model_iris = sproc(train_model, replace=True)

 
 最後に、このパートにおける最も大事な点であるStored Procedureへの登録方法について説明する。上記にある通り、sproc()に対象となるメソッドを指定すると、Stored ProcedureとしてSnowflake側に登録される。なお、Stored Procedureへの登録方法はいくか存在しており、session.sproc.registerメソッドの使用や登録したいメソッドの冒頭にアノテーション@sprocという付与するだけでも登録が可能となっている。Stored Procedureの登録方法に関する詳細は、こちらを参考にして貰いたい。

3-4. Stored Procedureの実行

 ローカル上で以下のメソッドを実行すると、リモート(Snowflake)のコンピュートを使用して、モデルの学習から内部ステージへの登録までを実施する。学習時にローカルのコンピュートリソースを全く使用していない。

train_model_iris()

 以下のコマンドを実行すると、MODELSという内部ステージにモデルが登録されていることを確認することができる。

query = 'ls @MODELS/'
session.sql(query).collect()

4. 内部ステージからモデルの読み込み、テストデータに推論を実施

 こちらの章では、3章で作ったモデルを読み込んだ後にテストデータに適用し、推定結果を得る方法を示す。

4-1. 推論用UDFの作成

 以下のコードは、内部ステージからモデルを読み込むためのread_file()メソッドと実際に推論を行うpredict()メソッドから構成されている。

import sys
import cachetools
import os
from snowflake.snowpark.functions import udf

## 内部ステージからモデルを指定
session.add_import("@MODELS/predict_species.joblib")  
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')

## 内部ステージから当該モデルを読み込み
@cachetools.cached(cache={})
def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = joblib.load(file)
                     return m

## 推論部分をUDFとして登録
@udf(name="iris", is_permanent=True, stage_location="@MODELS", replace=True)
def predict (SEPAL_LENGTH:float, SEPAL_WIDTH:float, PETAL_LENGTH:float, PETAL_WIDTH:float) -> str:
       m = read_file('predict_species.joblib')    
       df = pd.DataFrame([locals()], columns = ["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH"])
       return m.predict(df)[0]
session.add_import("@MODELS/predict_species.joblib")  

 まず、内部ステージにあるモデルを指定する。add_import()はUDF内で使用するファイルを内部ステージ上からインポートするメソッドである。今回のケースでは、モデルを読み込んでいるが、テキストファイルなどの通常のファイルオブジェクトでも読み込むことができる。詳細はこちらを参考にしてほしい。

 また、read_file()ではインポートされたモデルが格納されているPath情報から実際にjoblib.load()で展開している。メソッドの冒頭にある@cachetools.cachedでは内部ステージから何回もモデルを読み込まないように、キャッシュとして保持している。

@udf(name="iris", is_permanent=True, stage_location="@MODELS", replace=True)

 最後にpredict()ではread_file()で対象モデルを読み込んだ後に、入力データに対してモデル適用し、結果を返すメソッドとなっている。メソッドの冒頭にはsproc()では使われていなかったアノテーションが使用されており、これにより、自動的にUDFとして登録することが可能である。(Stored Procedureでもアノテーションの使用は可能)

4-2. 推論用UDFの実行

このパートでは4-1で登録したUDFとテストデータを使い、推論を実施する。

from snowflake.snowpark import functions as F

## テストデータの読み込み
snowpark_df_test = session.table('iris_test') 

## 目的変数のDrop
inputs = snowpark_df_test.drop('SPECIE') 

## 推論
snow_results = snowpark_df_test.select( 
    *inputs, 
    predict(*inputs).alias('Prediction'), 
    (F.col('SPECIE')).alias('Actual_label')
)

## 結果の表示
snow_results.show()

 最初に登録されているテストデータを読み込んだ後、目的変数であるSPECIE列を削除する。その後に、snowpark_df_test.select()メソッド内でpredict()UDFを使用していることがわかる。*inputsとはSPECIE列以外の説明変数である4列を指定している。predict()内には説明変数である*inputsが入力され、alias()で列名を変更している。最後に実際の正解データであるSPECIE列をFuctionオブジェクトを介して、選択している。

(おまけ) 4-3 Verctized UDFの登録/利用

 Vectrized UDFについて説明する。上記のUDFでは1行ずつ計算しているため計算効率が良くなかった。Vertrized UDFではPandas Dataframe/Seriesを対象にUDFにバルクロードすることが可能であり、計算効率が改善することがわかっている。以下に登録方法を記述する。

from snowflake.snowpark import types as T

@pandas_udf(max_batch_size=100)
def predict_batch(df: T.PandasDataFrame[float, float, float, float]) -> T.PandasSeries[str]:
       m = read_file('predict_species.joblib') 
       df.columns = ["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH"]
       return m.predict(df)

 まずアノテーションが先ほどの@udfから@pandas_udfに変更されていることがわかる。オプションの中にあるmax_batch_sizeとは一回のバッチに読み込む行数を指定することができる。こちらにある通り、Vertorized UDFには1分間の時間制限があることから、このバッチサイズを大きくしすぎないのも注意事項である。
 次にT.PandasDataFrameというデータタイプを入力/出力していることがわかる。繰り返しにはなるが、Vectorized UDFはPandas Dataframeにのみ有効なUDFであるため、List型のような他のデータタイプで入力することはできない。メソッド内は通常のPandas Dataframeを扱うのと同等の記述で問題がない。

 実行方法についても記載しておく。内容は既に4-2で説明した通りであり、predict()の部分がpredict_batch()に変わっているだけである。Vectrized UDFを使用する場合、入力されたデータをSnowflake側で自動でPandas Dataframeに変換し、メソッドに入力していることがわかる。(ユーザ側でデータ型を意識する必要はない)

from snowflake.snowpark import functions as F
snowpark_df_test = session.table('iris_test')
inputs = snowpark_df_test.drop('SPECIE')
snow_results = snowpark_df_test.select(
        *inputs, 
        predict_batch(*inputs).alias('Prediction'),  ## ここだけが変更点
        (F.col('SPECIE')).alias('Actual_label'))
snow_results.show()

最後

 今回はIrisデータセットを使用し、Snowflake上での特徴量変換/MLモデルの学習/推論方法について、説明した。個人的な所感としては商用運用する上ではデバックがしにくいことなどから、まだかなという印象ではあるが、特に弱かったモデルの学習をSnowflake上でできるようになり、データサイエンスにおける全ての操作がSnowflake内部で完結するようになったことは素晴らしいことだと感じる。
 今回はIrisデータにおける一連の流れににフォーカスを充てたため、snowflakeに関係ないライブラリ(sklearnとか)が大量に出てきてしまったこともあり、(特に初心者には?)わかりずらさも多かったのかなと感じる。(文字も多かったしな・・・)
今後はsnowparkにフォーカスをあて、その概要や機能の紹介、細かなチートシート的なことも記述できればと思う。また、もう少し個人的なナレッジが出てきたらStreamlitとの連携方法やTips的なことも挙げていきたい。

コード全体

1セクションをJupyter Notebookの1セルで実行できます。

## Irisデータのロード
from sklearn import datasets
import pandas as pd

iris = datasets.load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
iris_df['species'] = [iris.target_names[i] for i in iris.target]
iris_df.sample(10)
## Snowflakeへの接続
import json
from snowflake.snowpark.session import Session

with open('creds.json') as f:
    data = json.load(f)
    USERNAME = data['username']
    PASSWORD = data['password']
    SF_ACCOUNT = data['sf_account']
    SF_WH = data['sf_wh']
    SF_DB = data['sf_db']
    SF_SCHEMA = data['sf_schema']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
   "database": SF_DB,
   "schema": SF_SCHEMA,
   "warehouse": SF_WH
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
## データベースの作成
query = '''
create or replace database iris;
'''
session.sql(query).collect()

## テーブルの作成
query = ''' 
create or replace table iris(
  sepal_length float,	
  sepal_width float,
  petal_length float,
  petal_width float,
  specie string
);
'''
session.sql(query).collect()
## テーブルへの投入
iris_df.columns = [x.upper() for  x in ["sepal_length", "sepal_width", "petal_length", "petal_width", "specie"]]
snowpark_df = session.write_pandas(iris_df, "IRIS")

## 別のテーブル作成方法
snowdf = session.createDataFrame(iris_df)
snowdf.write.mode("overwrite").saveAsTable("iris_2") 
## Modelステージの作成
query = "create or replace stage models" +\
        " directory = (enable = true)" +\
        " copy_options = (on_error='skip_file')"
        
session.sql(query).collect()
## Stored Procedureの作成
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

import io
import joblib

## リモート側で必要なパッケージ群を指定する
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib')

def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn._cursor.upload_stream(input_stream, path)
    return "successfully created file: " + path

def train_model(session: snowflake.snowpark.Session) -> float:
    snowpark_df = session.table("IRIS") 

    ## データの分割
    weights = [0.7, 0.2, 0.1]
    snowpark_df_train, snowpark_df_verification, snowpark_df_test = snowpark_df.random_split(weights, seed=82)  # use seed to make the split repeatable

    # 分割したデータをテーブルとして保存
    snowpark_df_train.write.mode("overwrite").saveAsTable("iris_train")
    snowpark_df_verification.write.mode("overwrite").saveAsTable("iris_verification")
    snowpark_df_test.write.mode("overwrite").saveAsTable("iris_test")
    
    
    df_train = snowpark_df_train.drop("SPECIE").to_pandas() # drop labels for training set
    df_train_labels = snowpark_df_train.select("SPECIE").to_pandas()
    df_verification = snowpark_df_verification.drop("SPECIE").to_pandas()
    df_verification_labels = snowpark_df_verification.select("SPECIE").to_pandas()

    # パイプラインの作成
    num_attribs = list(df_train)
    num_pipeline = Pipeline([
            ('std_scaler', StandardScaler()),
        ])
    preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_attribs)
        ])
    full_pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', RandomForestClassifier(n_estimators=10, criterion='gini', max_depth=5, random_state=100)),
        ])

    # 学習の実行
    full_pipeline.fit(df_train, df_train_labels)

    # モデルを内部ステージに保存
    save_file(session, full_pipeline, "@MODELS/predict_species.joblib")

    # 検証データを使用し、モデルの精度検証
    predictions = full_pipeline.predict(df_verification)
    accuracy = accuracy_score(df_verification_labels, predictions)
    return accuracy

# Create an instance of StoredProcedure using the sproc() function
train_model_iris = sproc(train_model, replace=True)
## 学習の実行
train_model_iris()

## 登録されているかの確認
query = 'ls @MODELS'
session.sql(query).collect()
## 推論用UDFの登録
import sys
import cachetools
import os
from snowflake.snowpark.functions import udf
session.add_import("@MODELS/predict_species.joblib")  
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')

@cachetools.cached(cache={})
def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = joblib.load(file)
                     return m

@udf(name="iris", is_permanent=True, stage_location="@MODELS", replace=True)
def predict (SEPAL_LENGTH:float, SEPAL_WIDTH:float, PETAL_LENGTH:float, PETAL_WIDTH:float) -> str:
       m = read_file('predict_species.joblib')    
       df = pd.DataFrame([locals()], columns = ["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH"])
       return m.predict(df)[0]
## UDFを実行し、推論の実施
from snowflake.snowpark import functions as F
snowpark_df_test = session.table('iris_test')
inputs = snowpark_df_test.drop('SPECIE')
snow_results = snowpark_df_test.select(*inputs, predict(*inputs).alias('Prediction'), (F.col('SPECIE')).alias('Actual_label'))
snow_results.show()
## Vectorized UDFの登録
import pandas
import sys
import cachetools
import os
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark import types as T

session.add_import("@MODELS/predict_species.joblib")  
@cachetools.cached(cache={})
def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = joblib.load(file)
                     return m

@pandas_udf(max_batch_size=100)
def predict_batch(df: T.PandasDataFrame[float, float, float, float]) -> T.PandasSeries[str]:
       m = read_file('predict_species.joblib') 
       df.columns = ["SEPAL_LENGTH", "SEPAL_WIDTH", "PETAL_LENGTH", "PETAL_WIDTH"]
       return m.predict(df)
## UDFを実行し、推論の実施
from snowflake.snowpark import functions as F
snowpark_df_test = session.table('iris_test')
inputs = snowpark_df_test.drop('SPECIE')
snow_results = snowpark_df_test.select(*inputs, predict_batch(*inputs).alias('Prediction'), (F.col('SPECIE')).alias('Actual_label'))
snow_results.show()
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4