この記事はZOZOテクノロジーズ #3 Advent Calendar 2020 1日目の記事です。
やりたいこと
最近業務でKubeflow Pipelinesを扱うことが増えてきているのですが、コンポーネント間のデータの扱い方を忘れてしまうことが多いのでタイトルの通り、Kubeflow Pipelinesにおけるデータの扱い方についてまとめます。
より具体的には以下のようなコンポーネントへの入出力ケースについてまとめます。
No. | 入出力として扱うオブジェクト |
---|---|
1 | 単一の文字列、数値 |
2 | 複数の文字列、数値 |
3 | csvファイル |
4 | joblibファイル |
5 | 関数などのオブジェクト |
本記事ではNo.1, 2について解説し、No.3 ~ 5については来週(12/8)のアドベントカレンダーにて解説します。
環境
kfpは2020年12月現在、v1.1までリリースされていますが公式ドキュメントの方がバージョンアップデートに追いついていないのでv1.0を使っています。
$ pip install kfp==1.0
$ pip show kfp
Name: kfp
Version: 1.0.0
Summary: KubeFlow Pipelines SDK
...
今回実行するコンポーネントは全てKubeflow Pipelines SDKのfunc_to_container_op
アノテーションで作っているので、Dockerイメージなどを新たに作る必要はありません。
ありがたいことに、KFP側でよしなにやってくれます。
下記のパッケージが事前に宣言されているものとして書き進めます。基本的にはコピペで動作確認できるようなコードにしています。
from kfp.components import func_to_container_op
from kfp import dsl
# 以下をご自身のKubeflow Pipelinesの環境に置き換えてください
HOST = "https://your-kubeflow-pipelines-host"
単一の文字列、数値
入力として受ける場合
方法1
任意の文字列を入力として受け取ってprintするような関数は以下のようになります。
@func_to_container_op
def print_my_text(text: str):
'''コンテナ内で実行される関数:func_to_container_opアノテーションを付与'''
print(f"Output in print_my_text: {text}")
@dsl.pipeline(name='Text Passing Pipeline',
description='Output text in my component')
def my_pipeline():
'''パイプライン本体'''
consume_task1 = print_my_text(text='Hello world')
consume_task2 = print_my_text(text='Good-bye world')
kfp.Client(host=HOST).create_run_from_pipeline_func(my_pipeline, arguments={})
実行結果
Output in print_my_text: Hello world
Output in print_my_text: Good-bye world
Pythonで普通に関数を使う時と同じ要領なので難しいことは何もないと思います。
方法2
パイプライン側に引数を持たせてargumentsで指定することもできます。
以下の例ではパイプライン側にinput_text1
とinput_text2
という引数を持たせ、argumentsでprintする文字列を渡しています。
実行結果は方法1と全く同じです。
@func_to_container_op
def print_my_text(text: str):
'''コンテナ内で実行される関数:func_to_container_opアノテーションを付与'''
print(f"Output in print_my_text: {text}")
@dsl.pipeline(name='Text Passing Pipeline',
description='Output text in my component')
def my_pipeline(input_text1: str, input_text2: str):
'''パイプライン本体'''
consume_task1 = print_my_text(input_text1)
consume_task2 = print_my_text(input_text2)
kfp.Client(host=HOST).create_run_from_pipeline_func(my_pipeline, arguments={'input_text1':'Hello world',
'input_text2':'Good-bye world'})
出力として渡す場合
こちらも通常のpythonの関数と同じ要領で、return
句で次のコンポーネントに受け渡すことができます。
パイプライン内ではreturn
で返却された文字列を以下の2通りの方法で取得することができます。
-
.output
というインスタンス変数で取得 -
.outputs['output']
という辞書型のキーを指定して取得
以下の例ではprint_my_text
関数に渡された文字列に、新たに文字列を連結して次のコンポーネントへの入力として渡しています。
実行結果を見ると何をしているかわかりやすいと思います。
@func_to_container_op
def print_my_text(text: str) -> str:
'''コンテナ内で実行される関数:func_to_container_opアノテーションを付与'''
print(f"Output in print_my_text: {text}")
return text + ' from print_my_text'
@dsl.pipeline(name='Text Passing Pipeline',
description='Output text in my component')
def my_pipeline(input_text: str):
'''パイプライン本体'''
consume_task1 = print_my_text(input_text)
consume_task2 = print_my_text(consume_task1.output) # 方法1
consume_task3 = print_my_text(consume_task1.outputs['output']) # 方法2
kfp.Client(host=HOST).create_run_from_pipeline_func(my_pipeline, arguments={'input_text':'Hello world'})
実行結果
consume_task2
とconsume_task3
の引数として渡される値は同じであるため、同じ実行結果になっています。
Output in print_my_text: Hello world
Output in print_my_text: Hello world from print_my_text
Output in print_my_text: Hello world from print_my_text
複数の文字列、数値
入力として受ける場合
コンポーネントの入力に複数の引数を持つ場合の関数と同様です。
実行結果は割愛します。
@func_to_container_op
def print_my_some_values(text: str, num: int):
'''コンテナ内で実行される関数:func_to_container_opアノテーションを付与'''
print(f"Output in print_my_some_values: text:{text}, num:{num}")
@dsl.pipeline(name='Text Passing Pipeline',
description='Output text in my component')
def my_pipeline(input_text, input_num):
'''パイプライン本体'''
consume_task1 = print_my_some_values(text=input_text, num=input_num)
kfp.Client(host=HOST).create_run_from_pipeline_func(my_pipeline, arguments={'input_text':'Hello world', 'input_num':10})
出力として渡す場合
複数の出力を次のコンポーネントに渡す場合、単純に複数の値をreturn
するだけでは出力を得ることができません。
複数の出力を渡す方法はいくつかあるのですが、「Namedtuple
で渡したい変数を渡す」という方法が最もスマートと思われます。
以下のサンプルでは複数の値を受け取り、str型とint型の2つの値を返却しています。
返り値としてNamedtuple
でtext
, num
という属性名を指定しているため、次のコンポーネントに渡す際の参照方法が.outputs['text']
, .outputs['num']
のようになっています。
from typing import NamedTuple
@func_to_container_op
def produce_two_outputs(text: str, n1: int, n2:int) -> NamedTuple('Outputs', [('text', str), ('num', int)]):
print(f"Output in produce_two_outputs: text:{text}, n1:{n1}, n2:{n2}")
return (text + " from produce_two_outputs", n1 * n2)
@dsl.pipeline(name='Multi Values Passing Pipeline',
description='Output text in my component')
def my_pipeline(input_text, input_num):
'''パイプライン本体'''
consume_task1 = produce_two_outputs(text=input_text,
n1=input_num,
n2=11)
consume_task2 = produce_two_outputs(text=consume_task1.outputs['text'],
n1=consume_task1.outputs['num'],
n2=12)
kfp.Client(host=HOST).create_run_from_pipeline_func(my_pipeline, arguments={'input_text':'Hello world', 'input_num':10})
実行結果
consume_task2
の引数であるtext
にはconsume_task1
の返り値が渡されるため、Hello world from produce_two_outputs
が渡っています。
Output in produce_two_outputs: text:Hello world, n1:10, n2:11
Output in produce_two_outputs: text:Hello world from produce_two_outputs, n1:110, n2:12
まとめ
今回見てきたコンポーネントへの入力パターンは一般的なPythonの関数と変わらなかったと思います。
一方で、コンポーネントの出力は以下のように取得しました。
- 単一の値を次のコンポーネントに受け渡す場合:
task.output
ortask.outputs['output']
- 複数の値を次のコンポーネントに受け渡す場合:
Namedtuple
でreturnしてtask.outputs['key1']
,task.outputs['key2']
, ...
実際の業務でパイプラインを組む際には文字列や数値をそのまま受け渡すようなケースはそう多くなく、pandasのDataFrameや前処理をした画像などのオブジェクトがメインとなるため、今回のようなデータの受け渡し方はあまり使わないかもしれません。。
今回のデータ受け渡し方法の用途としては、後続のコンポーネントで使用する保存先ディレクトリのパス生成などでしょうか。
次回の記事では下記のNo.3 ~ 5について解説しますので、そちらも見ていただければと思います!
No. | 入出力として扱うオブジェクト |
---|---|
1 | 単一の文字列、数値 |
2 | 複数の文字列、数値 |
3 | csvファイル |
4 | joblibファイル |
5 | 関数などのオブジェクト |