初版: 2021/4/20
著者: 高久 隆史, 株式会社日立製作所
はじめに
KubeflowはKubernetes上でMLOpsを実現するためのOSSのツールキットです。2020/3にバージョン1.0がリリースされ、2021/4現在も活発に開発が行われており、機械学習を利用したシステムの開発/運用のライフサイクルを回すための有効な手段の1つとして注目されています。
本連載では、Kubeflowのバージョン1.2(連載開始時点の最新版、2020年11月リリース)について、構築手順、Kubeflow Pipelines、KFServingの基礎的な利用手順についての情報を紹介いたします。
第3回の本稿では、Kubeflow Pipelines上でscikit-learnの機械学習モデルを訓練・評価した際の手順を紹介いたします。
Kubeflow Pipelinesで独自のPythonコードを独自のDockerイメージも利用して動作させる、という汎用的な手順としても、是非参考にしてください。
投稿一覧:
- MLOpsを実現するKubeflowを前提のKubernetesも含めて構築する(前編)
- MLOpsを実現するKubeflowを前提のKubernetesも含めて構築する(後編)
- Kubeflow Pipelinesでscikit-learnの機械学習モデルを訓練・評価してみる (本投稿)
- Kubeflow KFServingでscikit-learnの学習済みモデルを用いた推論サービスを公開する
Kubeflow Pipelines概要
Kubeflow Pipelinesは、Dockerコンテナに基づく、ポータブルでスケーラブルな機械学習ワークフローを構築、およびデプロイするためのプラットフォームです。
機械学習ワークフローの各タスクを指定したDockerイメージ上で実行でき、実行結果はKubeflow Central Dashboardから確認できます。
シナリオ
本稿では、次に示すシナリオを利用して、Kubeflow Pipelinesの利用手順を示します。
なお、本手順ではAWSのEC2インスタンス上に構築したKubeflowを利用し、入出力データの保存先にはAmazon S3を利用します。
Kubeflowの構築手順については第1回目、および、第2回目の投稿をご参照ください。
シナリオの内容
# | 項目 | 内容 |
---|---|---|
1 | シナリオ概要 | Kubeflow Pipelinesを利用して、入力データの取得、データセットの分割、特長量生成、学習、予測、学習済データの保存の処理を行います。 |
2 | 機械学習ワークフローのタスクの構成 | 「入力データの取得~データセットの分割」および「特徴量生成~学習済データの保存」を行う2タスクとします。 |
3 | 入力データ | 機械学習の入力CSVデータ、および、ハイパーパラメータの値を入力とします。入力CSVデータはAmazon S3に配置されているものを参照します。 |
4 | 出力データ | Kubeflow Pipelinesの処理で作成した学習済みデータ「ml_pipelines.pickle」をAmazon S3に出力します。 |
5 | 各タスクが利用するDockerイメージ | Kubeflow Pipelinesがデフォルトで利用するDockerイメージ「python:3.7」に、今回の処理で利用するパッケージ「pandas」「sklearn」「boto3」のインストール、および、独自のPythonファイルの配置を実施したDockerイメージを作成して利用します。今回は2つのタスクがありますが、どちらも同じイメージを利用することとします。 |
Kubeflow Pipelines利用手順の流れ
今回のKubeflow Pipelinesの利用手順の流れを次に示します。
今回のKubeflow Pipelinesの利用手順の流れ
# | 手順 |
---|---|
1 | Kubeflow Pipelines用のPythonコードの作成 |
2 | 各タスクが利用するDockerイメージの作成 |
3 | Kubeflow PipelinesからAmazon S3にアクセスするための設定 |
4 | dsl-compileコマンドによるYAMLの圧縮ファイル(.tar.gz)の作成 |
5 | Kubeflow Central Dashboardからアップロードして実行 |
Kubeflow Pipelinesの利用手順
1. Kubeflow Pipelines用のPythonコードの作成
ここでは、「scikit-learnとFlaskによる機械学習モデルのサービング」の1~4章のコードをKubeflow Pipelinesで動作するように変更したものを例に説明いたします。
Pythonファイルは、特長量作成処理を独自クラスとして定義したファイル「preprocessor.py」とKubeflow Pipelines用の処理を定義した「pipelines_test.py」の2ファイルの構成とします。
特長量作成処理を独自クラスとして定義したファイル「preprocessor.py」は、「scikit-learnとFlaskによる機械学習モデルのサービング」のものをそのまま利用します。
「preprocessor.py」の内容を次に示します。
「preprocesssor.py」の内容
from sklearn.base import TransformerMixin
# 特徴量作成処理をTransformer化
class PreProcessor(TransformerMixin):
def fit(self, X, y):
return self
def transform(self, X):
# 不要な列を削除
X = X.drop(['Cabin','Name','PassengerId','Ticket'],axis=1)
#欠損値処理
X['Fare'] = X['Fare'].fillna(X['Fare'].median())
X['Age'] = X['Age'].fillna(X['Age'].median())
X['Embarked'] = X['Embarked'].fillna('S')
#カテゴリ変数の変換
X['Sex'] = X['Sex'].apply(lambda x: 1 if x == 'male' else 0)
X['Embarked'] = X['Embarked'].map(
{'S': 0, 'C': 1, 'Q': 2}).astype(int)
return X
次に、Kubeflow Pipelinesの仕様に従い、Kubeflow Pipelines用のPythonコードを作成します。
Kubeflow Pipelines の仕様は次のURLを参照してください。
https://www.kubeflow.org/docs/pipelines/sdk/sdk-overview/
Kubeflow Pipelines用のPythonコードは、次のブロックに分けて説明します。
Kubeflow Pipelines用のPythonコードの説明用ブロック分け
<(a) Kubeflow Pipelines のAPIに対するimport文の定義>
<(b) Kubeflow Pipelinesのフローで実行する各タスク用のメソッド定義>
<(c) 各タスク用のメソッドをfunc_to_container_opでラップ定義>
<(d) Kubeflow Pipelinesの処理内容の定義>
(a) Kubeflow Pipelines のAPIに対するimport文の定義
ここでは、Kubeflow Pipelines SDK APIライブラリのimport文を定義します。
Kubeflow Pipelines SDK APIの仕様は以下に記載されていますので、必要に応じて参照してください。
https://kubeflow-pipelines.readthedocs.io/en/stable/index.html
コード例(a) Kubeflow Pipelines のAPIに対するimport文の定義
import kfp
from kfp.aws import use_aws_secret
(b) Kubeflow Pipelinesのフローで実行する各タスク用のメソッド定義
ここでは、Kubeflow Pipelinesの各タスク用のメソッドをdefで定義します。なお、タスク内で利用するライブラリのimport文は、各タスク用メソッド内で定義します。
コード例(b) Kubeflow Pipelinesのフローで実行する各タスク用のメソッド定義
# Kubeflow Pipelinesで実行するタスク用のメソッドを定義(その1)。
# ここでは、入力データの取得および分割を実施。
def prepare(
input_file_name: str,
output_csv_path1: kfp.components.OutputPath('CSV'),
output_csv_path2: kfp.components.OutputPath('CSV')
):
import pandas as pd
from sklearn.model_selection import train_test_split
from io import StringIO
import boto3
# Amazon S3に配置されている入力データを取得。
BUCKET_NAME = 'kubeflow-pl'
client = boto3.client('s3')
obj = client.get_object(Bucket=BUCKET_NAME, Key=input_file_name)
content = obj['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(content))
# 入力データを分割して出力。
(train, test) = train_test_split(df, test_size = 0.3)
train.to_csv(output_csv_path1, index=True)
test.to_csv(output_csv_path2, index=True)
print('train: ' + str(len(train)))
print('test : ' + str(len(test)))
# Kubeflow Pipelinesで実行するタスク用のメソッドを定義(その2)。
# ここでは、特徴量生成、学習、予測、学習済モデルの保存を実施。
def analysis(
random_state_value: int,
n_estimators_value: int,
input_csv_path1: kfp.components.InputPath('CSV'),
input_csv_path2: kfp.components.InputPath('CSV')
):
import boto3
import pandas as pd
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
import sys
# 独自の特徴量作成処理をimport
sys.path.append('/var/preprocessor/')
from preprocessor import PreProcessor
# 引数で受けとった訓練/評価用データから説明変数と目的変数を抽出。
train = pd.read_csv(input_csv_path1, index_col=0)
test = pd.read_csv(input_csv_path2, index_col=0)
X_train = train.drop('Survived', axis=1)
X_test = test.drop('Survived', axis=1)
y_train = train.Survived
y_test = test.Survived
# 学習/予測パイプラインを定義
ml_pipeline = Pipeline([
# 特徴量作成
('preprocessor', PreProcessor()),
# 訓練または予測する(モデルにランダムフォレストを使用)
('random_forest', RandomForestClassifier(
random_state=random_state_value, n_estimators=n_estimators_value))])
# パイプラインで訓練を実行
ml_pipeline.fit(X_train, y_train)
# パイプラインで予測を実行
pred = ml_pipeline.predict(X_test)
# 予測結果の評価指標を確認。
# 正解率 (Accuracy)
print('Accuracy : ', accuracy_score(y_test, pred))
# 精度 (Precision)
print('Precision: ', precision_score(y_test, pred))
# 検出率 (Recall)
print('Recall : ', recall_score(y_test, pred))
# F値 (F-measure)
print('F-measure: ', f1_score(y_test, pred))
# 学習済モデルをpickleファイルに保存し、Amazon S3にアップロード。
filename = 'ml_pipeline.pickle'
pickle.dump(ml_pipeline, open(filename, 'wb'))
BUCKET_NAME = 'kubeflow-pl'
s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET_NAME)
bucket.upload_file(filename, filename)
(c) 各タスク用のメソッドをfunc_to_container_opでラップ定義
ここでは、(b)で定義した各タスク用メソッドをkfp.components.func_to_container_opでラップ定義します。各タスク実行時に利用するDockerイメージはbase_image引数で指定でき、省略した場合はデフォルトのDockerイメージを利用します。kfp.components.func_to_container_opの仕様については、次のURLを参照してください。
https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.components.html#kfp.components.func_to_container_op
コード例(c) 各タスク用のメソッドをfunc_to_container_opでラップ定義
# タスク用のメソッドをfunc_to_container_opでラップ。
# タスク実行時に利用するDockerイメージはここで指定。
prepare_op = kfp.components.func_to_container_op(
func = prepare,
base_image = 'user_name/mypipe:0.01'
)
analysis_op = kfp.components.func_to_container_op(
func = analysis,
base_image = 'user_name/mypipe:0.01'
)
(d) Kubeflow Pipelinesの処理内容の定義
ここでは、(c)でラップ定義した各タスクのメソッドを順に呼び出し、Kubeflow Pipelinesが実行するフローを決定するメソッドを定義します。
このメソッドは、@kfp.dsl.pipelineを指定して定義します。メソッドの引数には、Kubeflow Pipelinesを実行するときに指定したい項目を定義します。
今回の例では、各タスクの中でAmazon S3にアクセスするため、kfp.aws.use_aws_secretも利用しています。
kfp.dsl.pipeline、および、kfp.aws.use_aws_secretの仕様ついては、次のURLを参照してください。
https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.pipeline
https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.extensions.html?highlight=kfp.aws.use_aws_secret#kfp.aws.use_aws_secret
コード例(d) Kubeflow Pipelinesの処理内容の定義
# Kubeflow Pipelinesの処理内容を定義。
@kfp.dsl.pipeline(
name='Sample Pipeline',
description='My machine learning pipeline'
)
def my_pipeline(
# 各引数の値はデフォルト値。Kubeflow Pipelines実行時にブラウザから指定可能。
input_file_name='train.csv',
random_state_value='0',
n_estimators_value='10'
):
# prepareタスク、analysisタスクの順に実行。
prepare_task = prepare_op(input_file_name).apply(
use_aws_secret(
'aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'
)
)
analysis_task = analysis_op(
random_state_value,
n_estimators_value,
prepare_task.outputs['output_csv_path1'],
prepare_task.outputs['output_csv_path2']
).apply(
use_aws_secret(
'aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'
)
)
2. 各タスクが利用するDockerイメージの作成
Kubeflow Pipelinesでは、機械学習ワークフローの各タスクが利用するDockerイメージを指定できます。指定しなかった場合は、「python:3.7」(本検証時点でのデフォルト)というイメージを利用します。
今回は、上記Dockerイメージ「python:3.7」に、今回の処理で利用するパッケージ「pandas」「sklearn」「boto3」のインストール、および、独自のPythonファイル「preprocessor.py」の配置を実施したDockerイメージを作成して利用します。
Dockerイメージの作成手順は次のURLを参考にしています。
https://www.kubeflow.org/docs/pipelines/sdk/component-development/#example-dockerfile
なお、Dockerのイメージ名([Dockerユーザ名]/[イメージ名]:[タグ名])を、「user_name/mypipe:0.01」とした場合の例を記載しています。
# Kubeflowを構築した環境にSSHでログインします。
# 作業ディレクトリの準備
export WORK_DIR=~/mywork
mkdir -p $WORK_DIR
cd $WORK_DIR
# 作業ディレクトリに「1. Kubeflow Pipelines用のPythonコードの作成」で作成した「preprocessor.py」を配置します。
# Dockerfileの作成
cat <<EOF > Dockerfile
FROM python:3.7
RUN python3 -m pip install pandas
RUN python3 -m pip install sklearn
RUN python3 -m pip install boto3
RUN mkdir -p /var/preprocessor
COPY ./preprocessor.py /var/preprocessor/
EOF
# docker loginの実行(自分のDockerユーザ)
sudo docker login
# DockerfileからDockerイメージの作成
# sudo docker build -t [Dockerユーザ名]/[イメージ名]:[タグ名] [Dockerfileのあるディレクトリパス]
sudo docker build -t user_name/mypipe:0.01 .
# --- 実行結果例 ここから -----------------------------------------------------------
# …
# Successfully built 8eda105d08ad
# Successfully tagged user_name/mypipe:0.01
# --- 実行結果例 ここまで -----------------------------------------------------------
# 作成したイメージの確認
sudo docker images --digests user_name/mypipe:0.01
# --- 実行結果例 ここから -----------------------------------------------------------
# REPOSITORY TAG DIGEST IMAGE ID CREATED SIZE
# mypipe 0.01 <none> 8eda105d08ad 3 minutes ago 1.31GB
# --- 実行結果例 ここまで -----------------------------------------------------------
# 作成したイメージのアップロード
sudo docker push user_name/mypipe:0.01
# --- 実行結果例 ここから -----------------------------------------------------------
The push refers to repository [docker.io/user_name/mypipe]
b1407abce774: Pushed
b9f70b486d82: Pushed
fa04049f0b53: Pushed
4d661d9a5ebd: Pushed
4f2333334801: Pushed
3da8662a6eed: Mounted from library/python
6a6ea1335e48: Mounted from library/python
4324e0912cc9: Mounted from library/python
59840d625c92: Mounted from library/python
da87e334550a: Mounted from library/python
c5f4367d4a59: Mounted from library/python
ceecb62b2fcc: Mounted from library/python
193bc1d68b80: Mounted from library/python
f0e10b20de19: Mounted from library/python
0.01: digest: sha256:0459f6ffa841b79466c6c002f6cc6474d55aa28469fb7a28cb97f88341a44225 size: 3268
# --- 実行結果例 ここまで -----------------------------------------------------------
※補足
パッケージのインストールは、前述のPythonコードにて、各タスクの中でsubprocess.runメソッドを実行する、または、func_to_container_opメソッドのpackages_to_installパラメタを利用する、という方法で、各タスクの動作時に動的に実行することもできます。ただし、これらの方法の場合、Kubeflow Pipelinesの実行時に毎回パッケージのインストール処理が発生してしまうデメリットがあります。
動的にパッケージをインストールする例(各タスクの中でsubprocess.runを実行)
…
def prepare(…):
import subprocess
subprocess.run(['pip', 'install', 'pandas'])
subprocess.run(['pip', 'install', 'sklearn'])
subprocess.run(['pip', 'install', 'boto3'])
…
動的にパッケージをインストールする例(func_to_container_opメソッドのpackages_to_installパラメタを利用)
…
prepare_op = kfp.components.func_to_container_op(
func = task1,
packages_to_install = ['pandas', 'sklearn', 'boto3']
)
…
3. Kubeflow PipelinesからAmazon S3にアクセスするための設定
Kubeflow PipelinesからAmazon S3にアクセスするための設定を行います。
https://www.kubeflow.org/docs/aws/pipeline/#s3-access-from-kubeflow-pipelines
# Kubeflowを構築した環境にSSHでログインします。
# AWS_ACCESS_KEY_ID、および、AWS_SECRET_ACCESS_KEYの値をBASE64エンコードして設定
# <…の値>部分は環境に合わせて指定します。
export AWS_ACCESS_KEY_ID_BASE64=`echo -n "<AWS_ACCESS_KEY_IDの値>" | base64`
export AWS_SECRET_ACCESS_KEY_BASE64=`echo -n "<AWS_SECRET_ACCESS_KEYの値>" | base64`
# Kubeflow Central Dashboardで利用しているネームスペースを設定。
# ここでは「ns-agent」とした場合の例を記載しています。
export MY_NAMESPACE=ns-agent
# AWS認証用のYAMLファイルの作成
cat <<EOF > my_aws_secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: aws-secret
namespace: $MY_NAMESPACE
type: Opaque
data:
AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID_BASE64
AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY_BASE64
EOF
# AWS認証情報のKubernetesへの設定
kubectl apply -f my_aws_secret.yaml
# --- 実行結果例 ここから -----------------------------------------------------------
# secret/aws-secret created
# --- 実行結果例 ここまで -----------------------------------------------------------
4. dsl-compileコマンドによるYAMLの圧縮ファイル(.tar.gz)の作成
dsl-compileコマンドを利用して、作成したPythonコードからYAMLの圧縮ファイルを作成します。
https://www.kubeflow.org/docs/pipelines/sdk/build-component/#compile-the-pipeline
# Kubeflowを構築した環境にSSHでログインします。
# dsl-compileコマンドを利用するため、Kubeflow Pipelines SDKをインストールします。
# https://www.kubeflow.org/docs/components/pipelines/sdk/install-sdk/#install-the-kubeflow-pipelines-sdk
pip3 install kfp --upgrade
# dsl-compileコマンドを実行します。
# dsl-compile --py <作成したKubeflow Pipelines用Pythonファイルパス> --output <出力ファイルパス>
dsl-compile --py pipelines_test.py --output pipelines_test.tar.gz
5. Kubeflow Central Dashboardからアップロードして実行
-
ブラウザからKubeflow Central Dashboardにアクセスします。
https://<構築したマシンのIPアドレス>:31380/ -
左ペインの「Pipelines」をクリックして、「Pipelines」ページを表示し、右上の「+ Upload pipelines」をクリックします。
- 表示された「Upload Pipeline or Pipeline Version」ページで、「Upload a file」、「Choose file」をクリックし、dsl-compileコマンドで出力した.tar.gzファイルを指定します。指定すると「Pipelines Name」テキストボックスに名称が自動入力されます。また、「Pipelines Description」に任意の値を入力し、Createボタンをクリックします。
Upload Pipelines or Pipeline Versionページ
アップロードしたPipelineのページに遷移し、Pythonコードで定義したPrepareタスクとAnalysisタスクのフローが表示されます。
- 上記ページの「+ Create experiment」をクリックしたあと、次の「New experiment」ページで「Experiment name」に任意の値を入力して、「Next」をクリックします。
- 次の「Start a run」ページで、「Start」をクリックします。なお、Run parametersの値はPythonコードで指定した値がデフォルトとして指定されていますが、ここで変更できます。
- 作成したExperimentのページが出力されます。しばらく待ったあと「Refresh」をクリックすると、Statusのアイコンが?からチェックマークに変わります。
上記ページの今回実行した「Run of pipelines_test~」の部分をクリックすると、実行結果が確認できます。
6. 実行結果の確認
実行結果の各タブのSSを次に記載します。
- Graphタブでは、Pipelineのタスクのフローが出力されます。正常終了したタスクがチェックマークになっています。
- Run outputタブでは、Pipelinesの出力データが出力されます。今回の例では、特に出力データがない(analysisタスク内の処理でS3に学習済モデルデータを出力していますがPipelineとしての出力はない)ため、何も出力されていません。
- Configタブでは、実行時のステータス、タイムスタンプ、パラメタが出力されます。
- Graphタブの各タスクをクリックすると、クリックしたタスクの表示欄が出力されます。
タスクのInput/Outputタブでは、当該タスクの入出力パラメタの値が出力されます。
- タスクのVisualizationsタブでは、出力データを視覚化して出力できると想定していますが、今回はそこまで検証できていません。本機能の詳細は、Kubeflow公式サイトの「Visualize Results in the Pipelines UI」のページを参照してください。
https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/
- タスクのML Metadataタブでは、実行時のメタデータが出力されます。
- タスクのVolumesタブでは、Pipelineで利用したVolumeの情報が出力されると想定していますが、今回はVolumeを利用していないため、何も出力されていません。
- タスクのlogsタブでは、タスク実行時の標準出力やエラー情報が出力されます。
- タスクのPodタブでは、タスク実行時のPod情報が出力されます。
- タスクのEventsタブでは、タスク実行時のイベント情報が出力されます。
REST APIについて
今回は、作成したPipeline(.tar.gz)のアップロードや実行をブラウザから実施する手順を紹介しましたが、Kubeflow Pipelinesには、REST APIも用意されており、このAPIを利用して、Pipelinesのアップロードや実行をシステムに組み込んで任意のタイミングで実行することもできます。
Kubeflow PipelinesのREST APIの使い方や仕様については、以下のページを参照してください。
https://www.kubeflow.org/docs/components/pipelines/overview/interfaces/
https://www.kubeflow.org/docs/components/pipelines/reference/api/kubeflow-pipeline-api-spec/
おわりに
本稿では、Kubeflow Pipelines上でscikit-learnの機械学習モデルを訓練・評価した際の手順を紹介しました。次回はKFServingの利用手順を紹介します。