3
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Google Vertex AIで作成された分析モデルをVantageでスコアリングする方法

Posted at

警告
本記事はTeradata CorporationのサイトGetting Startedに掲載された内容を抄訳したものです。掲載内容の正確性・完全性・信頼性・最新性を保証するものではございません。正確な内容については、原本をご参照下さい。
また、修正が必要な箇所や、ご要望についてはコメントをよろしくお願いします。

このノートブックを.ipynbフォーマットで取得します。

Vertex AIは、データサイエンティストがMLモデルを開発・展開するためのGoogleの単一環境であり、実験から展開、モデルの管理・監視に至るまで一貫して行うことができます。このチュートリアルでは、Vantage Analyticsの機能をVertex AI ML Pipelinesに統合する方法を紹介します。2つのパイプラインを作成します。

⓵ 最初のステップでは Vantage でデータを変換してトレーニング用のファイルをエクスポートし、2 番目のステップでは scikit-learn を使用してモデルをトレーニングし、最後のステップでは Teradata Vantage の BYOM (Bring Your Own Model) 機能を使用してモデルを Vantage にデプロイします。

画像1.png

⓶スコアリング - 2 番目のパイプラインでは、1 番目のパイプラインで作成されたモデルを使用して、Vantage 上のテーブルに保存された新しいデータをスコアリングします。

画像2.png

どちらのパイプラインも非常にシンプルですが、最初のパイプラインは、生産モデルがドリフトしたときに、新しいデータでモデルを再トレーニングするためにトリガーすることができます。2番目のパイプラインは、スコアリングのための新しいデータが利用可能になったときに、定期的に実行することができます。

前提条件

・Google Cloudアカウント - 登録はこちら
・Kaggleアカウント - 登録はこちら

Vantageのセットアップとデータの読み込み

ノートブック環境のセットアップ

aaa
import sys
!{sys.executable} -m pip install --upgrade --force-reinstall ipython-sql
!{sys.executable} -m pip install teradatasqlalchemy teradataml kaggle ipython-sql kfp

Vantageインスタンスのセットアップ

Run Vantage Express on Google Cloudの手順に従って、Vantageのセットアップを行います。VMをインターネットに接続するための手順に従ってください。

GCSバケットの作成

KubeFlowで管理する成果物を格納するために、GCSバケットが必要です。

バケット名を定義します。

aaa
BUCKET_NAME = "<your-bucket-name>"

バケットが存在しない場合は、先にバケットを作成します。

aaa
!gsutil ls -b gs://$BUCKET_NAME || gsutil mb gs://$BUCKET_NAME

Vertex AIにバケットへのアクセス権限を付与します。

GCSコンソールのIAMタブを開き、デフォルトのCompute EngineにStorage Adminロールを割り当てます。デフォルトのCompute Engineアカウントのプリンシパルは以下のような感じです。project-id- compute@developer.gserviceaccount.com.

サンプルデータのダウンロード

ここでは、Kaggleから取得できるBoston Housingのデータセットを使用します。Kaggleのアカウントにログインします。右上のユーザーアイコンをクリックし、「アカウント」を選択します。APIセクションを探し、Create New API Tokenをクリックします。これにより、kaggle.jsonファイルが作成されます。kaggle.jsonを開き、ユーザー名とキーをコピーします。セルに値を代入し、実行します。

aaa
%env KAGGLE_USERNAME=<your-kaggle-username>
%env KAGGLE_KEY=<your-kaggle-key>
aaa
!kaggle datasets download -f housing.csv vikrishnan/boston-house-prices

Vantage にトレーニング データをロードする

Vantage のインスタンスを指す DATABASE_URL 環境変数を設定します。BYOMパッケージがインストールされているmldbデータベースをデフォルトとしていることを確認してください。

aaa
DATABASE_URL='teradatasql://dbc:dbc@34.121.78.209/mldb'
%env DATABASE_URL=$DATABASE_URL
aaa
import pandas
import os

df=pandas.read_fwf('housing.csv', names=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT', 'MEDV'])
df.to_sql('housing', con=DATABASE_URL, index=False)

このチュートリアルでは、学習済みモデルを格納するテーブルと、モデルでスコアリングしたい新しいデータを格納するテーブルが必要です。Vantageインスタンス上でteradatasqlを使用して以下のSQLを実行します。

aaa
%%sql
CREATE SET TABLE demo_models (model_id VARCHAR (30), model BLOB) PRIMARY INDEX (model_id);
CREATE SET TABLE test_housing (ID INTEGER, CRIM FLOAT, ZN FLOAT,INDUS FLOAT,CHAS INTEGER,NOX FLOAT,RM FLOAT, 
    AGE FLOAT,DIS FLOAT, RAD INTEGER,TAX INTEGER,PTRATIO FLOAT,B FLOAT,LSTAT FLOAT) PRIMARY INDEX (CRIM);
INSERT INTO test_housing (ID, CRIM, ZN, INDUS, CHAS, NOX, RM, AGE, DIS, RAD, TAX, PTRATIO, B, LSTAT) 
    VALUES (1,.02,0.0,7.07,0,.46,6.4,78.9,4.9,2,242,17.8,396.9,9.14);

Kubeflowを使ったモデルの学習とデプロイを行う最初のパイプライン

これで、パイプラインのコンポーネントを作成する準備が整いました。Vertex AI Pipelinesは、Kubeflow Pipelines SDKまたはTensorFlow Extendedを使って構築されたパイプラインを実行することができます。今回のscikit-learnを使った簡単な例では、Kubeflow Pipelines SDKを使用することにします。

この例では、以下の3つのコンポーネントを作成します。

リードデータフロムバンテージ
 ■入力:VantageをホストするVMのipadr
 ■出力: トレーニングとテスト用のデータを含むcsvファイル
トレーニングモデル
 ■入力: 学習とテストのためのデータを含むcsvファイル
 ■出力: モデルを含むファイル
 ■出力: モデルの性能を示すメトリックアーティファクト
デプロイメントモデル
 ■入力: モデルを含むファイル

まず、Kubeflow Pipelineのコンポーネントとdslのパッケージをインポートします。

aaa
import kfp.v2.dsl as dsl
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Model,
    Metrics,
)

Vantageからデータを読み込むコンポーネントを作成します。

最初のコンポーネントは、Vantageウェアハウスからデータを読み込みます(上記を参照し、Google CloudでVantage Expressを設定していることを確認してください。インターネットからVantageにアクセスできるように、VMへのファイアウォールを開くことも含まれています)。

このコンポーネントは、入力パラメータとして渡された接続文字列を使用して Vantage に接続し、Vantage の mldb.housing テーブルから行を読み取って、データを Output[Dataset] に出力します。出力は、コンポーネント間でデータを受け渡すために使用される一時ファイルです (コンポーネント間のデータ受け渡しに関する詳細は、こちらを参照してください)。

コンポーネントは、sqlalchemyを使用してTeradataと通信します。各コンポーネントはKubernetes上の別のコンテナで実行されるため、すべてのインポート文はコンポーネント内で行う必要があります。今回は、teradatasqlがインストール済みのベースイメージを作成しました。base_image='python'を渡すと、コンポーネントはそのイメージを使用してコンテナを作成します。 packages_to_installパラメータは、コンポーネントが他にどのようなパッケージを必要とするかを定義します。

aaa
@component(base_image='python', packages_to_install=['teradatasqlalchemy'])
def read_data_from_vantage(
    connection_string: str,
    output_file: Output[Dataset]
):
    import sqlalchemy

    file_name = output_file.path
    engine = sqlalchemy.create_engine(connection_string)

    with engine.connect() as con:
        rs = con.execute('SELECT * FROM housing')
        with open(output_file.path, 'w') as output_file:
            output_file.write('CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,MEDV\n')
            for row in rs:
                output_file.write(','.join([str(i) for i in row]) + '\n')

トレインモデルコンポーネントの作成

次に、学習データを使ってモデルを学習させるためのコンポーネントを作成します。

このコンポーネントへの入力は、前のコンポーネントからのファイルです。出力はjoblib.dumpを使って学習したモデルのファイルと、テストデータのファイルです。

このコンポーネントはscikit-learnとpandasを使用するので、packages_to_install= ['pandas==1.3.5','scikit-learn']を渡す必要があります - これはコンテナの作成時にパッケージをインストールするようにKubeflowに指示するものです。

aaa
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn','sklearn-pandas==1.5.0'])
def train_model(
    input_file : Input[Dataset],
    output_model: Output[Model],
    output_metrics: Output[Metrics]
):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.preprocessing import StandardScaler
    from sklearn import metrics
    from sklearn_pandas import DataFrameMapper
    import joblib
    from sklearn2pmml.pipeline import PMMLPipeline
    from sklearn2pmml import sklearn2pmml

    df = pd.read_csv(input_file.path)

    train, test = train_test_split(df, test_size = .33)
    train = train.apply(pd.to_numeric, errors='ignore')
    test = test.apply(pd.to_numeric, errors='ignore')

    target = 'MEDV'
    features = train.columns.drop(target)


    pipeline = PMMLPipeline([
    ("mapping", DataFrameMapper([
    (['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'], StandardScaler())
    ])),
    ("rfc", RandomForestRegressor(n_estimators = 100, random_state = 0))
    ])

    pipeline.fit(train[features], train[target])
    y_pred = pipeline.predict(test[features])

    metric_accuracy = metrics.mean_squared_error(y_pred,test[target])
    output_metrics.log_metric('accuracy', metric_accuracy)
    output_model.metadata['accuracy'] = metric_accuracy

    joblib.dump(pipeline, output_model.path)

モデルをデプロイするためのコンポーネントを作成

最後のコンポーネントは、モデルをロードし、テストデータ上でそれをテストします。Output[Metrics]は、モデルのパフォーマンスを示すアーティファクトを作成し、Runtime Graphで可視化することができます。

aaa
@component(base_image='teradata/python-sklearn2pmml')
def deploy_model(
    connection_string: str,
    input_model : Input[Model],
):
    import sqlalchemy
    import teradataml as tdml
    import joblib
    from sklearn2pmml.pipeline import PMMLPipeline
    from sklearn2pmml import sklearn2pmml

    engine = sqlalchemy.create_engine(connection_string)
    tdml.create_context(tdsqlengine = engine)

    pipeline = joblib.load(input_model.path)

    sklearn2pmml(pipeline, "test_local.pmml", with_repr = True)

    model_id = 'housing_rf'
    model_file = 'test_local.pmml'
    table_name = 'demo_models'

    tdml.configure.byom_install_location = "mldb"

    try:
        res = tdml.save_byom(model_id = model_id, model_file = model_file, table_name = table_name)
    except Exception as e:
        # if our model exists, delete and rewrite
        if str(e.args).find('TDML_2200') >= 1:
            res = tdml.delete_byom(model_id = model_id, table_name = table_name)
            res = tdml.save_byom(model_id = model_id, model_file = model_file, table_name = table_name)
            pass
        else:
            raise

パイプラインを実行するための関数を作成

では、パイプラインの各コンポーネントを実行するための関数を作成します。

aaa
@dsl.pipeline(
   name='run-vantage-pipeline',
   description='An example pipeline that connects to Vantage.',
)
def run_vantage_pipeline_vertex(
   connection_string: str
):
    data_file = read_data_from_vantage(connection_string).output
    test_model_data = train_model(data_file)
    deploy_model(connection_string,test_model_data.outputs['output_model'])

パイプラインをコンパイルします。パイプラインはpackage_pathとして指定された名前のjsonファイルに保存されます。

aaa
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=run_vantage_pipeline_vertex,
    package_path='train_housing_pipeline.json')

ここでVertex AIクライアントを使用して、パイプラインを実行します。google.cloud.aiplatform パッケージをインポートします。Vertex AIは一時ファイル用にCloud Storageバケットを必要とします。上記のjsonファイルを使って新しいジョブを作成し、パラメータとしてipaddrを渡します。そして、ジョブを投入します。

ジョブが開始されると、Runtime Graphに移動するリンクが表示されます。

aaa
import google.cloud.aiplatform as aip

pipeline_root_path = 'gs://' + BUCKET_NAME

job = aip.PipelineJob(
    display_name="housing_training_deploy",
    template_path="train_housing_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'connection_string': DATABASE_URL
    }
)

job.submit()

モデルメトリクスの検査

パイプラインの実行が完了すると(グラフ内の各コンポーネントに緑色のチェックマークが付くはずです)。各コンポーネントをクリックすると、実行の詳細や作成されたログを確認することができます。output_metricsアーティファクトをクリックすると、Pipeline run analysisウィンドウのNode Infoにモデルの精度が表示されます。Metricsアーティファクトを使用して渡すことができる他のメトリクスや可視化については、こちらで詳しく説明しています)。

デプロイされたモデルのテスト

先ほどデプロイしたモデルを、新しいデータのスコアリングによってテストしてみましょう。teradataml ドライバを使って、保存されたモデルを取得し、新しいデータでテーブルの行を採点してみましょう。

aaa
import teradataml as tdml
import sqlalchemy
import os

engine = sqlalchemy.create_engine(DATABASE_URL)
eng = tdml.create_context(tdsqlengine = engine)

#indicate the database that BYOM is using
tdml.configure.byom_install_location = "mldb"

tdf_test = tdml.DataFrame('test_housing')

modeldata = tdml.retrieve_byom("housing_rf", table_name="demo_models")

predictions = tdml.PMMLPredict(
        modeldata = modeldata,
        newdata = tdf_test,
        accumulate = ['ID']
        )
predictions.result.to_pandas()

新しいデータをスコアリングするための新しいパイプラインを作成する

このパイプラインは、teradatasqlドライバを使用してSQLクエリを実行し、demo_modelテーブルからモデルを取得し、test_housingテーブルの行にスコアを付けるというコンポーネントを1つだけ持っています。

aaa
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn'])
def score_new_data(
    connection_string: str,
    model_name: str,
    model_table: str,
    data_table: str,
    prediction_table: str
):
    import teradataml as tdml
    import sqlalchemy

    engine = sqlalchemy.create_engine(connection_string)

    with engine.connect() as con:
        con.execute(f'CREATE TABLE {prediction_table} AS (SELECT * FROM mldb.PMMLPredict ( ON {data_table} ON (SELECT * FROM {model_ta

run_new_data_scoreパイプラインは、以下のパラメータを受け取ります。

model_name: モデルのID
model_table: モデルを保存しているテーブルの名前
data_table: スコアリングするための新しいデータを格納するテーブルの名前
prediction_table: スコアリングの結果を格納するテーブルの名前

パイプラインが実行されると、ダッシュボードに使用する値を入力するフィールドが表示されます。

aaa
@dsl.pipeline(
   name='new-data-pipeline',
   description='An example of a component that scores new data with a saved model.',
)
def run_new_data_score(
    connection_string: str,
    model_name: str,
    model_table: str,
    data_table: str,
    prediction_table: str
):
    score_new_data(DATABASE_URL,model_name,model_table,data_table,prediction_table)

パイプラインをコンパイルするには、以下のコードを実行してください。パイプラインは score_new_data_pipeline_sql.json ファイルに保存されます。

aaa
compiler.Compiler().compile(pipeline_func=run_new_data_score,
    package_path='score_new_data_pipeline_sql.json')

これからVertex AI Pipelinesでパイプラインを実行します。

aaa
import google.cloud.aiplatform as aip

pipeline_root_path = 'gs://' + BUCKET_NAME

job = aip.PipelineJob(
    display_name="new_data_housing",
    template_path="score_new_data_pipeline_sql.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'connection_string': DATABASE_URL,
        'model_name': 'housing_rf',
        'model_table': 'demo_models',
        'data_table': 'test_housing',
        'prediction_table': 'housing_predictions'
    }
)

job.submit()

ジョブが完了すると、バッチ予測で確認することができます。

aaa
%%sql
SELECT * FROM housing_predictions;

クリーンアップ

課金を停止するには、以下のリソースをクリーンアップする必要があります。

・Vantage Express VM の削除 - Compute Engine インスタンスのリストに移動し、Vantage Express を搭載したインスタンスを選択し、[Delete] をクリックします。
・設定したストレージバケットを削除します。

Teradata Vantageへのお問合せ

Teradata Vantage へのお問合せ

3
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?