はじめに
本記事ではVertexAI Pipelinesで用いる基本的なパイプラインの書き方について紹介します。
そもそものVeretexAI Pipelinesについての説明や、コンポーネントの作成から実行までの流れ、コンポーネントの紹介については、以下の記事を先にお読みください。
本記事は以下の記事を前提にお話しますので、VeretexAI Pipelinesが全く初めての方には強く推奨します。
VertexAI Pipelines 入門編その0 Hello World
VertexAI Pipelines 入門編その1 コンポーネントを理解する
注意事項
記載されているコードは、2021年11月現在、以下のライブラリでのバージョンで実行が確認されたものです。
kfp==1.8.9
google-cloud-aiplatform==1.7.1
バージョンの互換性によってはエラーとなったり想定外の挙動となる可能性があります。
引数を受け入れるパイプラインを作成する
コンポーネントの引数と同様にパイプラインにも引数がサポートされています。
作り方次第では1つのパイプラインで様々なケースに対応できる、汎用的なパイプラインが実現可能です。
以下の例ではそのパイプラインの書き方とコンパイル方法について記述しています。
from kfp import dsl
from kfp.v2.dsl import component
from kfp.v2 import compiler
# 文字列を受け入れ、文字列を出力するコンポーネント
@component(base_image="python:3.7")
def generate_name_op(first_name:str, last_name:str) -> str:
return f"{first_name} {last_name}"
# 文字列の出力を受け入れるコンポーネント
@component(base_image="python:3.7")
def display_str_op(var: str):
print(var)
pass
@dsl.pipeline(
name="tutorial-pipeline"
)
def tutorial_pipeline(first_name:str, last_name:str):
out_1 = generate_name_op(first_name, last_name)
display_str_op(out_1.output)
pass
compiler.Compiler().compile(
pipeline_func = tutorial_pipeline,
package_path = "任意のパス",
)
コンパイルしたパイプラインを実行します。
引数のparameter_values
にパイプライン関数での引数名と値を持つdict
を挿入します。
import google.cloud.aiplatform as aip
aip.init(
project="任意のプロジェクトID",
location="us-central1"
)
aip.PipelineJob(
display_name = "tutorial pipeline",
template_path="コンパイル時に指定したpackage_path",
pipeline_root=f"gs://任意のバケット名/path/to/xxxx",
parameter_values=dict(
first_name = "バーテックス",
last_name = "太郎"
)
).submit()
パイプラインのコンパイルや実行の詳しい説明は、VertexAI Pipelines 入門編その0 Hello Worldをご覧ください。
パイプラインで条件分岐を実装する
パイプライン関数では、IF文がサポートされていません。
なぜならパイプライン関数はコンパイル時に一度実行され、その時の各コンポーネントは入出力の型しか返さないため、これらの出力値をIF文に使用しても意味がないからです。
コンポーネントの出力値によって条件分岐をするためには、with dsl.Condition(...)
を用います。第一引数に条件文を入れることで、これがTrue
となる場合にのみwith
が対象とするスコープが実行されます。
from kfp import dsl
@dsl.pipeline(
name="tutorial-pipeline"
)
def tutorial_pipeline():
out_1 = generate_name_op("バーテックス", "太郎")
with dsl.Condition(out_1.output == "バーテックス 太郎" ,name="condition_1"):
display_str_op(out_1.output)
with dsl.Condition(out_1.output == "バーテックス 次郎" ,name="condition_2"):
display_str_op(out_1.output)
pass
実行すると以下のようなパイプラインが表示されます。
condtion_1
のみ実行されており、condition_2
は実行されていません(グレー表示)
パイプラインでfor文を実装する。
パイプライン関数では、IF文と同様にfor文もサポートされていません。理由はIF文と同じです。
for文を実装したい場合は、with dsl.ParallelFor(items) as item
というようなコードで実装します。items
にはlist
をdumpしたものが格納されている必要があります。item
にはitems
の各要素がdumpされたものが渡されます。
このとき、items
の要素数だけ並列実行されます。あまりにも要素数が多すぎる場合はVertexAI Pipeliensの上限に抵触する恐れがありますので注意が必要です。
https://cloud.google.com/vertex-ai/quotas#vertex-ai-pipelines
from kfp import dsl
from kfp.v2.dsl import component
@component
def generate_items_op() -> str:
import json
return json.dumps(
[
{"hoge":1},
{"hoge":2}
])
@component(base_image="python:3.7")
def display_dict_op(var: str):
import json
print(json.loads(var))
pass
@dsl.pipeline(
name="tutorial-pipeline"
)
def tutorial_pipeline():
out_1 = generate_items_op()
with dsl.ParallelFor(out_1.output) as item:
display_dict_op(item) # 先述で定義したコンポーネント
pass
実行すると、generate_items_op
が完了するまでは以下の図のようにloopは1つだけに見えます。
計算が終わり要素数が判明すると2 iterations
とloop数が表れ、問題なく並列実行されていることが分かります。
全体のコード
これまでの処理内容を実施できる全体のコードを共有します。
from kfp import dsl
from kfp.v2.dsl import component
from kfp.v2 import compiler
import google.cloud.aiplatform as aip
# 文字列を受け入れ、文字列を出力するコンポーネント
@component(base_image="python:3.7")
def generate_name_op(first_name:str, last_name:str) -> str:
return f"{first_name} {last_name}"
# 文字列の出力を受け入れるコンポーネント
@component(base_image="python:3.7")
def display_str_op(var: str):
print(var)
pass
# itemsを作成するコンポーネント
@component
def generate_items_op() -> str:
import json
return json.dumps(
[
{"hoge":1},
{"hoge":2}
])
# json dumpされたものを受け入れるコンポーネント
@component(base_image="python:3.7")
def display_dict_op(var: str):
import json
print(json.loads(var))
pass
# パイプラインの作成
@dsl.pipeline(
name="tutorial-pipeline"
)
def tutorial_pipeline(first_name:str, last_name:str):
out_1 = generate_name_op(first_name, last_name)
display_str_op(out_1.output)
out_1 = generate_name_op("バーテックス", "太郎")
with dsl.Condition(out_1.output == "バーテックス 太郎" ,name="condition_1"):
display_str_op(out_1.output)
with dsl.Condition(out_1.output == "バーテックス 次郎" ,name="condition_2"):
display_str_op(out_1.output)
out_2 = generate_items_op()
with dsl.ParallelFor(out_2.output) as item:
display_dict_op(item)
pass
# パイプラインのコンパイル
compiler.Compiler().compile(
pipeline_func = tutorial_pipeline,
package_path = "任意のパス",
)
# パイプラインの実行
aip.init(
project="任意のプロジェクトID",
location="us-central1"
)
aip.PipelineJob(
display_name = "tutorial pipeline",
template_path="コンパイル時に指定したpackage_path",
pipeline_root=f"gs://任意のバケット名/path/to/xxxx",
parameter_values=dict(
first_name = "バーテックス",
last_name = "太郎"
)
).submit()
さいごに
パイプラインの説明は以上です。
以下の記事とあわせると大体のパイプラインは作れてしまうかと思います。
VertexAI Pipelines 入門編その0 Hello World
VertexAI Pipelines 入門編その1 コンポーネントを理解する
一方で、VertexAI AutoMLとの連携や実際の機械学習、予測データをBigQueryに入れるなどの具体的な実装方法まではご紹介できていないので、後日改めてその記事を書く予定です。