5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

SnowflakeAdvent Calendar 2022

Day 12

Snowflake上で画像分類タスクを実行してみた

Last updated at Posted at 2022-12-12

データレイク上の非構造データを対応すべく、Snowparkは日々アップデートがなされている。この記事では、Snowflake上にある画像データを対象に、Snowflakeのウェアハウスを使用し、画像分類タスクを実行する。

内容

Snowpark(Python UDF)を使用し、内部ステージにある画像ファイルに対して、画像分類タスクを実施する。
Screenshot 2022-12-09 at 21.50.09.png

購読の対象者

コードレベルで説明するため、開発者寄りの記事となっている。もし、Snowflakeにおける非構造データの扱い方など概要レベルで知りたい場合には、こちらの記事を読まれることをお勧めする。

個人的な所感

  • 今までは非構造データにおけるメタデータの管理しかできなかったが、ステージ上のファイルオブジェクトを処理できるように拡張され、データレイクとウェアハウスの統合に向けた第一歩のように感じた
  • Transformersライブラリが便利で面白い
  • 性能面が気になった。今回のケースでDeep Learningベースの処理であるため、GPUインスタンスなどの下回りの変革も必要になる

前提条件

  • Snowparkのバージョンは1.0を使用
  • Jupyter Notebookを使用
  • Python UDFからステージ上にあるファイルオブジェクトを取得するため、Private Preview(PrPr)を使用している
    • 必要な場合、営業の人にお声掛けください

ディレクトリ構成

├── helper_functions
│   └── snowpark_image_plotting.py
├── image_classification.ipynb
├── images
│   ├── bird1.jpeg
│   ├── bird2.jpeg
│   ├── bird3.jpeg
│   ├── cat1.jpeg
│   ├── cat2.jpeg
│   ├── cat3.jpeg
│   ├── dog1.jpeg
│   ├── dog2.jpeg
│   └── dog3.jpeg
└── snowflake_connection.json

詳細

 まず処理の大まかな流れは以下となっている。

  1. 環境設定
  2. Snowflakeへの接続
  3. ローカルにあるファイル群を内部ステージにアップロード
  4. Snowpark APIを通じて、画像分類モデルをPython UDFとして登録
  5. UDFを実行し、Snowflake上で画像分類を実施

 なお、記事が長くなってしまうため、「1.環境設定」や「2.Snowflakeへの接続」に関する詳細な説明は割愛する。これらに関する詳細情報として参考を記載しているため、こちらを参照することとする。

1. 環境設定

 condaなどを使用し、必要なパッケージをインストールする。参考として、こちらを掲載しておく。なお、環境設定において注意すべき点として、いくつかのライブラリのバージョンには気をつけて頂きたい。具体的には、以下の通りである。理由として、UDFで登録する際にSnowflake側の環境とローカルの環境でのバージョンを一致させるためである。

Jupyter Notebook
%pip install pillow==9.2.0 torch==1.8.1, transformers==4.14.1, cachetools==4.2.2

2. Snowflakeへの接続

 参考として、こちらを掲載しておく。snowflake_connection.json に接続に必要な情報を記載している。また、helper_functionsフォルダ下にあるsnowpark_image_plotting.pyのコードは記載するため、実行する際にはご自身の環境にコピーして頂きたい。

Jupyter Notebook
###############################
## Preparing our environment ##
###############################
import json
from glob import glob
from PIL import Image, ImageFile
import pandas as pd
from cachetools import cached

from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import udf
from helper_functions.snowpark_image_plotting import show_images_from_df, show_classifications_from_df

# Reading Snowflake Connection Details
snowflake_connection_cfg = json.loads(open('./snowflake_connection.json').read())

# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()
snowpark_image_plotting.py
import pandas as pd
import matplotlib.pyplot as plt
from skimage import io
from skimage.transform import resize
from textwrap import wrap
import numpy as np
import snowflake.snowpark.functions as F

def show_images_from_df(df, url='RELATIVE_PATH', stage='@', title='RELATIVE_PATH', ncol=2, max_images=10, resize_shape=None, figsize=None):
    df = df.with_column('PRESIGNED_URL', F.call_builtin('GET_PRESIGNED_URL', stage, F.col(url)))
    df = df.sample(n=max_images).toPandas()
    # Create a Presigned-URL for image-retrieval
    nimages = len(df)
    if nimages > ncol:
        nrow = nimages // ncol + 1
    else:
        nrow = 1
        ncol = nimages
    if figsize is None:
        figsize = (16, 16 // ncol * nrow)
    fig = plt.figure(figsize=figsize)
    for i in range(nimages):
        image = io.imread(df['PRESIGNED_URL'][i])
        if resize_shape is not None:
            image = resize(image, resize_shape)
        if title is not None:
            label = df[title][i]
        else:
            label = 'N/A'
        ax = fig.add_subplot(nrow, ncol, i + 1)
        ax.set_title('{}'.format(label))
        plt.imshow(image)
        plt.xticks([]), plt.yticks([])
    plt.show()

def show_classifications_from_df(df, url='RELATIVE_PATH', stage='@', title='RELATIVE_PATH', classifications='CLASSIFICATIONS', max_images=10, resize_shape=(200,200)):
    df = df.with_column('PRESIGNED_URL', F.call_builtin('GET_PRESIGNED_URL', stage, F.col(url)))
    df = df.select(list(set([url, title, classifications, 'PRESIGNED_URL']))).sample(n=max_images).toPandas()
    nimages = len(df)
    for i in range(nimages):
        image = io.imread(df['PRESIGNED_URL'][i])
        if resize_shape is not None:
            image = resize(image, resize_shape)
        if title is not None:
            label = df[title][i]
        else:
            label = 'N/A'
        fig = plt.figure(figsize=(12, 5))
        ax1 = fig.add_subplot(1, 2, 1)
        ax1.set_title('{}'.format(label))
        ax1.imshow(image)
        ax1.axis('off')
        ax2 = fig.add_subplot(1, 2, 2)
        labels = pd.DataFrame(eval(df['CLASSIFICATIONS'][i]))['label'].tolist()
        labels = [ '\n'.join(wrap(l, 40)) for l in labels ]
        scores = pd.DataFrame(eval(df['CLASSIFICATIONS'][i]))['score'].tolist()
        y_pos = (0.2 + np.arange(len(labels))) / (1 + len(labels))
        width = 0.8 / (1 + len(labels))
        colors = ['blue', 'green', 'yellow', 'orange', 'red']
        for i in range(len(labels)):
            ax2.barh(y_pos[i], scores[i], width, align='center',
                    color=colors[i], ecolor='black')
            ax2.text(scores[i] + 0.01, y_pos[i], '{:.2%}'.format(scores[i]))
        ax2.set_yticks(y_pos)
        ax2.set_yticklabels(labels)
        ax2.set_xlabel('Probability')
        ax2.set_xticks([0, 0.25, 0.5, 0.75, 1])
        ax2.set_xticklabels(['0%', '25%', '50%', '75%', '100%'])
        ax2.set_title('Predicted Probability')
        fig.subplots_adjust(left=None, bottom=None, right=1.5, top=None, wspace=None, hspace=None)

3. ローカルにあるファイル群を内部ステージにアップロード

 まず、画像データをアップロード, UDFや画像分類モデルを登録するためのステージを用意する

Jupyter Notebook
# Create a stage for Images
session.sql("CREATE or REPLACE STAGE IMAGES directory = (enable = true auto_refresh = false) ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')").collect()
# Create a stage for Functions
session.sql('CREATE or REPLACE STAGE FUNCTIONS').collect()
# Create a stage for Models
session.sql('CREATE or REPLACE STAGE MODELS').collect()

 その後にputメソッドを使用して、ローカルにあるファイル群をSnowflakeの内部ステージにアップロードする。

Jupyter Notebook
# Upload sample images
for image_file in glob('images/*'):
    session.file.put(image_file, '@IMAGES', auto_compress=False)
    
# Refresh stage 
session.sql('ALTER STAGE IMAGES REFRESH').collect()

 その、ディレクトリテーブルに対してクエリを掛け、画像データがアップロードされていることを確認する。結果の画像ファイルも貼っておく。

Jupyter Notebook
# Let's create a Snowpark DataFrame for our Directory Table
snowpark_df = session.sql("""SELECT * FROM DIRECTORY(@IMAGES)""") # -> DIRECTORY TABLE NOT YET IN SNOWPARK-API
print(f'Number of images:{snowpark_df.count()}')
snowpark_df.limit(5).show()

Screenshot 2022-12-09 at 22.21.07.png

 最後にhelper_functions下にあるメソッドを使い、内部ステージ上にある画像ファイルを取得し、Jupyter Notebook上に表示する。実際の画像データの取得にはPresigned URLを使用している。

Jupyter Notebook
show_images_from_df(snowpark_df, url='RELATIVE_PATH', stage='@IMAGES', title='RELATIVE_PATH', max_images=8, ncol=4, resize_shape=(200,200))

Screenshot 2022-12-09 at 22.25.31.png

4. Snowpark APIを通じて、画像分類モデルをPython UDFとして登録

 モデルをUDFに登録する前にローカルで画像分類モデルを構築する。今回使用するのはtransformersというhuggingfaceが公開している機械学習、特に自然言語処理を主とした深層学習向けのライブラリである。(このライブラリが凄い・・・)
 pipeline() メソッドにある通り、入力の画像サイズを224x224に変換後に画像分類モデルを作成するパイプラインを組んだ上で、save_pretrained() メソッドを使用し、モデルファイルをローカルに保存している。その後に、モデル情報をSnowflakeの内部ステージにアップロードしている。

Jupyter Notebook
# Get an Image Classification model
from transformers import pipeline
image_classification_pipeline = pipeline("image-classification", "google/vit-base-patch16-224")

# Save model
image_classification_pipeline.save_pretrained('model/')
# Upload model-files to Snowflake
for model_file in glob('model/*'):
    print(model_file)
    try:
        session.file.put(model_file, '@MODELS', auto_compress=False)
    except:
        None

 必要なモデルを内部ステージにアップロード後、画像分類タスクをUDFとして登録する。2022/12現在において、UDFが動くSecure Sandboxと言われる環境から内部ステージにあるファイルにアクセスするためにプレビュー機能を有効化する必要がある。仮に有効化されていない場合、コードにある _snowflakeというライブラリがないとエラーが返ってくる。

Jupyter Notebook
@cached(cache={})
def f_load_transformer_model(import_dir: str)-> object:
    from transformers import pipeline
    transformer_pipeline = pipeline('image-classification', import_dir, import_dir)
    return transformer_pipeline

@udf(name="classify_image", 
     is_permanent=True, 
     stage_location='@FUNCTIONS', 
     replace=True, 
     session=session, 
     packages=['pillow','pytorch==1.8.1','transformers==4.14.1','cachetools==4.2.2'], 
     imports=['@MODELS/config.json','@MODELS/preprocessor_config.json','@MODELS/pytorch_model.bin'])
def f_score_transformer_model(datafile: str) -> T.Variant:
    import os
    import sys
    import io
    import _snowflake ## PrPrの有効化が必要である
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    pipeline = f_load_transformer_model(import_dir)
    # Dynamic File Access in UDFs
    img = Image.open(io.BytesIO(_snowflake.open(datafile).read()))
    output = pipeline(img)
    return output

5. Snowflake上で画像分類タスクを実施

 最後に登録したUDFを使い、内部ステージにある画像データを対象に画像分類タスクを実行する。具体的にはデュレクトリテーブルにあるFILE_URLを引数として、登録したUDFのf_score_transformer_model に渡している。その結果、最終列にCLASSIFICATIONS というVariant列に値がJson形式の値が入っていることを確認することができる。
 なお、予測確率を計算していた時間は10-20秒程度と個人的には少し長い印象にあるが、今回使用しているウェアハウスのサイズがXSmallと小さいことが影響している。また、現在のSnowpark Optimized Warehouseなど下回りのインスタンスなど新たなインスタンスの出現に期待したい。

Jupyter Notebook
# Classify the images and gather the predictions
snowpark_df = snowpark_df.with_column('CLASSIFICATIONS', f_score_transformer_model("FILE_URL"))
snowpark_df.toPandas().head()

Screenshot 2022-12-09 at 22.53.38.png

 また、最後に登録した画像と共に各分類タグの予測確率を載せたバーグラフをJupyter Notebook上に表示する。

Jupyter Notebook
# Display a sample of images
show_classifications_from_df(snowpark_df, url='RELATIVE_PATH', stage='@IMAGES', title='RELATIVE_PATH', max_images=8, resize_shape=(200,200))

Screenshot 2022-12-09 at 22.54.48.png

まとめ

 今回の例では画像データであったが、それ以外にもステージ上にある非構造データを含むファイルオブジェクトをSnowflakeをウェアハウスを使用し、処理することが可能となっていた。Snowflakeのウェアハウスから内部ステージへの通信が可能とする機能は現在Private Preview中ではあるが、この機能が今後はもっと充実化していくはずである。そうすれば、現在のデータレイクとデータウェアハウスを統合的に管理はできることに加え、SnowflakeというIFからテーブルとファイルオブジェクトの両方を統一的に処理ができるようになっていくはずである。

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?