BigQueryに日々追加されるデータを月1でモデル学習に利用していましたが、前処理やラベル付け、学習の一連の手順が手作業でかなりミスが発生していました。運用効率を上げるため、Google CloudのVertex AI Pipelinesでこれらのプロセスを自動化しました。
Vertex AI Pipelinesとは
Vertex AI Pipelinesは、Google Cloud上で機械学習パイプラインを構築・管理できるサービスです。Kubeflow Pipelines SDKやTFX Pipeline DSLで、Kubernetesクラスタの管理を意識することなく、サーバーレス環境で効率的にパイプラインを実行できます。
Google Cloudのエコシステムとの親和性が高く、以下のような利点があります。
- BigQueryやAutoML用のコンポーネントが用意されている
- Cloud Schedulerを利用せずにスケジュール実行が可能
- サーバレスでスケーラブル
実装手順
ファイル構成
パイプラインの構成と実行に必要なファイルは以下のように管理しています。
- sql_queries/
- add_reaction_label.sql
- training_pre_processing.sql
- undersampling.sql
- pipeline_definition.py
- pipeline_notebook.ipynb
-
sql_queries
: 前処理用SQLを格納 -
pipeline_definition.py
: パイプラインの定義を記述 -
pipeline_notebook.ipynb
: パイプラインのコンパイルおよび実行用ノートブック
パイプラインの構成
以下がパイプライン定義です。
from google.cloud import aiplatform
from kfp import dsl
from kfp.dsl import component
import datetime
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
aiplatform.init(project=PROJECT_ID, location="asia-northeast1")
RECIPIENTS_LIST = [""] # NOTE: メールアドレスを3つまで登録できる
PROJECT_ID = ""
def load_sql(file_path):
with open(file_path, "r") as file:
return file.read()
@dsl.component(base_image='python:3.12', packages_to_install=['google-cloud-bigquery'])
def create_bigquery_op(dataset_name: str, location: str) -> str:
"""Creates a BigQuery dataset if it does not exist."""
from google.cloud import bigquery
client = bigquery.Client(project=PROJECT_ID)
dataset_id = f"{PROJECT_ID}.{dataset_name}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = location
dataset = client.create_dataset(dataset, exists_ok=True)
print(f"Dataset {dataset_id} created.")
return dataset_id
@dsl.pipeline(name="data-preprocessing-and-training-pipeline")
def my_pipeline():
# NOTE: 実行日を基にデータセット名を生成
today = datetime.date.today().strftime('%Y%m%d')
dest_dataset = f"pre_processed_dataset_{today}"
create_dataset_op = create_bigquery_op(
dataset_name=dest_dataset, location="asia-northeast1")
label_sql = load_sql("sql_queries/add_reaction_label.sql")
preprocess_sql = load_sql(
"sql_queries/training_pre_processing.sql")
undersampling_sql = load_sql("sql_queries/undersampling.sql")
formatted_label_sql = label_sql.format(
dataset=dest_dataset, table="add_reaction_label")
formatted_preprocess_sql = preprocess_sql.format(
dataset=dest_dataset, table="training_preprocessed")
formatted_undersampling_sql = undersampling_sql.format(
dataset=dest_dataset, table="summary_all_processed_undersampling")
notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)
with dsl.ExitHandler(notify_email_op):
label_sql_op = BigqueryQueryJobOp(
query=formatted_label_sql,
location="asia-northeast1",
project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)
preprocess_sql_op = BigqueryQueryJobOp(
query=formatted_preprocess_sql,
location="asia-northeast1",
project="{{$.pipeline_google_cloud_project_id}}"
).after(label_sql_op)
undersampling_sql_op = BigqueryQueryJobOp(
query=formatted_undersampling_sql,
location="asia-northeast1",
project="{{$.pipeline_google_cloud_project_id}}"
).after(preprocess_sql_op)
dataset_create_op = TabularDatasetCreateOp(
display_name=f"tabular_dataset_from_bigquery_{today}",
bq_source=f"bq://{PROJECT_ID}.{dest_dataset}.summary_all_processed_undersampling",
project="{{$.pipeline_google_cloud_project_id}}",
location="asia-northeast1"
).after(undersampling_sql_op)
model_training_op = AutoMLTabularTrainingJobRunOp(
display_name=f"visitor_prediction_model_{today}",
dataset=dataset_create_op.outputs["dataset"],
target_column="reaction_score",
training_fraction_split=0.8,
validation_fraction_split=0.1,
test_fraction_split=0.1,
budget_milli_node_hours=72000,
project="{{$.pipeline_google_cloud_project_id}}",
optimization_prediction_type="classification",
location="asia-northeast1"
).after(dataset_create_op)
データセット名にパイプライン実行日の日付を含める
パイプラインの実行ごとに、処理対象のデータを区別するためにデータセット名に実行日の日付を含めています。create_bigquery_op
コンポーネントを使用して、BigQueryにデータセットを作成する際に実行日の情報を動的に生成し、それをデータセット名に反映します。
today = datetime.date.today().strftime('%Y%m%d')
dest_dataset = f"pre_processed_dataset_{today}"
create_dataset_op = create_bigquery_op(
dataset_name=dest_dataset, location="asia-northeast1")
SQLの読み込みと動的変数の使用
パイプライン内で複数のSQLクエリを実行する際、同じパイプラインを他のデータセットやテーブル名で再利用できるように、SQL内のパラメータを動的に変更する仕組みを導入しました。
load_sql
関数でSQLファイルを読み込んだ後、Pythonの.format
メソッドを用いて{dataset}
や{table}
などのプレースホルダに動的な値を埋め込んでいます。
def load_sql(file_path):
with open(file_path, "r") as file:
return file.read()
undersampling_sql = load_sql("sql_queries/undersampling.sql")
formatted_undersampling_sql = undersampling_sql.format(
dataset=dest_dataset, table="summary_all_processed_undersampling")
以下が例として使用したundersampling.sql
です。
CREATE OR REPLACE TABLE `{dataset}.{table}` AS
WITH class_counts AS (
SELECT reaction_score, COUNT(*) as count
FROM `{dataset}.training_preprocessed`
GROUP BY reaction_score
),
median_count AS (
SELECT APPROX_QUANTILES(count, 2)[OFFSET(1)] as target_count
FROM class_counts
)
SELECT data.*
FROM `{dataset}.training_preprocessed` data
JOIN class_counts
ON data.reaction_score = class_counts.reaction_score
JOIN median_count
ON TRUE
WHERE RAND() < (median_count.target_count / class_counts.count);
Google Cloud Pipeline Components
Google Cloud Pipeline Componentsは、Google Cloudのさまざまなサービスを簡単に利用できるように設計されたカスタムコンポーネントのセットです。パイプライン内で直接使用することで、複雑な処理を簡潔に記述できます。
以下が今回使用したコンポーネントです。
-
BigqueryQueryJobOp
: BigQueryでSQLを実行する -
TabularDatasetCreateOp
: BigQueryのテーブルをVertex AIのデータセットとして登録する -
AutoMLTabularTrainingJobRunOp
: Vertex AIで表形式データのAutoMLトレーニングジョブを実行する -
VertexNotificationEmailOp
: 指定したメールアドレスに通知を送信する
コンポーネント間の依存関係の制御
パイプラインでは、各コンポーネントが依存する前段の処理が完了してから実行されるように.after()
メソッドを使用して実行順序を制御します。
label_sql_op = BigqueryQueryJobOp(
query=formatted_label_sql,
location="asia-northeast1",
project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)
パイプライン終了後にメール通知を送信
パイプラインの処理がすべて完了した際に通知を送るために、dsl.ExitHandler
を使用しています。以下のように、メール通知用のVertexNotificationEmailOp
コンポーネントをExitHandler
内で設定することで、パイプラインの終了時に自動的に通知が送信されます。
notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)
with dsl.ExitHandler(notify_email_op):
label_sql_op = BigqueryQueryJobOp(
query=formatted_label_sql,
location="asia-northeast1",
project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)
パイプラインの処理が完了すると以下のようなメールが届きます。
パイプライン実行
Vertex AI Pipelinesを実行するには、パイプラインをコンパイルし、オンデマンドまたはスケジュールで実行します。今回はVertex AI Workbenchを実行環境として使用しています。
オンデマンド実行
オンデマンド実行では、まずパイプラインの定義をYAML形式にコンパイルします。その後、コンパイルされたYAMLを使用してパイプラインを実行します。
以下のコードで、pipeline_definition.py
に記述したパイプライン定義をコンパイルし、compiled_pipeline.yaml
というファイルに保存します。
from google.cloud import aiplatform
from kfp import compiler
import pipeline_definition
aiplatform.init(project=<project-id>, location="asia-northeast1")
compiler.Compiler().compile(
pipeline_func=pipeline_definition.my_pipeline,
package_path="compiled_pipeline.yaml"
)
コンパイルしたcompiled_pipeline.yaml
を使用して、以下のコードでパイプラインを実行します。
aiplatform.PipelineJob(
display_name="data-preprocessing-and-training-pipeline",
template_path="compiled_pipeline.yaml",
parameter_values={},
# enable_caching=False
).submit()
実行が成功すると、以下のようにVertex AIのコンソールに結果が表示され、各ステップの進捗や状態を確認できます。
スケジュール実行
パイプラインを定期的に実行する場合は、PipelineJob.create_schedule
メソッドを使用してスケジュールを作成します。以下は、毎月1日の9:00(JST)に実行されるスケジュールを作成する例です。
pipeline_job = aiplatform.PipelineJob(
display_name="data-preprocessing-and-training-pipeline",
template_path="compiled_pipeline.yaml",
parameter_values={},
# enable_caching=False
)
pipeline_job_schedule = pipeline_job.create_schedule(
display_name="monthly-data-preprocessing-and-training",
cron="TZ=Asia/Tokyo 0 9 1 * *", # JSTで毎月1日の9:00
max_concurrent_run_count=1,
max_run_count=None
)
スケジュールが登録されると、Vertex AIのスケジュールタブで確認でき、以下のような画面が表示されます。
参考