背景
Vertex AI Model Registry でバージョン管理しているモデルを使用して並列に予測を行う Dataflow のジョブを作成したくなったが、 Vertex AI の SDK である google-cloud-aiplatform
が Dataflow ワーカーにインストールされていなかった。
やりたいこと
Dataflow + Python でバッチジョブを作るときに任意のパッケージをインストールしたい。
ちなみにデフォルトでインストールされるパッケージはジョブの記述に使用している Python と Apache Beam SDK のバージョンによって異なっており、以下からパッケージのリストを参照できる。
google-cloud-*
系のパッケージや numpy, pandas などメジャーなパッケージは元からインストールされているものの、上記にないパッケージは自分で用意しなければいけない。
Dataflow ワーカーで任意のパッケージを使用するには大きく2通りの方法がある。
1. 追加したいパッケージの定義を用意してインストール
2. 予め必要なパッケージがインストールされたコンテナでジョブを起動する
1. 追加したいパッケージの定義を用意してインストール
パッケージの定義方法は大きく以下の2パターンに分けられる。
1. PyPI から取得できるパッケージ群を requirements.txt
に記述する
2. setup.py
で必要なパッケージ群を記述する
ここでは一番簡単な 1 の方法のみを書いておく。例えば poetry を使って以下のように requirements.txt
を作る。
$ poetry add google-cloud-aiplatform
$ poetry export --without-hashes -f requirements.txt
$ cat requirements.txt
cachetools==5.2.0 ; python_version >= "3.7" and python_version < "4.0"
certifi==2022.9.24 ; python_version >= "3.7" and python_version < "4"
charset-normalizer==2.1.1 ; python_version >= "3.7" and python_version < "4"
google-api-core==2.11.0 ; python_version >= "3.7" and python_version < "4.0"
google-api-core[grpc]==2.11.0 ; python_version >= "3.7" and python_version < "4.0"
google-auth==2.15.0 ; python_version >= "3.7" and python_version < "4.0"
google-cloud-aiplatform==1.19.0 ; python_version >= "3.7" and python_version < "4.0"
...
あとはこのファイルを PipelineOptions に渡すだけ。
options = PipelineOptions()
setup_option = options.view_as(SetupOptions)
setup_option.requirements_file = "./requirements.txt"
with beam.Pipeline(options=options) as pipeline:
...
ドキュメントにはコマンドラインから --requirements_file
オプションを指定する旨のことしか書いていないため、この方法であれば Dataflow テンプレートを作成する場合にも使用できる。
2. 予め必要なパッケージがインストールされたコンテナでジョブを起動する
方法 1 は手軽にできることがメリットだが、デメリットとして「ワーカーが起動する度にパッケージをインストールする必要がある」ことが挙げられる。
ワーカーが起動する度に pip install が走るので内部的にはパッケージのダウンロード・インストールが行われており、1分1秒でも早くバッチジョブを終わらせたいスピード狂なエンジニアには不都合である。
これを解決するために、事前にパッケージがインストールされたカスタムコンテナでワーカーを起動する。
2-1. Dockerfile を作る
Dataflow ワーカーで使用されるカスタムコンテナをビルドするために Dockerfile を作成する。以下ではベースイメージにできるだけ軽量なものを使用し、Dataflow ジョブを実行するために必要なものだけを抽出している。
依存関係の管理には poetry を使用している。
$ poetry add google-cloud-aiplatform
$ poetry add "apache-beam[gcp]==2.43.0"
FROM python:3.9.15-slim-bullseye
RUN pip install poetry==1.2.0
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false \
&& poetry install --without dev
COPY --from=apache/beam_python3.9_sdk:2.43.0 /opt/apache/beam /opt/apache/beam
# Set the entrypoint to Apache Beam SDK launcher.
ENTRYPOINT ["/opt/apache/beam/boot"]
注意点として、依存関係には ENTRYPOINT で使用する Beam と同じバージョンの apache-beam[gcp]
を追加する必要がある。
上記例だと 2.43.0
を使用しているため poetry add
するバージョンも 2.43.0
で固定している。
2-2. コンテナをビルドして push
上記のコンテナをローカルでも Cloud Build でも好きなところでビルドして GCR 等に push する。
$ PROJECT_ID="YOUR_PROJECT_ID"
$ IMAGE_NAME="dataflow_custom_container"
$ TAG="latest"
$ IMAGE="gcr.io/${PROJECT_ID}/${IMAGE_NAME}:${TAG}"
$ docker build -t $IMAGE .
$ docker push $IMAGE
2-3. カスタムコンテナをオプションで指定する
あとはビルドしたイメージを使用するようにオプションで指定するだけ。カスタムコンテナは Dataflow Runner v2 でないと使用できないため、 v2 が使われるようにオプションを付加している。
custom_container_image = f"gcr.io/{project_id}/dataflow_custom_container:latest"
options = PipelineOptions(
runner="DataflowRunner",
project=project_id,
region=region,
machine_type=machine_type,
staging_location=staging_location,
temp_location=temp_location,
### 以下の2行を追加 ###
sdk_container_image=custom_container_image,
experiments=["use_runner_v2"],
)
with beam.Pipeline(options=options) as pipeline:
...
結論
手軽にやりたい場合は requirements.txt
を用意し、スピードを追い求める場合はカスタムコンテナを使用する。ただしカスタムコンテナで運用を続けることを考えると、そのコンテナ自体をビルドする CI が必要になるためその運用も併せて検討する必要はある。
その他参考資料