本記事は ZOZO Advent Calendar 2023 シリーズ 7 の 11 日目の記事です。
はじめに
この記事では Google Cloud のマネージドサービスである Vertex AI Experiments を使った実験管理方法をご紹介します。
このサービスのメリットは当然 Google Cloud の各種サービスと連携がとてもシームレスに行える点が一番に挙げられますが、個人や大学の研究室で実験管理・共有するためのツールとしても有用だと思いますので、既に MLflow などを使っている方でもぜひご一読ください。
Vertex AI Experiments で何ができる?
端的に言ってしまえば、2023年12月時点では機能的に MLflow とそう違いはありません。まずドキュメントを読んだ感じではバックエンドに MLflow を使っている雰囲気があるので、マネージド MLflow と言っても差し支えないレベルだと思います。
- モデルの学習・予測に使用したパラメータを記録する
- モデル学習時の学習曲線を記録する
- モデル評価結果のメトリクスを記録する
- etc.
Vertex AI Experiments はこういった実験管理をする上での基本的な機能を扱えるマネージドサービスである、という認識で良いと思います。
ただし、他の実験管理方法との大きな違いは冒頭にも述べた通り「Google Cloud の各種サービスとの連携がシームレスに行える」という点です。
実験と一口に言っても、ML モデルの実験を回す・個人で実験管理をする・実験結果を他人に共有する・実験を再現するなど、実験にまつわる作業は様々です。こうした作業を行うプラットフォームが一箇所に集約されていることのメリットは言わずもがなでしょう。
Vertex AI Experiments での実験管理方法
大きく2パターンで実験の記録ができます。
- Vertex AI Pipelines で記録する
- ライブラリを使って記録する
それぞれどのように記録し、その結果を確認するかを以下で説明していきます。
準備
まずは Vertex AI Experiments を扱うために必要なライブラリをインストールしておきます。
$ pip install google-cloud-aiplatform
この記事を書いた時に使用していたパッケージのバージョンは以下のとおりです。
$ pip show google-cloud-aiplatform
Name: google-cloud-aiplatform
Version: 1.36.4
Summary: Vertex AI API client library
Home-page: https://github.com/googleapis/python-aiplatform
Author: Google LLC
Author-email: googleapis-packages@google.com
...
1. Vertex AI Pipelines で記録する
Vertex AI Pipelines は Google Cloud で提供されている ML ワークフローのオーケストレーションサービスです。ここでは Vertex AI Pipelines を既に利用している前提で書き進めます。
以下のようにモデルの学習・予測を行うダミーのコンポーネントを繋ぎ合わせてパイプラインを作り、これを実行していきます。
import os
import tempfile
import fire
from google.cloud import aiplatform
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import Input, Metrics, Output, component, Model, ClassificationMetrics
@component(base_image="python:3.11.5-slim-bullseye")
def train(
epoch: int,
batch_size: int,
description: str,
trained_model: Output[Model],
summary_metrics: Output[Metrics],
classification_metrics: Output[ClassificationMetrics],
) -> None:
import json
import random
from pathlib import Path
print(f"epoch: {epoch}, batch_size: {batch_size}, description: {description}")
# Store trained model
Path(trained_model.path).parent.mkdir(parents=True, exist_ok=True)
with open(trained_model.path, "w") as f:
json.dump({"dummy_data": "for_test"}, f)
# Store metrics
summary_metrics.log_metric("accuracy", 0.9)
summary_metrics.log_metric("recall", 0.8)
summary_metrics.log_metric("precision", 0.7)
summary_metrics.log_metric("description", description)
# Plot ROC curve
classification_metrics.log_roc_curve(
fpr=[random.uniform(0.1, 0.3) for _ in range(101)],
tpr=[random.uniform(0.6, 0.9) for _ in range(101)],
threshold=[i / 100 for i in range(101)],
)
# Plot confusion matrix
matrix = [
[random.randint(0, 100), random.randint(0, 100)],
[random.randint(0, 100), random.randint(0, 100)],
]
classification_metrics.log_confusion_matrix(categories=["Positive", "Negative"], matrix=matrix)
@component(base_image="python:3.11.5-slim-bullseye")
def predict(test_data: str, trained_model: Input[Model]) -> None:
import json
print(test_data)
with open(trained_model.path) as f:
print(json.load(f))
@dsl.pipeline(name="sample-pipeline")
def pipeline(epoch: int, batch_size: int, train_description: str):
train_task = train(
epoch=epoch, batch_size=batch_size, description=train_description
)
predict(test_data="Path of test data", trained_model=train_task.outputs["trained_model"])
def main(
project: str,
location: str,
pipeline_root: str,
display_name: str,
enable_cache: bool,
parameters: dict,
experiment_name: str,
):
"""For test with Vertex AI Pipelines
"""
aiplatform.init(
project=project,
location=location,
)
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "pipeline.json")
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=path, type_check=False
)
job = aiplatform.PipelineJob(
display_name=display_name,
enable_caching=enable_cache,
template_path=path,
parameter_values=parameters,
pipeline_root=pipeline_root,
)
job.submit()
if __name__ == "__main__":
fire.Fire(main)
1-1. 従来の Vertex AI Pipelines の UI から確認する
Vertex AI Pipelines を既に利用している場合パイプラインで使用したパラメータやアーティファクトは自動的に記録されます。
以下のコマンドでパイプラインを実行します。
$ poetry run python pipelines.py \
--project "<YOUR_GCP_PROJECT_ID>" \
--location "<YOUR_REGION>" \
--pipeline_root "<YOUR_GCS_BUCKET>" \
--display_name "sample-pipeline-display-name" \
--enable_cache False \
--parameters "{\"epoch\":10,\"batch_size\":32,\"train_description\":\"description of experiment\"}" \
--experiment_name "sample-pipeline-experiment"
実行結果は以下のように確認・比較できます。
ただしこの方法だと一覧性が悪く、またパラメータやメトリクスを確認したいパイプラインの run を全て選択する必要があるため快適な実験管理状態とは言えないでしょう。
1-2. experiment
パラメータを指定して記録・確認する
1-2-1. 事前準備
続いて実行結果を Vertex AI Experiments に保存するために、事前に "Experiment" というリソースを作っておきます。
1-2-2. 実行
パイプラインのジョブを実行する際、Python SDK を使っている場合は以下のように experiment
というパラメータを指定すると Vertex AI Experiments の方にも実行内容が記録されます。
experiment_name = "sample-pipeline-experiment"
job.submit(
experiment=experiment_name, # この引数を追加
)
experiment
引数を追加して実行した場合、"Experiments" タブから先ほど作成した experiment を選択して開くと実行時のパラメータやメトリクスが一覧で確認できます。
実験同士の比較は以下のように表示されますが、上記のアーティファクトとして出力されていたROC曲線や混同行列の画像の比較はできないようです。
実験用でパイプラインを実行する際は experiment
パラメータを付加して実行すると、このように簡易的な実験管理ができます。
2. ライブラリを使って記録する
ここでは google-cloud-aiplatform
ライブラリを使い、Vertex AI Pipelines のコンポーネントで実行されていたロギングと同じことを行います。
使用感は MLflow と全く同じなので、コードサンプルと実行後の Run の状態だけを示します。
import random
import datetime
import fire
from google.cloud import aiplatform
tensorboard_path = "<YOUR_TENSORBOARD_PATH>"
def main(
project: str,
location: str,
parameters: dict,
experiment_name: str,
experiment_description: str,
):
# Init
aiplatform.init(
experiment=experiment_name,
experiment_description=experiment_description,
project=project,
location=location,
)
run_name = f"sample-run-{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
tensorboard = aiplatform.Tensorboard(tensorboard_path)
# Execute experiment
aiplatform.start_run(run=run_name, tensorboard=tensorboard)
# Store parameters of experiment
aiplatform.log_params(parameters)
# Store metrics of experiment results
aiplatform.log_metrics({"accuracy": 0.9, "recall": 0.8, "precision": 0.7})
# Plot classification metrics
aiplatform.log_classification_metrics(
labels=["Positive", "Negative"],
matrix=[
[random.randint(0, 100), random.randint(0, 100)],
[random.randint(0, 100), random.randint(0, 100)],
],
fpr=[random.uniform(0.1, 0.3) for _ in range(101)],
tpr=[random.uniform(0.6, 0.9) for _ in range(101)],
threshold=[i / 100 for i in range(101)],
display_name="Sample of classification metrics",
)
aiplatform.end_run()
if __name__ == "__main__":
fire.Fire(main)
これらの処理を行う Vertex AI Pipelines 用のコンポーネントを作成し、学習コンポーネント・評価コンポーネントからの出力を受け取って Vertex AI Experiments にロギングしていくような運用が考えられます。
Vertex AI Pipelines でのロギングと異なる点
Vertex AI Pipelines でのアーティファクトの出力に頼るのではなく、このライブラリを使ってロギングすることのメリットは大まかに以下の2点だと考えています。
- 学習曲線などの時系列データをプロットできる
- パイプラインの実行単位よりも細かい単位で実験管理ができる
学習曲線などの時系列データをプロットできる
まず1点目ですが、Vertex AI Pipelines のアーティファクトの型に時系列データを表現するものが存在しないため、時系列データをプロットしたグラフを表示したい場合は今回のライブラリを使った方法を採らざるを得ません。
例えばライブラリを使って tensorboard 用に出力された学習曲線をプロットするサンプルを以下に示します。
aiplatform.start_upload_tb_log(
tensorboard_id=tensorboard_id,
tensorboard_experiment_name=tensorboard_experiment_name,
logdir=logdir,
experiment_display_name=experiment_display_name,
run_name_prefix=run_name_prefix,
description=description,
)
aiplatform.end_upload_tb_log()
(参考資料)
補足として、Vertex AI Experiments で時系列データのプロットを行うには Tensorboard インスタンスを立ち上げる必要があり、Tensorboard インスタンスは保存するデータ量に応じて利用料金が変わるため、そもそも時系列データを管理する必要があるかは財布と要相談といったところです。
パイプラインの実行単位よりも細かい単位で実験管理ができる
2点目は Vertex AI Experiments のリソース構造に関するものです。Vertex AI Pipelines を使った実験管理だと、パイプラインの実行一回につき一つの実験だけが記録されます。
ライブラリを使うと、以下のように様々なパラメータで実験した結果を一回の run として記録できるため、ローカル環境や Vertex AI Workbench のような環境で何度も実験を回す際に有効な記録方法です。
# Init
aiplatform.init(
experiment=experiment_name,
experiment_description=experiment_description,
project=project,
location=location,
)
run_name = f"sample-run-{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
tensorboard = aiplatform.Tensorboard(tensorboard_path)
for i in range(10):
# Execute experiment
aiplatform.start_run(run=f"{run_name}-{i}", tensorboard=tensorboard)
# Store parameters of experiment
parameters = {"epoch": random.randint(1, 64), "batch_size": random.randint(1, 64), "lr": 0.1 * i, "iter": i}
aiplatform.log_params(parameters)
# Store metrics of experiment results
aiplatform.log_metrics(
{
"accuracy": random.uniform(0, 1),
"recall": random.uniform(0, 1),
"precision": random.uniform(0, 1),
}
)
# Plot classification metrics
aiplatform.log_classification_metrics(
labels=["Positive", "Negative"],
matrix=[
[random.randint(0, 100), random.randint(0, 100)],
[random.randint(0, 100), random.randint(0, 100)],
],
fpr=[random.uniform(0.1, 0.3) for _ in range(101)],
tpr=[random.uniform(0.6, 0.9) for _ in range(101)],
threshold=[i / 100 for i in range(101)],
display_name="Sample of classification metrics",
)
aiplatform.end_run()
上記のコードでは任意のパラメータで実験を回すような状況を想定しています。このコードを実行すると以下のような結果が得られ、パラメータごとのパフォーマンスを容易に比較できます。
ただしこの方法だと各実験を並列で実行できないため、並列かつバックグラウンドで複数の実験を回す方法を検討する必要があります。
OSS としての MLflow を使う場合との比較
まず MLflow を利用する場合、MLflow のサーバーとなる環境を用意する必要があります。これは個人の PC でもチーム共用のサーバーでもなんでも良いですが、とにかく常に稼働している環境を用意しなければいけません。
つまり、共用サーバーでホスティングされている MLflow を管理する人が必要になるということです。一方 Vertex AI Experiments はマネージド MLflow のように使えるため、こうした管理コストを減らせる点が一番のメリットでしょう。
Google Cloud のサービスを組み合わせて MLflow をホスティングしても同じことはできると思いますが、いずれにせよ常時稼働させる・もしくはシャットダウンしてもデータが消失しないように DB を用意する必要があり、管理・構築コストはそれなりにかかってしまいます。
個人に閉じたユースケースならまだしも、チームで実験結果を集約する環境の一つとして Vertex AI Experiments はファーストチョイスになり得ると思います。
まとめ
便利なサービスなのでもっとユースケースと知見が広まってほしいところです。是非、日々の実験管理に Vertex AI Experiments を取り入れてみてください。
本記事で使用したコードは以下のリポジトリに配置しています。