警告
本記事はTeradata CorporationのサイトGetting Startedに掲載された内容を抄訳したものです。掲載内容の正確性・完全性・信頼性・最新性を保証するものではございません。正確な内容については、原本をご参照下さい。
また、修正が必要な箇所や、ご要望についてはコメントをよろしくお願いします。
Vertex AIは、データサイエンティストがMLモデルを開発・展開するためのGoogleの単一環境であり、実験から展開、モデルの管理・監視に至るまで一貫して行うことができます。このチュートリアルでは、Vantage Analyticsの機能をVertex AI ML Pipelinesに統合する方法を紹介します。2つのパイプラインを作成します。
⓵ 最初のステップでは Vantage でデータを変換してトレーニング用のファイルをエクスポートし、2 番目のステップでは scikit-learn を使用してモデルをトレーニングし、最後のステップでは Teradata Vantage の BYOM (Bring Your Own Model) 機能を使用してモデルを Vantage にデプロイします。
⓶スコアリング - 2 番目のパイプラインでは、1 番目のパイプラインで作成されたモデルを使用して、Vantage 上のテーブルに保存された新しいデータをスコアリングします。
どちらのパイプラインも非常にシンプルですが、最初のパイプラインは、生産モデルがドリフトしたときに、新しいデータでモデルを再トレーニングするためにトリガーすることができます。2番目のパイプラインは、スコアリングのための新しいデータが利用可能になったときに、定期的に実行することができます。
前提条件
・Google Cloudアカウント - 登録はこちら
・Kaggleアカウント - 登録はこちら
Vantageのセットアップとデータの読み込み
ノートブック環境のセットアップ
import sys
!{sys.executable} -m pip install --upgrade --force-reinstall ipython-sql
!{sys.executable} -m pip install teradatasqlalchemy teradataml kaggle ipython-sql kfp
Vantageインスタンスのセットアップ
Run Vantage Express on Google Cloudの手順に従って、Vantageのセットアップを行います。VMをインターネットに接続するための手順に従ってください。
GCSバケットの作成
KubeFlowで管理する成果物を格納するために、GCSバケットが必要です。
バケット名を定義します。
BUCKET_NAME = "<your-bucket-name>"
バケットが存在しない場合は、先にバケットを作成します。
!gsutil ls -b gs://$BUCKET_NAME || gsutil mb gs://$BUCKET_NAME
Vertex AIにバケットへのアクセス権限を付与します。
GCSコンソールのIAMタブを開き、デフォルトのCompute EngineにStorage Adminロールを割り当てます。デフォルトのCompute Engineアカウントのプリンシパルは以下のような感じです。project-id- compute@developer.gserviceaccount.com.
サンプルデータのダウンロード
ここでは、Kaggleから取得できるBoston Housingのデータセットを使用します。Kaggleのアカウントにログインします。右上のユーザーアイコンをクリックし、「アカウント」を選択します。APIセクションを探し、Create New API Tokenをクリックします。これにより、kaggle.jsonファイルが作成されます。kaggle.jsonを開き、ユーザー名とキーをコピーします。セルに値を代入し、実行します。
%env KAGGLE_USERNAME=<your-kaggle-username>
%env KAGGLE_KEY=<your-kaggle-key>
!kaggle datasets download -f housing.csv vikrishnan/boston-house-prices
Vantage にトレーニング データをロードする
Vantage のインスタンスを指す DATABASE_URL 環境変数を設定します。BYOMパッケージがインストールされているmldbデータベースをデフォルトとしていることを確認してください。
DATABASE_URL='teradatasql://dbc:dbc@34.121.78.209/mldb'
%env DATABASE_URL=$DATABASE_URL
import pandas
import os
df=pandas.read_fwf('housing.csv', names=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT', 'MEDV'])
df.to_sql('housing', con=DATABASE_URL, index=False)
このチュートリアルでは、学習済みモデルを格納するテーブルと、モデルでスコアリングしたい新しいデータを格納するテーブルが必要です。Vantageインスタンス上でteradatasqlを使用して以下のSQLを実行します。
%%sql
CREATE SET TABLE demo_models (model_id VARCHAR (30), model BLOB) PRIMARY INDEX (model_id);
CREATE SET TABLE test_housing (ID INTEGER, CRIM FLOAT, ZN FLOAT,INDUS FLOAT,CHAS INTEGER,NOX FLOAT,RM FLOAT,
AGE FLOAT,DIS FLOAT, RAD INTEGER,TAX INTEGER,PTRATIO FLOAT,B FLOAT,LSTAT FLOAT) PRIMARY INDEX (CRIM);
INSERT INTO test_housing (ID, CRIM, ZN, INDUS, CHAS, NOX, RM, AGE, DIS, RAD, TAX, PTRATIO, B, LSTAT)
VALUES (1,.02,0.0,7.07,0,.46,6.4,78.9,4.9,2,242,17.8,396.9,9.14);
Kubeflowを使ったモデルの学習とデプロイを行う最初のパイプライン
これで、パイプラインのコンポーネントを作成する準備が整いました。Vertex AI Pipelinesは、Kubeflow Pipelines SDKまたはTensorFlow Extendedを使って構築されたパイプラインを実行することができます。今回のscikit-learnを使った簡単な例では、Kubeflow Pipelines SDKを使用することにします。
この例では、以下の3つのコンポーネントを作成します。
・リードデータフロムバンテージ
■入力:VantageをホストするVMのipadr
■出力: トレーニングとテスト用のデータを含むcsvファイル
・トレーニングモデル
■入力: 学習とテストのためのデータを含むcsvファイル
■出力: モデルを含むファイル
■出力: モデルの性能を示すメトリックアーティファクト
・デプロイメントモデル
■入力: モデルを含むファイル
まず、Kubeflow Pipelineのコンポーネントとdslのパッケージをインポートします。
import kfp.v2.dsl as dsl
from kfp.v2.dsl import (
component,
Input,
Output,
Dataset,
Model,
Metrics,
)
Vantageからデータを読み込むコンポーネントを作成します。
最初のコンポーネントは、Vantageウェアハウスからデータを読み込みます(上記を参照し、Google CloudでVantage Expressを設定していることを確認してください。インターネットからVantageにアクセスできるように、VMへのファイアウォールを開くことも含まれています)。
このコンポーネントは、入力パラメータとして渡された接続文字列を使用して Vantage に接続し、Vantage の mldb.housing テーブルから行を読み取って、データを Output[Dataset] に出力します。出力は、コンポーネント間でデータを受け渡すために使用される一時ファイルです (コンポーネント間のデータ受け渡しに関する詳細は、こちらを参照してください)。
コンポーネントは、sqlalchemyを使用してTeradataと通信します。各コンポーネントはKubernetes上の別のコンテナで実行されるため、すべてのインポート文はコンポーネント内で行う必要があります。今回は、teradatasqlがインストール済みのベースイメージを作成しました。base_image='python'を渡すと、コンポーネントはそのイメージを使用してコンテナを作成します。 packages_to_installパラメータは、コンポーネントが他にどのようなパッケージを必要とするかを定義します。
@component(base_image='python', packages_to_install=['teradatasqlalchemy'])
def read_data_from_vantage(
connection_string: str,
output_file: Output[Dataset]
):
import sqlalchemy
file_name = output_file.path
engine = sqlalchemy.create_engine(connection_string)
with engine.connect() as con:
rs = con.execute('SELECT * FROM housing')
with open(output_file.path, 'w') as output_file:
output_file.write('CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,MEDV\n')
for row in rs:
output_file.write(','.join([str(i) for i in row]) + '\n')
トレインモデルコンポーネントの作成
次に、学習データを使ってモデルを学習させるためのコンポーネントを作成します。
このコンポーネントへの入力は、前のコンポーネントからのファイルです。出力はjoblib.dumpを使って学習したモデルのファイルと、テストデータのファイルです。
このコンポーネントはscikit-learnとpandasを使用するので、packages_to_install= ['pandas==1.3.5','scikit-learn']を渡す必要があります - これはコンテナの作成時にパッケージをインストールするようにKubeflowに指示するものです。
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn','sklearn-pandas==1.5.0'])
def train_model(
input_file : Input[Dataset],
output_model: Output[Model],
output_metrics: Output[Metrics]
):
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
from sklearn_pandas import DataFrameMapper
import joblib
from sklearn2pmml.pipeline import PMMLPipeline
from sklearn2pmml import sklearn2pmml
df = pd.read_csv(input_file.path)
train, test = train_test_split(df, test_size = .33)
train = train.apply(pd.to_numeric, errors='ignore')
test = test.apply(pd.to_numeric, errors='ignore')
target = 'MEDV'
features = train.columns.drop(target)
pipeline = PMMLPipeline([
("mapping", DataFrameMapper([
(['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'], StandardScaler())
])),
("rfc", RandomForestRegressor(n_estimators = 100, random_state = 0))
])
pipeline.fit(train[features], train[target])
y_pred = pipeline.predict(test[features])
metric_accuracy = metrics.mean_squared_error(y_pred,test[target])
output_metrics.log_metric('accuracy', metric_accuracy)
output_model.metadata['accuracy'] = metric_accuracy
joblib.dump(pipeline, output_model.path)
モデルをデプロイするためのコンポーネントを作成
最後のコンポーネントは、モデルをロードし、テストデータ上でそれをテストします。Output[Metrics]は、モデルのパフォーマンスを示すアーティファクトを作成し、Runtime Graphで可視化することができます。
@component(base_image='teradata/python-sklearn2pmml')
def deploy_model(
connection_string: str,
input_model : Input[Model],
):
import sqlalchemy
import teradataml as tdml
import joblib
from sklearn2pmml.pipeline import PMMLPipeline
from sklearn2pmml import sklearn2pmml
engine = sqlalchemy.create_engine(connection_string)
tdml.create_context(tdsqlengine = engine)
pipeline = joblib.load(input_model.path)
sklearn2pmml(pipeline, "test_local.pmml", with_repr = True)
model_id = 'housing_rf'
model_file = 'test_local.pmml'
table_name = 'demo_models'
tdml.configure.byom_install_location = "mldb"
try:
res = tdml.save_byom(model_id = model_id, model_file = model_file, table_name = table_name)
except Exception as e:
# if our model exists, delete and rewrite
if str(e.args).find('TDML_2200') >= 1:
res = tdml.delete_byom(model_id = model_id, table_name = table_name)
res = tdml.save_byom(model_id = model_id, model_file = model_file, table_name = table_name)
pass
else:
raise
パイプラインを実行するための関数を作成
では、パイプラインの各コンポーネントを実行するための関数を作成します。
@dsl.pipeline(
name='run-vantage-pipeline',
description='An example pipeline that connects to Vantage.',
)
def run_vantage_pipeline_vertex(
connection_string: str
):
data_file = read_data_from_vantage(connection_string).output
test_model_data = train_model(data_file)
deploy_model(connection_string,test_model_data.outputs['output_model'])
パイプラインをコンパイルします。パイプラインはpackage_pathとして指定された名前のjsonファイルに保存されます。
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=run_vantage_pipeline_vertex,
package_path='train_housing_pipeline.json')
ここでVertex AIクライアントを使用して、パイプラインを実行します。google.cloud.aiplatform パッケージをインポートします。Vertex AIは一時ファイル用にCloud Storageバケットを必要とします。上記のjsonファイルを使って新しいジョブを作成し、パラメータとしてipaddrを渡します。そして、ジョブを投入します。
ジョブが開始されると、Runtime Graphに移動するリンクが表示されます。
import google.cloud.aiplatform as aip
pipeline_root_path = 'gs://' + BUCKET_NAME
job = aip.PipelineJob(
display_name="housing_training_deploy",
template_path="train_housing_pipeline.json",
pipeline_root=pipeline_root_path,
parameter_values={
'connection_string': DATABASE_URL
}
)
job.submit()
モデルメトリクスの検査
パイプラインの実行が完了すると(グラフ内の各コンポーネントに緑色のチェックマークが付くはずです)。各コンポーネントをクリックすると、実行の詳細や作成されたログを確認することができます。output_metricsアーティファクトをクリックすると、Pipeline run analysisウィンドウのNode Infoにモデルの精度が表示されます。Metricsアーティファクトを使用して渡すことができる他のメトリクスや可視化については、こちらで詳しく説明しています)。
デプロイされたモデルのテスト
先ほどデプロイしたモデルを、新しいデータのスコアリングによってテストしてみましょう。teradataml ドライバを使って、保存されたモデルを取得し、新しいデータでテーブルの行を採点してみましょう。
import teradataml as tdml
import sqlalchemy
import os
engine = sqlalchemy.create_engine(DATABASE_URL)
eng = tdml.create_context(tdsqlengine = engine)
#indicate the database that BYOM is using
tdml.configure.byom_install_location = "mldb"
tdf_test = tdml.DataFrame('test_housing')
modeldata = tdml.retrieve_byom("housing_rf", table_name="demo_models")
predictions = tdml.PMMLPredict(
modeldata = modeldata,
newdata = tdf_test,
accumulate = ['ID']
)
predictions.result.to_pandas()
新しいデータをスコアリングするための新しいパイプラインを作成する
このパイプラインは、teradatasqlドライバを使用してSQLクエリを実行し、demo_modelテーブルからモデルを取得し、test_housingテーブルの行にスコアを付けるというコンポーネントを1つだけ持っています。
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn'])
def score_new_data(
connection_string: str,
model_name: str,
model_table: str,
data_table: str,
prediction_table: str
):
import teradataml as tdml
import sqlalchemy
engine = sqlalchemy.create_engine(connection_string)
with engine.connect() as con:
con.execute(f'CREATE TABLE {prediction_table} AS (SELECT * FROM mldb.PMMLPredict ( ON {data_table} ON (SELECT * FROM {model_ta
run_new_data_scoreパイプラインは、以下のパラメータを受け取ります。
・model_name: モデルのID
・model_table: モデルを保存しているテーブルの名前
・data_table: スコアリングするための新しいデータを格納するテーブルの名前
・prediction_table: スコアリングの結果を格納するテーブルの名前
パイプラインが実行されると、ダッシュボードに使用する値を入力するフィールドが表示されます。
@dsl.pipeline(
name='new-data-pipeline',
description='An example of a component that scores new data with a saved model.',
)
def run_new_data_score(
connection_string: str,
model_name: str,
model_table: str,
data_table: str,
prediction_table: str
):
score_new_data(DATABASE_URL,model_name,model_table,data_table,prediction_table)
パイプラインをコンパイルするには、以下のコードを実行してください。パイプラインは score_new_data_pipeline_sql.json ファイルに保存されます。
compiler.Compiler().compile(pipeline_func=run_new_data_score,
package_path='score_new_data_pipeline_sql.json')
これからVertex AI Pipelinesでパイプラインを実行します。
import google.cloud.aiplatform as aip
pipeline_root_path = 'gs://' + BUCKET_NAME
job = aip.PipelineJob(
display_name="new_data_housing",
template_path="score_new_data_pipeline_sql.json",
pipeline_root=pipeline_root_path,
parameter_values={
'connection_string': DATABASE_URL,
'model_name': 'housing_rf',
'model_table': 'demo_models',
'data_table': 'test_housing',
'prediction_table': 'housing_predictions'
}
)
job.submit()
ジョブが完了すると、バッチ予測で確認することができます。
%%sql
SELECT * FROM housing_predictions;
クリーンアップ
課金を停止するには、以下のリソースをクリーンアップする必要があります。
・Vantage Express VM の削除 - Compute Engine インスタンスのリストに移動し、Vantage Express を搭載したインスタンスを選択し、[Delete] をクリックします。
・設定したストレージバケットを削除します。