はじめに
最近急激に暑くなってきましたね...
この記事を書いている前の週にSnowflake Summit 2023が開催され、DWH×AIがさらに加速するような内容がさまざま発表され、私もすっかりSnowflakeに御熱なわけですが、SnowflakeでAIモデルを動かしてみたいと思い、いろいろ試行錯誤をした内容をまとめておこう!ということで、この記事を書いていこうと思います。
初めてのQiitaでの記事投稿でかなり緊張しており、拙い部分が多いと思いますが、Snowflakeの特徴的な部分をお伝えできればと思います!
この記事の対象者
- Snowflake×AIに興味がある方
- Snowflakeで感情分析をやってみたい方
- ONNXをSnowflakeで使うにはどうすればいいのかわからない方
この記事の内容
- 完成図の確認
- 利用する技術の説明
- 感情分析をSnowflakeで行うための準備
- やってみた結果・感想
完成形
下記に記載しているセットアップが完了すると、以下のように文章を入れるだけで、ネガポジが計算されます!
SELECT SENTIMENT_CLASSIFICATION('It is great!!!!');
> POS
SELECT SENTIMENT_CLASSIFICATION('I was very disappointed.');
> NEG
構成
Hugging Faceで公開されているAmazonのレビューデータのネガポジを行うことを想定しています。
Snowflakeにデータを連携後、自作したユーザー定義関数を通して文章のネガポジを取得します。
利用する技術の説明
ここからは少し難しい内容も含まれるので、Snowflakeでどんな流れで設定するの?という部分に興味がある方は、「感情分析をSnowflakeで行うための準備」まで読み飛ばしてしまっても問題ありません。
BERT
- Bidirectional Encoder Representations from Transformers(Transformerによる双方向のエンコード表現)の略で、今一番熱い自然言語処理系の派生モデル。
- 内部にTransformerというつよつよの機構を用いていて、既存のアーキテクチャと違い、並列性や文脈の理解力が段違いに良くなっています。
- ちなみに話題のChatGPTのGPTは「Generative Pre-trained Transformer」の略で、実はBERTの親戚だったりします。
- BERTについての詳しい内容は自然言語処理の王様「BERT」の論文を徹底解説という記事がわかりやすいので、興味がある方はぜひ読んでみてください。
- 今回感情分析に使うモデルはfiniteautomata/bertweet-base-sentiment-analysisという、Twitterのツイートを使って学習したものを使用します。
ONNX
- Open Neural Network Exchangeの略で、オニキスと読みます。ディープラーニングや機械学習モデルのような人工知能モデル(以下、モデル)を表現するためのフォーマットです。
- 今回は、BERTモデルは読み込みに時間がかかることを考慮し、呼び出しが簡単かつ高速にするためにONNXフォーマットを使用します。
- (細かい話)Hugging Faceで公開されているモデルは基本的に巨大なモデルばかりなので、GPU使うことが前提となることが多いですが、CPUでの処理で高速に処理可能なため使用している側面もあります。
Snowpark
- Snowflakeから公開されている複雑なデータパイプラインの構築をより簡単にし、開発者がデータを移動させずに直接Snowflakeと対話できるようにするために設計された開発者用フレームワークで、今回はユーザー定義関数を外部から登録したり、ファイルをアップロードするのに利用したりします。
- 2023年現在ではPython, Scala, Java, JavaScript, SQLなどが使えます。
感情分析をSnowflakeで行うための準備
Snowflakeでの準備
まず、Snowflake上に必要なユーザーやロール、データベースなどを作成します。
細かい点ですが、Warehouseの作成時にWAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
を指定しています。
-- ロールとユーザーの作成
CREATE OR REPLACE ROLE sent_role;
CREATE OR REPLACE USER sent_user PASSWORD = "Passw0rd";
-- ユーザーとロールの紐付け
GRANT ROLE sent_role TO USER sent_user;
GRANT ROLE sent_role TO ROLE sysadmin;
USE ROLE sysadmin;
-- Warehouseの作成
CREATE OR REPLACE WAREHOUSE sentiment_wh WITH WAREHOUSE_SIZE = 'XLARGE' WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE MIN_CLUSTER_COUNT = 1 MAX_CLUSTER_COUNT = 1 INITIALLY_SUSPENDED = TRUE;
GRANT ALL ON WAREHOUSE sentiment_wh TO ROLE sent_role;
-- Databaseの作成
CREATE OR REPLACE DATABASE sentiment_db;
GRANT ALL ON DATABASE sentiment_db TO ROLE sent_role;
GRANT ALL ON ALL SCHEMAS IN DATABASE sentiment_db TO ROLE sent_role;
USE ROLE sent_role;
-- Stageの作成
CREATE OR REPLACE STAGE files;
-- レビューを格納するテーブル
CREATE OR REPLACE TABLE sentiment_db.public.amazon_reviews (
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
product_category string,
star_rating int,
helpful_votes int,
total_votes int ,
vine int,
verified_purchase int,
review_headline string,
review_body string,
review_date string
);
レビューデータの用意
次に、推論させる(感情分析をする)データを用意します。今回利用するモデルの仕様上、英語のみに対応しているため、 Hugging Faceで公開されているamazon_us_reviews(Amazonのレビュー)を使います。
こちらは事前にSnowflakeにアップロードして、テーブルとして展開します。
from datasets import load_dataset
import pandas as pd
import csv
from snowflake.snowpark.session import Session
import json
# Hugging Face Datasetsからデータをロード
dataset = load_dataset("amazon_us_reviews", "Mobile_Electronics_v1_00")
# # trainデータセットをPandasのDataFrameに変換
df = pd.DataFrame(dataset["train"][:10000])
# DataFrameをCSVファイルに保存
df.to_csv("amazon_reviews.csv",
index=False,
encoding='utf-8',
quoting=csv.QUOTE_NONNUMERIC,
sep=',')
# Snowflakeの接続情報を取得
connection_parameters = json.load(open('connection.json'))
# SnowflakeとのSessionを確立
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True
# csvのupload
session.file.put("./amazon_reviews.csv", "@files")
Python実行後にSQLを実行
COPY INTO sentiment_db.public.amazon_reviews from @files/amazon_reviews.csv.gz
ON_ERROR = 'CONTINUE'
FILE_FORMAT = (type = csv
field_delimiter = ','
record_delimiter = '\n'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
skip_header=1);
Pythonコード内のconnection_parameterに入れる認証情報は以下のとおりです。
{
"account" : "アカウント情報",
"user" : "sent_user",
"password" : "Passw0rd",
"role" : "sent_role",
"warehouse" : "sentiment_wh",
"database" : "sentiment_db",
"schema" : "public"
}
Snowflakeにデータを読み込むと以下のようになります。
BERTモデルの用意
次に実際に推論させるために使用するモデルの用意をします。
少しややこしいですが、Hugging Faceで提供されている自然言語モデルを動作させるには、大抵「モデル」と「トークナイザー」の2つが必要になります。
詳しい説明は省きますが、「モデル」は実際に感情分析を行うための代物、「トークナイザー」は文章を「モデル」が理解できるように変換する特殊な代物と認識いただければ、一旦は大丈夫です。
先に説明したONNXですが、モデル側をONNXフォーマットに変換し、トークナイザーは1つのフォルダとしてまとめておきます。
# 必要なライブラリのインストール(インストールしてなければ)
!pip install datasets transformers 'transformers[onnx]' onnxruntime emoji==0.6.0 torch
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# モデルとトークナイザーの準備
model_name = "finiteautomata/bertweet-base-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
# ダミーの入力を作成します
max_length = 128 # モデルの最大シーケンス長を調整する場合は、ここを変更してください
dummy_input = tokenizer.encode_plus(
"This is a dummy input for ONNX conversion",
return_tensors='pt',
padding='max_length',
truncation=True,
max_length=max_length
)
# ダミーの入力に対応する名前を設定します
input_names = ['input_ids', 'attention_mask', 'token_type_ids']
output_names = ['logits']
# モデルをONNX形式に変換します
torch.onnx.export(
model,
(dummy_input['input_ids'], dummy_input['attention_mask'], dummy_input['token_type_ids']),
"bertweet-base-sentiment-analysis.onnx",
input_names=input_names,
output_names=output_names
)
# トークナイザーの保存
tokenizer.save_pretrained("./transformer_tokenizer")
モデル・トークナイザーのアップロードとUDF関数の作成
ここでは先ほど作成したモデルとトークナイザーをSnowflakeにアップロードし、同時にUDFも作成します。
# Snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf
from snowflake.snowpark.version import VERSION
# Misc
import pandas as pd
import json
import cachetools
import logging
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)
# ---------------------------------------------------------------------------
# Snowflakeの接続情報を取得
connection_parameters = json.load(open('connection.json'))
# SnowflakeとのSessionを確立
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True
# ---------------------------------------------------------------------------
# 必要なデータを用意
# ファイルは「モデルのONNX化とTokenizerファイルの用意」で作成
session.file.put('bertweet-base-sentiment-analysis.onnx','@files',overwrite=True,auto_compress=False)
session.file.put('./transformer_tokenizer/bpe.codes','@files',overwrite=True,auto_compress=False)
session.file.put('./transformer_tokenizer/added_tokens.json','@files',overwrite=True,auto_compress=False)
session.file.put('./transformer_tokenizer/tokenizer_config.json','@files',overwrite=True,auto_compress=False)
session.file.put('./transformer_tokenizer/special_tokens_map.json','@files',overwrite=True,auto_compress=False)
session.file.put('./transformer_tokenizer/vocab.txt','@files',overwrite=True,auto_compress=False)
# ---------------------------------------------------------------------------
# Session内のライブラリやステージから参照するファイルの指定
session.clear_packages()
session.clear_imports()
session.add_packages('snowflake-snowpark-python','numpy','onnxruntime','transformers','joblib','cachetools')
session.add_import('@files/bertweet-base-sentiment-analysis.onnx')
session.add_import('@files/bpe.codes')
session.add_import('@files/added_tokens.json')
session.add_import('@files/tokenizer_config.json')
session.add_import('@files/special_tokens_map.json')
session.add_import('@files/vocab.txt')
# ---------------------------------------------------------------------------
# トークナイザーのフォルダをインスタンス内に作成
@cachetools.cached(cache={})
def make_tokenizer_folder(import_dir, new_folder):
import os
if not os.path.exists(new_folder):
os.mkdir(new_folder)
file_list = ['bpe.codes', 'added_tokens.json', 'tokenizer_config.json', 'special_tokens_map.json', 'vocab.txt']
for item in file_list:
# コピー元のファイルを読み込みモードで開く
with open(import_dir+'/'+item, 'rb') as source_file:
# コピー先のファイルを書き込みモードで開く
with open(new_folder+'/'+item, 'wb') as destination_file:
# コピー元のファイルの内容を読み込む
contents = source_file.read()
# コピー先のファイルに内容を書き込む
destination_file.write(contents)
# ---------------------------------------------------------------------------
# onnxファイル・トークナイザーフォルダーからモデルとトークナイザーを取得
@cachetools.cached(cache={})
def load_model(import_dir, new_folder):
import onnxruntime as ort
from transformers import AutoTokenizer
model_file = import_dir + '/bertweet-base-sentiment-analysis.onnx'
opts = ort.SessionOptions()
opts.intra_op_num_threads = 1
providers = ['CPUExecutionProvider']
ort_session = ort.InferenceSession(model_file, providers=providers, sess_options=opts)
tokenizer = AutoTokenizer.from_pretrained(new_folder)
return ort_session, tokenizer
# ---------------------------------------------------------------------------
# UDF
# モデル・トークナイザーを用いて引数の文章の感情分析を行う
@udf(name='sentiment_classification',session=session,replace=True,is_permanent=True,stage_location='@files')
def sentiment_classification(sentence: str) -> str:
import sys
import uuid
import numpy as np
new_folder = '/tmp/transformer_tokenizer' + str(uuid.uuid4()).replace('-', '_')
# ラベル
label = ['NEG', 'NEU', 'POS']
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
# tokenizerの用意
make_tokenizer_folder(import_dir, new_folder)
# モデルとトークナイザーの準備
ort_session, tokenizer = load_model(import_dir, new_folder)
# 文章のトークン化
inputs = tokenizer.encode_plus(
sentence,
return_tensors='np', # numpy形式で返す
padding='max_length',
truncation=True,
max_length=128
)
# 入力を準備
input_dict = {name:inputs[name] for name in ['input_ids', 'attention_mask', 'token_type_ids']}
# 推論実行
ort_outs = ort_session.run(None, input_dict)
# ort_outsはリスト形式で結果を返すので、最初の要素を取得
logits = ort_outs[0]
# softmaxを適用して確率に変換
probs = np.exp(logits) / np.sum(np.exp(logits))
# labelに変換してから返却
return f"{label[np.argmax(probs)]}"
長いですが、かいつまんで説明すると
- 必要ライブラリのインポートと接続情報をもとにSnowflakeとの接続を開始します。
- 「必要なデータを用意」で、作成した「モデル」「トークナイザー」をステージにアップロードします。
- 「Session内のライブラリやステージの依存関係を設定」で、UDF実行時のライブラリや参照するファイルを設定します。
- 残りのパートで実際に動作させる関数の定義を行います。内容としては実行時にモデルの読み込みを行い、入力された文章の感情分析を得て、['NEG', 'NEU', 'POS']の3つのどれかが選ばれます。
実行!
長かったですが、これでようやく実行できます!
実行すると以下のようになります。
わかりやすくするため、元のレビューを書いた方の評価(star_rating)を表示させていますが、概ね結果に問題はなさそうですね。
ですがいろいろ見てみると、実は恐ろしいことが起きていました...
費用対効果について
パフォーマンス
まず実行した100件のレビューに対しての実行時間・クエリ情報は以下のとおりでした。
実行に4分24秒もかかっている....!!!
そうなんです。めちゃくちゃ長い準備に付き合ってくださった方には大変申し訳がないのですが、現状全く使い物にならないのです。
他にもよく見てみると、「統計」の「Python sandbox max memory」58.09GBになっているのもかなり気になります。
ONNXの恩恵を受けれていないのか?そもそも肥大化するようなものなのか?などいろいろ切り分けが必要ですが、現状よくわかっていません...
料金
料金について、1回の実行でかかった料金についても見てみました。
以下の図は100件の推論1回の実行でかかった料金についてです。
どこのクラウドサービス・どこのリージョンを使うかによってクレジットの単価が変わりますが、WarehouseのサイズをXLARGE
を指定して100行のみの推論で大体¥1000〜¥2000くらいかかります。「1回だけで」です。高すぎますね...
最後に
今回は高速化のためにONNXモデルを使用しましたが、結果は使い物にならず失敗に終わってしまいました...
前述した通りなぜ遅くなっているのかなどの切り分けは必要ですが、現状ビジネス上でそのまま使うにはなかなか難しいものがあると思います。
ですが、Python UDFを作るための一連の流れの大枠を捉えるにはとても良い勉強だったと思うので、この失敗を参考にしていただければ嬉しく思います!
また、Snowflake Summit 2023では「Snowpark Container Service」という機能が発表され、GPUを利用した処理をSnowflakeだけで完結させることもできるようになるそうです!
ですので、今回の内容のような処理が比較的重たいAIモデルを使用する際のベストプラクティスになるかもしれませんね。