1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Batch版のMetaflowでDVCを使ったトレーニングをする

Last updated at Posted at 2026-01-20

背景

最近、MLOpsを実現するために、データのバージョン管理(DVC)と機械学習パイプライン(Metaflow)を組み合わせて使用していました。
しかし、Metaflowでクラウド上(AWS Batchなど)のコンテナでフローを実行する際に、DVCの設定ファイルを持ち込む方法で困りました。

Metaflowの実行環境に持ち込まれるファイルは、基本的に .py ファイルと uv の依存関係ファイル(pyproject.tomluv.lock)などに限られます。
追加のファイルを持ち込みたい場合は --package-suffixes オプションで特定の拡張子を指定できますが、DVCに必要な .dvc/config は拡張子がないため、このオプションでは対応できません。

この記事では、この問題を解決し、Metaflowのクラウド実行環境でDVCを使ってS3からデータセットを取得する方法を紹介します。

前提

  • Python 3.11
  • Metaflow 2.18.13
  • DVC 3.64.0(dvc-s3 3.2.2)
  • PyTorch Lightning 2.1.0
  • uv(パッケージマネージャー)
  • Metaflow(AWS Batch版)環境が構築済み

プロジェクト構成

.
├── .dvc/
│   └── config          # DVCのリモートストレージ設定
├── dataset/
│   ├── training_set.dvc  # 訓練データのメタファイル
│   └── test_set.dvc      # テストデータのメタファイル
├── model.py            # PyTorch Lightningモデル
├── train_flow.py      # Metaflowのフロー定義
├── pyproject.toml
└── uv.lock

解決策: IncludeFileでDVC設定を持ち込む

Metaflow には IncludeFile という機能があり、任意のファイルをフローに埋め込んで実行環境に持ち込むことができます。
この時、S3上にファイル実体が保存され、self.dvc_configのようにアクセスすることでファイル内容を取得できます。

これをファイルに書き出して使いたいのですが、AWS Batch 上で実行する場合、各ステップが独立したコンテナ上で実行されます。
そのため各ステップで .dvc/config をファイルに書き出し、データをダウンロードする必要があります。

実装のポイント

  1. IncludeFile.dvc/configの内容をフローに含める
  2. 実行時に.dvc/configを動的に復元する
  3. データセットを使うステップ内で DVCFileSystem でS3からデータを取得する

コード例

train_flow.py
import os
from metaflow import FlowSpec, step, Parameter, IncludeFile, batch
from pytorch_lightning import Trainer
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
from dvc.api import DVCFileSystem

from model import DogCatModel


class TrainFlow(FlowSpec):

    batch_size = Parameter('batch_size', default=32)
    epochs = Parameter('epochs', default=10)
    learning_rate = Parameter('lr', default=0.001)

    # IncludeFileでDVC設定を埋め込む
    dvc_config = IncludeFile('dvc_config', default='.dvc/config', is_text=True)

    def _setup_dvc(self):
        """DVC設定を復元"""
        os.makedirs('.dvc', exist_ok=True)
        with open('.dvc/config', 'w') as f:
            f.write(self.dvc_config)

    @step
    def start(self):
        print(f"Starting training with batch_size={self.batch_size}, epochs={self.epochs}")
        self.transform = transforms.Compose([
            transforms.Resize((128, 128)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.next(self.train)

    @batch(gpu=1)
    @step
    def train(self):
        # DVC設定を復元してからデータを取得
        self._setup_dvc()
        fs = DVCFileSystem(".")
        fs.get("dataset/training_set", "dataset/training_set", recursive=True)

        full_dataset = datasets.ImageFolder('dataset/training_set', transform=self.transform)
        train_size = int(0.8 * len(full_dataset))
        val_size = len(full_dataset) - train_size
        train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size)

        self.model = DogCatModel(learning_rate=self.learning_rate)
        trainer = Trainer(max_epochs=self.epochs)
        trainer.fit(self.model, train_loader, val_loader)
        self.next(self.test)

    @step
    def test(self):
        self._setup_dvc()
        fs = DVCFileSystem(".")
        fs.get("dataset/test_set", "dataset/test_set", recursive=True)

        test_dataset = datasets.ImageFolder('dataset/test_set', transform=self.transform)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size)

        trainer = Trainer()
        trainer.test(self.model, test_loader)
        self.next(self.end)

    @step
    def end(self):
        print("Training completed!")


if __name__ == '__main__':
    TrainFlow()

重要なポイント

1. IncludeFileの使い方

dvc_config = IncludeFile('dvc_config', default='.dvc/config', is_text=True)
  • is_text=Trueでテキストファイルとして読み込む
  • ローカル実行時も含め、フローの属性として.dvc/configの内容が保持される

2. DVC設定の復元

def _setup_dvc(self):
    """DVC設定を復元"""
    os.makedirs('.dvc', exist_ok=True)
    with open('.dvc/config', 'w') as f:
        f.write(self.dvc_config)

リモート実行環境では.dvcディレクトリが存在しないため、各ステップの最初で復元します。

3. DVCFileSystemでデータ取得

fs = DVCFileSystem(".")
fs.get("dataset/training_set", "dataset/training_set", recursive=True)

dvc pullコマンドを使わず、Pythonコード内でDVCが管理するデータを取得できます。

実行方法

ローカル実行

uv run train_flow.py run

AWS Batch実行

uv run train_flow.py \
  --environment=uv \
  --package-suffixes ".dvc" \
  run \
  --with batch

--package-suffixes ".dvc".dvcファイル(メタデータ)を持ち込むことを忘れないようにしましょう。

終わりに

MetaflowとDVCを組み合わせることで、データのバージョン管理と再現可能な機械学習パイプラインを構築できます。
IncludeFileによる設定ファイルの持ち込みは少しハック的な方法なため、今後Metaflowが任意のファイル/ディレクトリを持ち込む機能を提供してくれることを期待しています。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?