2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

VertexAI Pipelines 入門編 その2 パイプラインを理解する

Last updated at Posted at 2021-11-24

はじめに

本記事ではVertexAI Pipelinesで用いる基本的なパイプラインの書き方について紹介します。
そもそものVeretexAI Pipelinesについての説明や、コンポーネントの作成から実行までの流れ、コンポーネントの紹介については、以下の記事を先にお読みください。
本記事は以下の記事を前提にお話しますので、VeretexAI Pipelinesが全く初めての方には強く推奨します。

VertexAI Pipelines 入門編その0 Hello World
VertexAI Pipelines 入門編その1 コンポーネントを理解する

注意事項

記載されているコードは、2021年11月現在、以下のライブラリでのバージョンで実行が確認されたものです。
kfp==1.8.9
google-cloud-aiplatform==1.7.1

バージョンの互換性によってはエラーとなったり想定外の挙動となる可能性があります。

引数を受け入れるパイプラインを作成する

コンポーネントの引数と同様にパイプラインにも引数がサポートされています。
作り方次第では1つのパイプラインで様々なケースに対応できる、汎用的なパイプラインが実現可能です。

以下の例ではそのパイプラインの書き方とコンパイル方法について記述しています。

python
from kfp import dsl
from kfp.v2.dsl import component
from kfp.v2 import compiler


# 文字列を受け入れ、文字列を出力するコンポーネント
@component(base_image="python:3.7")
def generate_name_op(first_name:str, last_name:str) -> str:
    return f"{first_name} {last_name}"


# 文字列の出力を受け入れるコンポーネント
@component(base_image="python:3.7")
def display_str_op(var: str):
    print(var)
    pass


@dsl.pipeline(
    name="tutorial-pipeline"
)
def tutorial_pipeline(first_name:str, last_name:str):
    
    out_1 = generate_name_op(first_name, last_name)
    display_str_op(out_1.output)
    
    pass

compiler.Compiler().compile(
    pipeline_func = tutorial_pipeline,
    package_path  = "任意のパス",
)

コンパイルしたパイプラインを実行します。
引数のparameter_valuesにパイプライン関数での引数名と値を持つdictを挿入します。

python
import google.cloud.aiplatform as aip


aip.init(
    project="任意のプロジェクトID",
    location="us-central1"
)

aip.PipelineJob(
    display_name = "tutorial pipeline",
    template_path="コンパイル時に指定したpackage_path",
    pipeline_root=f"gs://任意のバケット名/path/to/xxxx",
    parameter_values=dict(
        first_name = "バーテックス",
        last_name = "太郎"
    )
).submit()

パイプラインのコンパイルや実行の詳しい説明は、VertexAI Pipelines 入門編その0 Hello Worldをご覧ください。

パイプラインで条件分岐を実装する

パイプライン関数では、IF文がサポートされていません。
なぜならパイプライン関数はコンパイル時に一度実行され、その時の各コンポーネントは入出力の型しか返さないため、これらの出力値をIF文に使用しても意味がないからです。

コンポーネントの出力値によって条件分岐をするためには、with dsl.Condition(...)を用います。第一引数に条件文を入れることで、これがTrueとなる場合にのみwithが対象とするスコープが実行されます。

python
from kfp import dsl


@dsl.pipeline(
    name="tutorial-pipeline"
)
def tutorial_pipeline():
    out_1 =  generate_name_op("バーテックス", "太郎")
    
    with dsl.Condition(out_1.output == "バーテックス 太郎" ,name="condition_1"):
        display_str_op(out_1.output)
        
    with dsl.Condition(out_1.output == "バーテックス 次郎" ,name="condition_2"):
        display_str_op(out_1.output)
        
        
    pass

実行すると以下のようなパイプラインが表示されます。

スクリーンショット 2021-11-24 9.15.15.png

condtion_1のみ実行されており、condition_2は実行されていません(グレー表示)

パイプラインでfor文を実装する。

パイプライン関数では、IF文と同様にfor文もサポートされていません。理由はIF文と同じです。

for文を実装したい場合は、with dsl.ParallelFor(items) as itemというようなコードで実装します。itemsにはlistをdumpしたものが格納されている必要があります。itemにはitemsの各要素がdumpされたものが渡されます。

このとき、itemsの要素数だけ並列実行されます。あまりにも要素数が多すぎる場合はVertexAI Pipeliensの上限に抵触する恐れがありますので注意が必要です。
https://cloud.google.com/vertex-ai/quotas#vertex-ai-pipelines

python
from kfp import dsl
from kfp.v2.dsl import component


@component
def generate_items_op() -> str:
    import json
    
    return json.dumps(
        [
            {"hoge":1},
            {"hoge":2}
        ])


@component(base_image="python:3.7")
def display_dict_op(var: str):
    import json
    print(json.loads(var))
    pass


@dsl.pipeline(
    name="tutorial-pipeline"
)
def tutorial_pipeline():
    out_1 = generate_items_op()
    
    with dsl.ParallelFor(out_1.output) as item:
        display_dict_op(item) # 先述で定義したコンポーネント
        
    pass

実行すると、generate_items_opが完了するまでは以下の図のようにloopは1つだけに見えます。

for1.png

計算が終わり要素数が判明すると2 iterationsとloop数が表れ、問題なく並列実行されていることが分かります。
for2.png

全体のコード

これまでの処理内容を実施できる全体のコードを共有します。

python
from kfp import dsl
from kfp.v2.dsl import component
from kfp.v2 import compiler
import google.cloud.aiplatform as aip


# 文字列を受け入れ、文字列を出力するコンポーネント
@component(base_image="python:3.7")
def generate_name_op(first_name:str, last_name:str) -> str:
    return f"{first_name} {last_name}"


# 文字列の出力を受け入れるコンポーネント
@component(base_image="python:3.7")
def display_str_op(var: str):
    print(var)
    pass

# itemsを作成するコンポーネント
@component
def generate_items_op() -> str:
    import json
    
    return json.dumps(
        [
            {"hoge":1},
            {"hoge":2}
        ])


# json dumpされたものを受け入れるコンポーネント
@component(base_image="python:3.7")
def display_dict_op(var: str):
    import json
    print(json.loads(var))
    pass


# パイプラインの作成
@dsl.pipeline(
    name="tutorial-pipeline"
)
def tutorial_pipeline(first_name:str, last_name:str):
    
    out_1 = generate_name_op(first_name, last_name)
    display_str_op(out_1.output)

    out_1 =  generate_name_op("バーテックス", "太郎")
    
    with dsl.Condition(out_1.output == "バーテックス 太郎" ,name="condition_1"):
        display_str_op(out_1.output)
        
    with dsl.Condition(out_1.output == "バーテックス 次郎" ,name="condition_2"):
        display_str_op(out_1.output)


    out_2 = generate_items_op()
    
    with dsl.ParallelFor(out_2.output) as item:
        display_dict_op(item)

    
    pass


# パイプラインのコンパイル
compiler.Compiler().compile(
    pipeline_func = tutorial_pipeline,
    package_path  = "任意のパス",
)


# パイプラインの実行
aip.init(
    project="任意のプロジェクトID",
    location="us-central1"
)

aip.PipelineJob(
    display_name = "tutorial pipeline",
    template_path="コンパイル時に指定したpackage_path",
    pipeline_root=f"gs://任意のバケット名/path/to/xxxx",
    parameter_values=dict(
        first_name = "バーテックス",
        last_name = "太郎"
    )
).submit()


さいごに

パイプラインの説明は以上です。
以下の記事とあわせると大体のパイプラインは作れてしまうかと思います。

VertexAI Pipelines 入門編その0 Hello World
VertexAI Pipelines 入門編その1 コンポーネントを理解する

一方で、VertexAI AutoMLとの連携や実際の機械学習、予測データをBigQueryに入れるなどの具体的な実装方法まではご紹介できていないので、後日改めてその記事を書く予定です。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?