LoginSignup
11
9

More than 1 year has passed since last update.

GCPVertexAI(Workbench,Pipelines)で始めるKaggle

Last updated at Posted at 2022-03-13

はじめに

  • GCPのVertexAI(Workbench,Pipelines)を使ってKaggleをやってみました
  • PythonではじめるKaggleスタートブックを参考にしながら、Titanicコンペにsubmitすることを目指してみました
  • GCSにデータを配置してWorkbenchで開発→Pipelinesで実行、をしてみました
  • とりあえず動くコードは書けましたが、「課題・ToDo」の章に記載した内容は今後修正していきたいです
  • コードはこちら

前提

  • Kaggleのアカウントが開設されていること
  • GCPアカウントが開設されていること
  • GCPプロジェクトが作成されていること

ゴールイメージ

スクリーンショット 2022-03-13 20.36.42.png

  • transform
    • KaggleからダウンロードしてGCSにアップロードしておいたinputデータを前処理
  • trainer
    • sklearnでロジスティック回帰モデルを作成
  • predictor
    • 推論実行+Kaggleにsubmit

手順

ノートブックを立ち上げる(Workbench)

  • Vertex AI → Workbench
  • Notebooks API 有効化
  • New Notebook
    • MANAGED NOTEBOOKS
    • 事前にArtifact Registoryに配置したコンテナイメージも指定可能
  • OPEN JUPYTERLAB

inputデータをバケットに配置する(GCS)

  • バケットを作成する
    • Cloud Storage
    • リージョンを合わせること
    • ディレクトリ構成は、kaggleノートブックに合わせて input/titanic にする
  • Kaggleからダウンロードする
$ pip install kaggle
$ vim .kaggle/kaggle.json
$ chmod 600 kaggle.json
$ kaggle competitions download -c titanic
$ unzip titanic.zip
->
# input/titanic/train.csv
# input/titanic/test.csv
# input/titanic/gender_submission.csv
  • バケットへアップロードする
$ gsutil cp -r input gs://${BUCKET_NAME}/

開発・実行用コンテナを用意する(Artifact Registory)

  • Artifact Registry API 有効化
  • Repository作成
    • Docker
  • イメージアップロード
# ユーザアカウント認証
$ gcloud auth login
$ gcloud config set project ${PROJECT_NAME}

# 利用するゾーンに対する docker コンフィグ生成を実施する
$ gcloud auth configure-docker us-central1-docker.pkg.dev

# DockerHubからkaggleコンテナのプル→Artifact Registoryへのプッシュ
# ※コンポーネントごとにコンテナを分ける場合は、それぞれビルドしてプッシュしておく
$ docker pull kaggle/python
$ docker tag kaggle/python:latest us-central1-docker.pkg.dev/${PROJECT_NAME}/kaggle/python:latest
$ docker push us-central1-docker.pkg.dev/${PROJECT_NAME}/kaggle/python:latest

ノートブックで開発→パイプライン実行する(Workbench→Pipelines)

ローカルで動作確認後(@componentなどをコメントアウトして実行)、パイプライン実行する

1. 定数を定義する

  • 中間データやモデルは、TIMESTAMPごとにGCSにディレクトリを作成してそこに格納することでバージョン管理するようにしました
  • イメージも同様に、バージョン管理のためにタグを指定するようにしています(コミットハッシュ値)
  • PIPELINE_ROOTに指定すると、 /gcs/<GCS_BUCKET_NAME> として各コンテナにマウントされます。パイプライン実行ごとのメタデータなども格納されます( BUCKET_NAME/XXXXXX/titanic-20220313120000/predictor_-XXXXXX/executor_output.json など)
# Define constants
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
# -> '20220305052457'

PROJECT_ID = "" 
REGION = "us-central1"
BUCKET_NAME = ""
PIPELINE_ROOT = "gs://{}".format(BUCKET_NAME)
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
# -> 'us-central1-aiplatform.googleapis.com'
DISPLAY_NAME = "titanic_" + TIMESTAMP
KAGGLE_COMMIT_MESSAGE = "update at " + TIMESTAMP

RAW_DATA_PATH = "/gcs/" + BUCKET_NAME + "/input/titanic"
TRANSFORMED_DATA_PATH = "/gcs/" + BUCKET_NAME + "/output/titanic/" + TIMESTAMP
TRAINED_MODEL_PATH = "/gcs/" + BUCKET_NAME + "/output/titanic/" + TIMESTAMP
PREDICTED_DATA_PATH = "/gcs/" + BUCKET_NAME + "/output/titanic/" + TIMESTAMP
PIPELINE_SPEC_NAME = "pipeline_titanic_" + TIMESTAMP + ".json"

BASE_IMAGE_TRANSFORM = REGION + "-docker.pkg.dev/" + PROJECT_ID + "/kaggle/transform:XXX"
BASE_IMAGE_TRAINER = REGION + "-docker.pkg.dev/" + PROJECT_ID + "/kaggle/trainer:XXX"
BASE_IMAGE_PREDICTOR = REGION + "-docker.pkg.dev/" + PROJECT_ID + "/kaggle/predictor:XXX"

NOTEBOOK_NAME = "pipeline_titanic.ipynb"
NOTEBOOK_NAME_SAVED = "pipeline_titanic_" + TIMESTAMP + ".ipynb"

2. コンポーネントとパイプラインを定義する

  • 型ヒントをつける必要がある(コンパイルする時にエラーになる)
    • Must be one of str, int, float, a subclass of Artifact, or a NamedTuple collection of these types.
  • 前のコンポーネントの出力を使うかどうかで、DAGの構成は決まる(transform_task.output など)
# Define components and a pipeline
# Set up your Google Cloud project
!gcloud config set project $PROJECT_ID

import google.cloud.aiplatform as aip

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from typing import NamedTuple

# Set up your Google Cloud project
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

@component(base_image=BASE_IMAGE_TRANSFORM)
def transform(raw_data_path: str, transformed_data_path: str) -> NamedTuple("outputs", [("transformed_data_path", str)]):
    import os
    from io import BytesIO
    import numpy as np
    import pandas as pd
    
    # input
    train = pd.read_csv(raw_data_path + '/train.csv')
    test = pd.read_csv(raw_data_path + '/test.csv')
    gender_submission = pd.read_csv(raw_data_path + '/gender_submission.csv')

    # run
    data = pd.concat([train, test], sort=False)

    data['Sex'].replace(['male', 'female'], [0, 1], inplace=True)

    data['Embarked'].fillna(('S'), inplace=True)
    data['Embarked'] = data['Embarked'].map({'S': 0, 'C': 1, 'Q': 2}).astype(int)

    data['Fare'].fillna(np.mean(data['Fare']), inplace=True)

    age_avg = data['Age'].mean()
    age_std = data['Age'].std()
    data['Age'].fillna(np.random.randint(age_avg - age_std, age_avg + age_std), inplace=True)

    delete_columns = ['Name', 'PassengerId', 'SibSp', 'Parch', 'Ticket', 'Cabin']
    data.drop(delete_columns, axis=1, inplace=True)

    train = data[:len(train)]
    test = data[len(train):]

    y_train = train['Survived']
    X_train = train.drop('Survived', axis=1)
    X_test = test.drop('Survived', axis=1)
    
    # output
    if not os.path.exists(transformed_data_path):
        os.mkdir(transformed_data_path)
    y_train.to_csv(transformed_data_path + "/y_train.csv")
    X_train.to_csv(transformed_data_path + "/X_train.csv")
    X_test.to_csv(transformed_data_path + "/X_test.csv")

    return (transformed_data_path,)

# debug local
transform(RAW_DATA_PATH, TRANSFORMED_DATA_PATH)

@component(base_image=BASE_IMAGE_TRAINER)
def trainer(transformed_data_path: str, trained_model_path: str) -> NamedTuple("outputs", [("trained_model_path", str)]):
    import argparse
    import os
    from io import BytesIO
    import pickle
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    # input
    y_train = pd.read_csv(transformed_data_path + "/y_train.csv", index_col=0)
    X_train = pd.read_csv(transformed_data_path + "/X_train.csv", index_col=0)

    # run
    model = LogisticRegression(penalty='l2', solver='sag', random_state=0)
    model.fit(X_train, y_train)
    
    # output
    if not os.path.exists(trained_model_path):
        os.mkdir(trained_model_path)
    pickle.dump(model, open(trained_model_path + '/model_titanic.sav', 'wb'))

    return (trained_model_path,)

# debug local
trainer(TRANSFORMED_DATA_PATH, TRAINED_MODEL_PATH)

@component(base_image=BASE_IMAGE_PREDICTOR)
def predictor(raw_data_path: str, transformed_data_path: str, trained_model_path: str, predicted_data_path: str, kaggle_commit_message: str) -> NamedTuple("outputs", [("predicted_data_path", str)]):
    import os
    from io import BytesIO
    import pickle
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from kaggle.api.kaggle_api_extended import KaggleApi

    # input
    gender_submission = pd.read_csv(raw_data_path + '/gender_submission.csv')
    X_test = pd.read_csv(transformed_data_path + "/X_test.csv", index_col=0)
    loaded_model = pickle.load(open(trained_model_path + "/model_titanic.sav", "rb"))
    
    # run
    y_pred = loaded_model.predict(X_test)
    # submit
    gender_submission["Survived"] = list(map(int, y_pred))
    if not os.path.exists(predicted_data_path):
        os.mkdir(predicted_data_path)
    gender_submission.to_csv(predicted_data_path + "/submission.csv", index=False)

    api = KaggleApi()
    api.authenticate()
    api.competition_submit(predicted_data_path + "/submission.csv", message=kaggle_commit_message, competition="titanic")

    return (predicted_data_path,)

# debug local
predictor(RAW_DATA_PATH, TRANSFORMED_DATA_PATH, TRAINED_MODEL_PATH, PREDICTED_DATA_PATH, KAGGLE_COMMIT_MESSAGE)

@dsl.pipeline(
    name="titanic",
    description="pipeline for titanic",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
        raw_data_path: str = RAW_DATA_PATH,
        transformed_data_path: str = TRANSFORMED_DATA_PATH,
        trained_model_path: str = TRAINED_MODEL_PATH,
        predicted_data_path: str = PREDICTED_DATA_PATH,
        kaggle_commit_message: str = KAGGLE_COMMIT_MESSAGE
        
    ):

    transform_task = transform(raw_data_path, transformed_data_path)
    # 実行するコンテナのスペックを変更する場合に記述する
    transform_task.set_cpu_limit("4").set_memory_limit("16")
    
    trainer_task = trainer(transform_task.outputs["transformed_data_path"], trained_model_path)

    predictor_task = predictor(raw_data_path, transform_task.outputs["transformed_data_path"], trainer_task.outputs["trained_model_path"], predicted_data_path, kaggle_commit_message)
  • 補足: ベースイメージを指定+必要なパッケージをインストールする場合
@dsl.component(
    base_image='python:3.8', # 関数を実行するベースイメージを指定
    packages_to_install=["pandas>=1.3.1", "scikit-learn>=0.24.2"] # 必要なパッケージをここで指定
)

3. パイプラインをコンパイルする

# Compile the pipeline
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=PIPELINE_SPEC_NAME
)

4. パイプラインを実行する

# Run the pipeline
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=PIPELINE_SPEC_NAME,
    pipeline_root=PIPELINE_ROOT,
)

job.run()

# save notebook
!cp $NOTEBOOK_NAME $NOTEBOOK_NAME_SAVED

課題・ToDo

  • データやモデルのGCSパスをジョブ間で渡しているが、Artifactの利用もできるはず
  • ノートブックにもGCSをマウントする
  • 複数人開発を見据えたルール整備する
  • 実験管理周りを充実させる
  • スクリプト化する
  • WorkBenchインスタンスへVSCodeでリモート接続する
  • GitHub Actionsと連携させてCI/CDパイプラインを組む
  • BigQuery・可視化ツールと連携する
  • Dataprocと連携する

参考

11
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
9