この記事はMicroAd Advent Calendar 2023の18日目の記事です。
この記事では Kubeflow Pipelines の使う上で必須の機能ではありつつも少し使い方に癖のある dsl.ExitHandler について、個人的な備忘録も兼ねてまとめようと思います。
dsl.ExitHandlerの使い方
dsl.ExitHandler は python の finally 句のように、タスクが正常終了した場合も異常終了した場合も必ず行う処理を設定するフロー制御機能です。
コンポーネントを登録する場合
dsl.ExitHandler の基本的な使い方は、以下のようにコンポーネントを終了処理として登録する方法です。
@dsl.component
def exit_op():
print('exit_op')
@dsl.pipeline
def pipeline():
exit_task = exit_op()
with dsl.ExitHandler(exit_task):
task1 = success_op()
task2 = success_op().after(task1)
task3 = fail_op().after(task2)
これを GCP サービスの Vertex AI Pepelines で実行すると以下のような結果が得られます。
この例では fail-op
が異常終了しているため後続のタスクが停止してしまうのですが、exit-op
を dsl.ExitHandler で終了タスクとして登録することで通常通り実行することができます。
パイプラインを登録する場合
dsl.ExitHandler にはコンポーネントと同様にパイプラインも終了タスクとして登録することができます。
@dsl.component
def exit_op():
print('exit_op')
@dsl.pipeline
def exit_pipeline():
exit_op()
@dsl.pipeline
def pipeline():
exit_task = exit_pp()
with dsl.ExitHandler(exit_task):
task1 = success_op()
task2 = fail_op().after(task1)
終了処理として複数のコンポーネントを登録したい場合に利用します。
dsl.PipelineTaskFinalStatus
dsl.ExitHandler でよく利用する機能そして dsl.PipelineTaskFinalStatus があります。
dsl.PipelineTaskFinalStatus は dsl.ExitHandler 内にパイプラインの最終ステータスを取得する機能で、終了タスクに以下の情報を渡すことができます。
属性名 | 内容 |
---|---|
pipeline_job_resource_name |
projects/{project}/locations/{location}/pipelineJobs/{pipeline_job} 形式のパイプラインジョブリソース名 |
pipeline_task_name |
最終タスク名 |
state |
最終タスクのステータス |
error_code |
エラーコード |
error_message |
エラーメッセージ |
コンポーネントで dsl.PipelineTaskFinalStatus を取得する場合は引数として定義します。
@dsl.component
def exit_op(status: dsl.PipelineTaskFinalStatus):
print(status.pipeline_job_resource_name)
print(status.pipeline_task_name)
print(status.state)
print(status.error_code)
print(status.error_message)
@dsl.pipeline(name=name, pipeline_root=pipeline_root)
def pipeline():
exit_task = exit_op()
with dsl.ExitHandler(exit_task):
task1 = success_op()
task2 = fail_op().after(task1)
パイプライン内で dsl.PipelineTaskFinalStatus の属性を参照する方法
dsl.PipelineTaskFinalStatus はコンポーネント内での参照を想定されているため、パイプラインで受け取った場合は属性を参照することができません。
パイプライン内で dsl.PipelineTaskFinalStatus の属性を参照する場合は、以下のようなコンポーネントで出力形式を変更する必要があります。
@dsl.component
def unpack_exit_status(status: dict) -> NamedTuple(
'outputs',
state=str,
pipeline_job_resource_name=str,
pipeline_task_name=str,
error_code=int,
error_message=str
):
Outputs = NamedTuple(
'outputs',
state=str,
pipeline_job_resource_name=str,
pipeline_task_name=str,
error_code=int,
error_message=str,
)
return Outputs(
state=status['state'],
pipeline_job_resource_name=status['pipelineJobResourceName'],
pipeline_task_name=status['pipelineTaskName'],
error_code=status.get('error', {}).get('code', 0),
error_message=status.get('error', {}).get('message', ''),
)