背景
最近、MLOpsを実現するために、データのバージョン管理(DVC)と機械学習パイプライン(Metaflow)を組み合わせて使用していました。
しかし、Metaflowでクラウド上(AWS Batchなど)のコンテナでフローを実行する際に、DVCの設定ファイルを持ち込む方法で困りました。
Metaflowの実行環境に持ち込まれるファイルは、基本的に .py ファイルと uv の依存関係ファイル(pyproject.toml、uv.lock)などに限られます。
追加のファイルを持ち込みたい場合は --package-suffixes オプションで特定の拡張子を指定できますが、DVCに必要な .dvc/config は拡張子がないため、このオプションでは対応できません。
この記事では、この問題を解決し、Metaflowのクラウド実行環境でDVCを使ってS3からデータセットを取得する方法を紹介します。
前提
- Python 3.11
- Metaflow 2.18.13
- DVC 3.64.0(dvc-s3 3.2.2)
- PyTorch Lightning 2.1.0
- uv(パッケージマネージャー)
- Metaflow(AWS Batch版)環境が構築済み
プロジェクト構成
.
├── .dvc/
│ └── config # DVCのリモートストレージ設定
├── dataset/
│ ├── training_set.dvc # 訓練データのメタファイル
│ └── test_set.dvc # テストデータのメタファイル
├── model.py # PyTorch Lightningモデル
├── train_flow.py # Metaflowのフロー定義
├── pyproject.toml
└── uv.lock
解決策: IncludeFileでDVC設定を持ち込む
Metaflow には IncludeFile という機能があり、任意のファイルをフローに埋め込んで実行環境に持ち込むことができます。
この時、S3上にファイル実体が保存され、self.dvc_configのようにアクセスすることでファイル内容を取得できます。
これをファイルに書き出して使いたいのですが、AWS Batch 上で実行する場合、各ステップが独立したコンテナ上で実行されます。
そのため各ステップで .dvc/config をファイルに書き出し、データをダウンロードする必要があります。
実装のポイント
-
IncludeFileで.dvc/configの内容をフローに含める - 実行時に
.dvc/configを動的に復元する - データセットを使うステップ内で
DVCFileSystemでS3からデータを取得する
コード例
import os
from metaflow import FlowSpec, step, Parameter, IncludeFile, batch
from pytorch_lightning import Trainer
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
from dvc.api import DVCFileSystem
from model import DogCatModel
class TrainFlow(FlowSpec):
batch_size = Parameter('batch_size', default=32)
epochs = Parameter('epochs', default=10)
learning_rate = Parameter('lr', default=0.001)
# IncludeFileでDVC設定を埋め込む
dvc_config = IncludeFile('dvc_config', default='.dvc/config', is_text=True)
def _setup_dvc(self):
"""DVC設定を復元"""
os.makedirs('.dvc', exist_ok=True)
with open('.dvc/config', 'w') as f:
f.write(self.dvc_config)
@step
def start(self):
print(f"Starting training with batch_size={self.batch_size}, epochs={self.epochs}")
self.transform = transforms.Compose([
transforms.Resize((128, 128)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
self.next(self.train)
@batch(gpu=1)
@step
def train(self):
# DVC設定を復元してからデータを取得
self._setup_dvc()
fs = DVCFileSystem(".")
fs.get("dataset/training_set", "dataset/training_set", recursive=True)
full_dataset = datasets.ImageFolder('dataset/training_set', transform=self.transform)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=self.batch_size)
self.model = DogCatModel(learning_rate=self.learning_rate)
trainer = Trainer(max_epochs=self.epochs)
trainer.fit(self.model, train_loader, val_loader)
self.next(self.test)
@step
def test(self):
self._setup_dvc()
fs = DVCFileSystem(".")
fs.get("dataset/test_set", "dataset/test_set", recursive=True)
test_dataset = datasets.ImageFolder('dataset/test_set', transform=self.transform)
test_loader = DataLoader(test_dataset, batch_size=self.batch_size)
trainer = Trainer()
trainer.test(self.model, test_loader)
self.next(self.end)
@step
def end(self):
print("Training completed!")
if __name__ == '__main__':
TrainFlow()
重要なポイント
1. IncludeFileの使い方
dvc_config = IncludeFile('dvc_config', default='.dvc/config', is_text=True)
-
is_text=Trueでテキストファイルとして読み込む - ローカル実行時も含め、フローの属性として
.dvc/configの内容が保持される
2. DVC設定の復元
def _setup_dvc(self):
"""DVC設定を復元"""
os.makedirs('.dvc', exist_ok=True)
with open('.dvc/config', 'w') as f:
f.write(self.dvc_config)
リモート実行環境では.dvcディレクトリが存在しないため、各ステップの最初で復元します。
3. DVCFileSystemでデータ取得
fs = DVCFileSystem(".")
fs.get("dataset/training_set", "dataset/training_set", recursive=True)
dvc pullコマンドを使わず、Pythonコード内でDVCが管理するデータを取得できます。
実行方法
ローカル実行
uv run train_flow.py run
AWS Batch実行
uv run train_flow.py \
--environment=uv \
--package-suffixes ".dvc" \
run \
--with batch
--package-suffixes ".dvc" で.dvcファイル(メタデータ)を持ち込むことを忘れないようにしましょう。
終わりに
MetaflowとDVCを組み合わせることで、データのバージョン管理と再現可能な機械学習パイプラインを構築できます。
IncludeFileによる設定ファイルの持ち込みは少しハック的な方法なため、今後Metaflowが任意のファイル/ディレクトリを持ち込む機能を提供してくれることを期待しています。