5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MicroAd (マイクロアド) Advent Calendar 2023

Day 18

Kubeflow Pipelineのdsl.ExitHandlerの使い方まとめ

Last updated at Posted at 2023-12-17

この記事はMicroAd Advent Calendar 2023の18日目の記事です。

この記事では Kubeflow Pipelines の使う上で必須の機能ではありつつも少し使い方に癖のある dsl.ExitHandler について、個人的な備忘録も兼ねてまとめようと思います。

dsl.ExitHandlerの使い方

dsl.ExitHandler は python の finally 句のように、タスクが正常終了した場合も異常終了した場合も必ず行う処理を設定するフロー制御機能です。

コンポーネントを登録する場合

dsl.ExitHandler の基本的な使い方は、以下のようにコンポーネントを終了処理として登録する方法です。

@dsl.component
def exit_op():
    print('exit_op')

@dsl.pipeline
def pipeline():
    exit_task = exit_op()
    
    with dsl.ExitHandler(exit_task):
        task1 = success_op()
        task2 = success_op().after(task1)
        task3 = fail_op().after(task2)

これを GCP サービスの Vertex AI Pepelines で実行すると以下のような結果が得られます。

スクリーンショット 2023-12-17 21.54.39.png

この例では fail-op が異常終了しているため後続のタスクが停止してしまうのですが、exit-op を dsl.ExitHandler で終了タスクとして登録することで通常通り実行することができます。

パイプラインを登録する場合

dsl.ExitHandler にはコンポーネントと同様にパイプラインも終了タスクとして登録することができます。

@dsl.component
def exit_op():
    print('exit_op')

@dsl.pipeline
def exit_pipeline():
    exit_op()

@dsl.pipeline
def pipeline():
    exit_task = exit_pp()
    
    with dsl.ExitHandler(exit_task):
        task1 = success_op()
        task2 = fail_op().after(task1)

スクリーンショット 2023-12-17 22.11.17.png

終了処理として複数のコンポーネントを登録したい場合に利用します。

dsl.PipelineTaskFinalStatus

dsl.ExitHandler でよく利用する機能そして dsl.PipelineTaskFinalStatus があります。

dsl.PipelineTaskFinalStatus は dsl.ExitHandler 内にパイプラインの最終ステータスを取得する機能で、終了タスクに以下の情報を渡すことができます。

属性名 内容
pipeline_job_resource_name projects/{project}/locations/{location}/pipelineJobs/{pipeline_job} 形式のパイプラインジョブリソース名
pipeline_task_name 最終タスク名
state 最終タスクのステータス
error_code エラーコード
error_message エラーメッセージ

コンポーネントで dsl.PipelineTaskFinalStatus を取得する場合は引数として定義します。

@dsl.component
def exit_op(status: dsl.PipelineTaskFinalStatus):
    print(status.pipeline_job_resource_name)
    print(status.pipeline_task_name)
    print(status.state)
    print(status.error_code)
    print(status.error_message)

@dsl.pipeline(name=name, pipeline_root=pipeline_root)
def pipeline():
    exit_task = exit_op()
    
    with dsl.ExitHandler(exit_task):
        task1 = success_op()
        task2 = fail_op().after(task1)

スクリーンショット 2023-12-17 22.19.45.png

パイプライン内で dsl.PipelineTaskFinalStatus の属性を参照する方法

dsl.PipelineTaskFinalStatus はコンポーネント内での参照を想定されているため、パイプラインで受け取った場合は属性を参照することができません。

パイプライン内で dsl.PipelineTaskFinalStatus の属性を参照する場合は、以下のようなコンポーネントで出力形式を変更する必要があります。

@dsl.component
def unpack_exit_status(status: dict) -> NamedTuple(
    'outputs',
    state=str,
    pipeline_job_resource_name=str,
    pipeline_task_name=str,
    error_code=int,
    error_message=str
):
    Outputs = NamedTuple(
        'outputs',
        state=str,
        pipeline_job_resource_name=str,
        pipeline_task_name=str,
        error_code=int,
        error_message=str,
    )
    return Outputs(
        state=status['state'],
        pipeline_job_resource_name=status['pipelineJobResourceName'],
        pipeline_task_name=status['pipelineTaskName'],
        error_code=status.get('error', {}).get('code', 0),
        error_message=status.get('error', {}).get('message', ''),
    )
5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?