どういうイベントなの
DATUM STUDIO株式会社がデータエンジニア(やデータエンジニアを目指す人)向けに開催しているイベントシリーズ、Data Engineering Geeksの第3回!…が先日7/30に開催されたので、参加してきました。
今回はSnowpark for PythonとdbtとApache Airflowを組み合わせてMLパイプラインを作ろうというSnowflakeハンズオンセミナーでした。
このハンズオンは2023年2月のSNOWDAY JAPANでも開催されていて、そのときも満席御礼の大人気セッションだったのですが、今回は内容がさらにブラッシュアップされていました!
当日は、カラーバーヘッドなデータエンジニアの梶谷さんがハンズオンガイドに沿ってライブコーディングして、菱沼さんが実況と解説、サポートスタッフとして待機していた他のデータエンジニアの皆様が参加者一人ひとりの質問やトラブルをバシバシ解決する、という形で進行していました。
カラーバーヘッドってなんなの…?
データエンジニアリング界隈ではバ美肉おじさんとして知られている菱沼さんですが、当日は生。珍しい!
感想
SNOWDAY JAPANのハンズオンも参加していたんですが、そのときは手元のトラブルシューティングに手一杯で何をやっているのかあまり理解できず。
(当時はハンズオンガイドがWindows非対応だったような…うろ覚え)
後日、ハンズオンガイドを見ながら会社で復習してみたけどまだ頭に入らない、という感じでした。
ということで、同じ内容のハンズオンをやるのは自分は3回目なのですが、今回はハンズオンガイドがすごく分かりやすくなっていて、理解がかなり深まりました!
そのおかげで、この先のステップアップとして自分が何を勉強すればいいのかも少し見えてきたので、とてもいい勉強になりました。
入念な準備と丁寧な運営をしてくれたDATUM STUDIOの皆様、本当にありがとうございました!
以降、当日にやったことです。
ハンズオンの準備
- Snowflake
- サードパーティーパッケージ(Anaconda)を有効にしたアカウント。有効にする手順
- ACCOUNTADMIN
- ただし、ハンズオン中はだいたいSYSADMINで作業していた
- Docker Desktop
- コンテナの中身(抜粋)
- Python 3.8.16
- Apache Airflow 2.5.0
- snowflake-snowpark-python 1.5.1
- dbt-snowflake 1.5.2
- Jupyter 1.0.0
まずはコンテナの立ち上げ
-
docker compose up -d
で済むようにdocker-compose.yml
を準備してくれていました。ありがたい!- ハンズオンあるある:事前準備のためのドキュメントを読んでいない参加者がいる
- 私です。運営の皆様お手数おかけしました
- 初回立ち上げのときのダウンロードに15分くらいかかりました
- でも、その間もSnowpark for Pythonの歴史と機能を解説してくれたので退屈しなかった!
- Snowpark for Pythonのライブラリロードの仕組みを初めて知った
- ステージにPythonファイルを投げ込んでそこからSnowflake内部のPython実行環境に取り込んでいるとのこと
- 菱沼さん「importされないときにステージにPythonファイルがあるか確認する、という判断ができるので(仕組みを知っておくと)開発するときに便利!」
Jupyter NotebookからSnowpark for Pythonに触ってみる
やったこと
- コンテナ内に入ってJupyter Notebookを起動
- ブラウザでJupyter Notebookに接続
- PythonからSnowflakeに接続(
snowflake.snowpark.session
) - ハンズオン用のデータベース、スキーマ、ウェアハウスを作成
- サンプルデータ(
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
)のデータをSnowpark DataFrameで読み出し- DataFrameを用いたLIMIT、WHERE、ソート、グルーピング、集計をそれぞれ試しました
- Pythonで書いたUDF(User Defined Function)を試す
- サンプルとして実行したのは文字の全角・半角の操作。たしかにこれはSQLだとやり方が分からない…
- Pythonで書いたストアドプロシージャの作成を試す
学びと感想
以下の余談がめちゃめちゃ目から鱗でした。
Size of Pandas DataFrame in Memory: 682.23 MB (715370781 B)
Size of Snowpark DataFrame in Memory: 0.0 MB (48 B)
Pandas DataFrame はデータの実体をメモリ上に保持していますが、Snowpark DataFrame は保持していないようです。
なるほどなぁ!
あと、プログラムやSaaSからSnowflakeに接続するために必要なアカウント識別子(Account Identifier)がここからコピーできるって今まで知らなかった…
当日理解できなかったのは、UDFとストアドプロシージャをどう使い分けるのか…コメント文には書いてあったんですが、自信の基礎知識の不足で理解しきれずでした。
Snowflakeのドキュメントに解説があったので後で読みます。
参考:ストアドプロシージャとユーザー定義関数のどちらを記述するかの選択
dbtとApache Airflowを触ってみる
やったこと
- dbtからの接続確認
- dbt SQLモデルでモデルを定義して、dbtコマンドで実体化(集計テーブルの作成)を実行
- dbt Pythonモデルでモデルを定義して、dbtコマンドで実体化(集計テーブルの作成)を実行:SQLモデルとまったく同じことをPythonモデルでやってみる
- Apache Airflow(Airflow)の初期設定
- AirflowでHello WorldなDAGを実行
- Airflowからdbt SQLモデルによる実体化を実行
- Airflowからdbt Pythonモデルによる実体化を実行
- Snowsightのワークシートから、dbtコマンドで実体化したテーブルとAirflowが実体化したテーブルに違いがないことを検査
学びと感想
以下のような解説がありました。覚えておこう。
- dbtは頻繁にテーブルをDROPするからTime Travelでストレージコストが膨らまないように注意。DROPしてもTime Travelの日数分がデータ量に計上されてしまう
- (Time Travelしなくてもいいテーブルなら)TRANSIENT TABLEを使うといいよ
最後に2つのテーブルの違いを以下のSQLクエリで確認していました。EXCEPT演算子、便利だなあ。
with result_except as (
select * from DEGEEKS_HO_DB.PUBLIC.example1
except
select * from DEGEEKS_HO_DB.PUBLIC.example1_dbt
)
select count(1) from result_except;
どこかでトラブってファクトテーブルの行数があるときから突然減る、あるあるなので最低でもこういうSQLクエリで定期チェックしたいですね。
機械学習を試してみる
やったこと
- 時系列データを機械学習(ML)を試してみる。サンプルデータ(
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1
)に含まれる取引金額を予測するというシナリオでした - 学習済みモデルを保存するためのステージを作成
- データセットの作成:時系列データのMLでは移動平均を特徴量として使うことが多いので、移動平均を計算しておく
- データセットの保存:上記で作成したデータセットをトレーニング用とテスト用の2つに分けて一時テーブルに保存
- XGBoostのライブラリと、トレーニング用データセットを使って学習済みモデルを作成。ステージに保存
- 学習済みモデルを読み込んで、テスト用データセットから未来の売り上げを予測。予測結果はテーブルに保存
- 予測結果を
matplotlib
でグラフ化して確認
- ここまでの流れをdbt Pythonモデルにして、dbtコマンドで実行
- さらにこのdbt PythonモデルをAirflowから実行(MLパイプラインの完成!)
学びと感想
ここからすごい加速した…本当にみんなできたのかな?
基本中の基本なんですけど、MLってだいたい以下の流れでやるんですよ、ということが分かっていないと何をやろうとしているのか分からなかったかも。
- 学習(訓練)フェーズでは、学習済みモデルを作る
- 推論(予測)フェーズでは、学習フェーズで作った学習済みモデルを使って推論(予測)を行う
Snowflakeに最適化されたXGBoost(や他のアルゴリズム)のライブラリが公開されているそうです。最近リリースされたばかりとのこと。
参考:Snowpark ML Modeling: ML Model Development
SnowparkのUDFやストアドプロシージャを使うことでSnowflake上で実行できるようになっていますが、例えばXGBoostで学習や推論をするところはほとんど従来のPythonと変わらないとのことでした。
つまり、XGBoostについて勉強すればここでやっていることがもっと理解できるということなので、自分が次にやるべきことが見えてきた…!
今回はローカル環境で自分だけが実行しているからAirflowを使うメリットが感じられませんでしたが、例えばクラウド環境にAirflowを導入すれば、誰がいつMLを実行しているのかが見えるからいいですよー、という補足もありました。
参考:Amazon MWAA(Managed Workflows for Apache Airflow)
(当日の解説でAirflowのことを「エムダブリューエーエー」って言っていたのはこれかあと今更気づきました)
Airflowじゃなくて他のオーケストレーションツール(ワークフローエンジン)でもいいということで、よりモダンなDagsterを使うのも面白いかもね!という余談もありました。
オーケストレーションツールというものの雰囲気は分かってきたので、自分で試してみても面白いかも。
参考:データオーケストレーションツールのDagsterを使ってみた
当日はあまり気にならなかったんですが、dbt PythonモデルでMLみたいな複雑な変換処理を定義するのは普通なのかな?
どういう風にデータが使われているのかを確認しようと思ったときにちょっと大変そうだなあと思いました。
野良のMLがあちこちで実行されるとそもそも確認できないから、それよりは断然いいのかな。
トラブルシューティング
当日に参加者の何人かがハマっていたポイント。
Jupyter Notebookが終了できない
そもそもハンズオン中は終了しなくてもいいんですけど、コンソールを複数開く方法が分からない場合はこれでハマるんですよね。
Jupyter Notebookを終了するためにはメニューからShut Down!
コンソールでCtrl+C
を入力して強制終了してもいいですね。
コンソールごと強制終了するパワフルな参加者さんも見かけました。
ターミナルを複数開けない
- ターミナルを複数開く習慣がない
- Dockerに慣れていないので、2つ目のターミナルを開いた後にコンテナ内に入るために何をすればいいのか分からない
みたいな感じですね多分。
自分はVisualStudio Codeをターミナルとして使っていたので、[+]ボタンで2つ目のターミナルを開きました。
また、今回はDocker Composeでコンテナを立ち上げていたので、
cd (docker-compose.ymlが置かれているディレクトリ)
docker compose exec handson bash
を再実行すれば問題なくコンテナ内に入れました。
懇親会
DATUM STUDIOの皆様、参加者の皆様と大変楽しい時間を過ごせました。
大学の研究室ノリでデータエンジニアリングのエッジを攻め続ける会社…この会社で働いたら間違いなく成長できるなあと感心していました。好き嫌いはあると思いますが!