イオンネクスト株式会社 の フロントエンドグループ でソフトウェアエンジニアをやっている 田村 と申します。
この記事は、AEON Advent Calendar 2024 の9日目です。
この記事で分かること
- 「EventArc + Vertex AI Vector Search」を組み合わせて、類似情報検索を“必要時のみ”起動することで、Google Cloud コストを抑える方法
- 「Vertex AI Vector Search の Index 更新・デプロイ」をイベントドリブンに処理し、非同期的・自動的に Vector Search リソースを制御するテクニック
背景
弊社では、GreenBeans というネット専用スーパーを運営しています。
生鮮食品から日用品まで、毎日の暮らしに必要な商品をポチッと簡単に購入でき、欲しいときに(1時間単位指定で)受け取れるサービスがウリです。
ただ、様々な条件により、どうしても「すみません。在庫切れです。」となってしまうことがあります。
その際に「代わりにこちらの商品はいかがでしょうか。」とオススメする機能はあるのですが、その代替商品は各商品カテゴリ毎の担当者が選定から設定まで地道に対応している状態。
代替商品の選定には商品知識が必要、担当カテゴリ内に適切な代替商品が無いこともある(でも他のカテゴリにはあったりする)、取扱商品は日々変わとる、、、で代替商品の設定・更新がなかなかに進まない。販売機会の損失に繋がりかねないです。
そこで「スマートかつ自動的に代替品を提案する仕組みを整えよう。」ということになりました。
代替品提案は日次や週次で提示できれば良いので、システムは必要なときに動かすだけでOKです。
アーキテクチャのイメージ
大まかな流れは以下のようになります。
- Cloud Scheduler から以下の処理 (CloudRun) を実行
- 商品情報の抽出
- ベクトル表現の生成 (Vertex AI Embeddings API)
- Vector Search の Index 更新をリクエスト (完了は待たない)
- Index更新イベント (EventArc) をトリガに以下を実行
- Vector Search の Index の更新が完了していれば、Index Endpoint へのデプロイをリクエスト (完了は待たない)
- Index Endpoint へデプロイイベントをトリガに以下を実行
- 商品群の類似商品を Index Endpoint に問い合わせ
- 類似商品 を代替品候補としてユーザーへ提示
- Index Endpoint を停止 (Index をアンデプロイ)
各リソースの設定内容
1. Cloud Scheduler
Cloud Run サービスの HTTP エンドポイントを宛先に指定するのみです。
試行期限 (attempt_deadline) は Cloud Run サービスからのレスポンスタイム を考慮して設定してください。
(当然ではあるのですが、レスポンスタイムが attempt_deadline を超えるとリトライが発生します)
resource "google_cloud_scheduler_job" "sample_update_index" {
name = "sample-update-index"
region = "asia-northeast1"
time_zone = "Asia/Tokyo"
schedule = "5 1 * * *"
# Cloud Run サービスのレスポンスタイムを考慮して設定する
attempt_deadline = "300s"
paused = false
retry_config {
retry_count = 1
min_backoff_duration = "30s"
}
http_target {
http_method = "POST"
uri = "${google_cloud_run_v2_service.main.uri}/update-index"
oidc_token {
service_account_email = google_service_account.scheduler.email
}
}
}
resource "google_service_account" "scheduler" {
project = "sample-project"
account_id = "sample-scheduler"
display_name = "sample-scheduler"
}
resource "google_project_iam_member" "scheduler" {
project = "sample-project"
role = "roles/run.invoker"
member = "serviceAccount:${google_service_account.scheduler.email}"
}
2. Cloud Run サービス
Cloud Scheduler, EventArcから実行される Cloud Run サービスです。
“必要時のみ起動" という方針なので、Cloud Run ジョブ のほうが適切ではあるのですが、EventArc からジョブを直接実行出来ない( Workflows を介する必要あり) ので、簡略化のためゼロスケール設定のサービスにしています。
(サービスへのリクエストは日次・週次の処理実行のみなので、概ねコンテナ数ゼロ=課金なし を実現できる)
resource "google_cloud_run_v2_service" "main" {
deletion_protection = false
project_id = "sample-project"
location = "asia-northeast1"
service_name = "main"
# Cloud Scheduler と EventArc からしか実行しないため InternalOnly
# https://cloud.google.com/run/docs/securing/ingress?hl=ja#settings
ingress = "INGRESS_TRAFFIC_INTERNAL_ONLY"
# 最大 900s なので、それを超える場合には Workflows + Cloud Run ジョブ を検討する
timeout = "900s"
scaling = {
# ゼロスケール設定
min_instance_count = 0
max_instance_count = 1
}
traffic = {
type = "TRAFFIC_TARGET_ALLOCATION_TYPE_LATEST"
percent = 100
}
service_account = google_service_account.cloudrun.email
containers = [
{
name = "main"
image = <コンテナイメージのURL:tag>
ports = [{ container_port = 8000 }]
resources = {
# ゼロスケール設定
cpu_idle = true
limits = {
cpu = "2000m"
memory = "2Gi"
}
}
startup_probe = {
failure_threshold = 3
initial_delay_seconds = 5
period_seconds = 10
timeout_seconds = 10
http_get = {
path = "/health"
port = 8000
}
}
liveness_probe = {
failure_threshold = 2
initial_delay_seconds = 0
period_seconds = 30
timeout_seconds = 5
http_get = {
path = "/health"
port = 8000
}
}
}
]
}
resource "google_service_account" "cloudrun" {
project = "sample-project"
account_id = "sample-cloudrun"
display_name = "sample-cloudrun"
}
resource "google_project_iam_member" "cloudrun" {
for_each = toset([
"roles/run.admin",
"roles/storage.objectUser",
"roles/aiplatform.user",
])
project = "sample-project"
role = each.key
member = "serviceAccount:${google_service_account.cloudrun.email}"
}
3. Vector Search
Vector Search の Index は中身は空、Index Endpoint は Index をデプロイしない状態 で作成しておきます。
後述の EventArc の matching 条件で Index, Index Endpoint の ID が必要になるためです。
なお、この状態(中身は空、デプロイしない)であればコストはほぼ発生しません。
# Vector Search
## index
resource "google_vertex_ai_index" "index" {
region = "asia-northeast1"
display_name = "sample-index"
# 日次・週次で全件更新なので BATCH_UPDATE を指定
index_update_method = "BATCH_UPDATE"
metadata {
# ベクトル表現を配置する予定の Cloud Storage URI を指定しておく
# パスが存在しなくてもバケットがあればOK
contents_delta_uri = "gs://path/to/embeddings"
# 日次・週次で全件更新なので上書き
is_complete_overwrite = true
config {
# 利用するベクトル表現の次元数
# 利用する text-multilingual-embedding-002 に合わせる
# https://cloud.google.com/vertex-ai/generative-ai/docs/embeddings/get-text-embeddings?hl=ja#get_text_embeddings_for_a_snippet_of_text
dimensions = 768
# その他はデフォルト
approximate_neighbors_count = 20
shard_size = "SHARD_SIZE_MEDIUM"
distance_measure_type = "DOT_PRODUCT_DISTANCE"
algorithm_config {
tree_ah_config {
leaf_node_embedding_count = 1000
leaf_nodes_to_search_percent = 10
}
}
}
}
lifecycle {
ignore_changes = [
# default valueである 5 以外を指定すると常に変更が発生するため無視する
metadata[0].config[0].algorithm_config[0].tree_ah_config[0].leaf_nodes_to_search_percent
]
}
}
## index endpoint
resource "google_vertex_ai_index_endpoint" "endpoint" {
display_name = "sample-endpoint"
region = "asia-northeast1"
public_endpoint_enabled = true
}
4. EventArc
Index 更新・Index デプロイに関するイベントを条件に、Cloud Run サービスを実行します。
EventArcが処理するイベントデータのフォーマットは CloudEvents に準拠しています。
イベントソースの種類は Cloud Storage
, Pub/Sub
, その他(Cloud Audit Logs) の3種類あり、Vertex AI Vector Search はその他 (Cloud Audit Logs) に該当します。
イベントデータのフォーマットは以下にまとめられており、EventArcのパラメータはこのフォーマットを踏まえて設定する必要があります。
なお、EventArc のイベント受信/送信には Pub/Sub が使われており、EventArc リソースを作成すると、それに対応する Pub/Sub リソースも生成されます。
ただ、生成される Subscription の確認応答期限 が10秒であり、今回の仕組みには短すぎたため、terraform_data + gcloud コマンドを使って確認応答期限を延長しました。
# EventArc
## update index
resource "google_eventarc_trigger" "update_index" {
name = "sample-update-index"
location = "asia-northeast1"
event_data_content_type = "application/json"
service_account = google_service_account.eventarc.email
matching_criteria {
attribute = "type"
value = "google.cloud.audit.log.v1.written"
}
matching_criteria {
attribute = "serviceName"
value = "aiplatform.googleapis.com"
}
matching_criteria {
attribute = "methodName"
value = "google.cloud.aiplatform.v1.IndexService.UpdateIndex"
# これと似たようなイベントとして `google.cloud.aiplatform.ui.IndexService.UpdateIndex` がある
# これは Cloud Console からの操作時に発生するイベントなので要注意
}
matching_criteria {
attribute = "resourceName"
# index更新完了時のイベントに含まれるリソース名にはProject IDではなくNumberが含まれるため、IDからNumberに置換する
value = replace(google_vertex_ai_index.index.id, data.google_project.project.project_id, data.google_project.project.number)
}
destination {
cloud_run_service {
service = google_cloud_run_v2_service.main.name
region = "asia-northeast1"
path = "/event-update-index"
}
}
}
## deploy index endpoint
resource "google_eventarc_trigger" "deploy_index" {
name = "sample-deploy-index"
location = "asia-northeast1"
event_data_content_type = "application/json"
service_account = google_service_account.eventarc.email
# https://cloud.google.com/eventarc/docs/workflows/cloudevents?hl=ja
matching_criteria {
attribute = "type"
value = "google.cloud.audit.log.v1.written"
}
matching_criteria {
attribute = "serviceName"
value = "aiplatform.googleapis.com"
}
matching_criteria {
attribute = "methodName"
value = "google.cloud.aiplatform.v1.IndexEndpointService.DeployIndex"
}
matching_criteria {
attribute = "resourceName"
# index更新完了時のイベントに含まれるリソース名にはProject IDではなくNumberが含まれるため、IDからNumberに置換する
value = replace(google_vertex_ai_index_endpoint.endpoint.id, data.google_project.project.project_id, data.google_project.project.number)
}
destination {
cloud_run_service {
service = google_cloud_run_v2_service.main.name
region = "asia-northeast1"
path = "/event-deploy-index"
}
}
}
## extend ack_deadline_seconds of EventArc backend subscription
resource "terraform_data" "extend_ack_timeout_for_update_index" {
triggers_replace = [
google_eventarc_trigger.update_index.id
]
provisioner "local-exec" {
command = <<-EOT
gcloud pubsub subscriptions update '${google_eventarc_trigger.update_index.transport[0].pubsub[0].subscription}' \
--ack-deadline=600 \
--retain-acked-messages \
--project="sample-project" \
--impersonate-service-account='<適切なサービスアカウントemail>'
EOT
}
}
resource "terraform_data" "extend_ack_timeout_for_deploy_index" {
triggers_replace = [
google_eventarc_trigger.deploy_index.id
]
provisioner "local-exec" {
command = <<-EOT
gcloud pubsub subscriptions update '${google_eventarc_trigger.deploy_index.transport[0].pubsub[0].subscription}' \
--ack-deadline=600 \
--retain-acked-messages \
--project="sample-project" \
--impersonate-service-account='<適切なサービスアカウントemail>'
EOT
}
}
resource "google_service_account" "eventarc" {
project = "sample-project"
account_id = "sample-eventarc"
display_name = "sample-eventarc"
}
resource "google_project_iam_member" "eventarc" {
for_each = toset([
"roles/pubsub.publisher",
"roles/run.invoker",
"roles/eventarc.eventReceiver",
"roles/storage.objectUser",
"roles/aiplatform.user",
])
project = "sample-project"
role = each.key
member = "serviceAccount:${google_service_account.eventarc.email}"
}
コスト削減のポイント
- 処理が終わったら Index Endpoint を停止 (Index をアンデプロイ) する
- Index Endpoint の1ノードあたりの時間単価は以下の通り。今回利用した
e2-standart-16
でも、複数ノードを常時稼働するとまあまあ高い。
https://cloud.google.com/vertex-ai/pricing?hl=ja#vectorsearchリージョン e2-standard-2 e2-standard-16 e2-highmem-16 n2d-standard-32 n1-standard-16 n1-standard-32 us_central1 0.094 0.75 1.012 1.893 1.064 2.128 asia_northeast1 0.12 0.963 1.298 2.428 1.366 2.733 - 今回の要件では、処理後に Index Endpoint を停止することで、起動時間を 10分程度×ノード数 に抑えることが出来ている。常時稼働と比較するとだいぶ低コスト。
- Index Endpoint の1ノードあたりの時間単価は以下の通り。今回利用した
- Index 更新、Index デプロイ を待たない (イベントにドリブン処理する)
- Index 更新、Index デプロイの所要時間は、今回の Index 規模 (商品点数=ベクトルデータ数 4万程度) でも1時間程度要する。
単純に更新・デプロイの完了を同期的に待つのは Cloud Run コストの無駄遣い。(上記 1. のコストに比べると微々たるコストではありますが) - Index 更新、Index デプロイ のイベントは複数回発生(開始時、完了時、その他)し、イベントには処理の状態は含まれないため、イベントだけでは処理の完了を判断できない。
イベントには Operation ID が含まれるため、その ID を元に処理の状態を問い合わせることで判断できる。
- Index 更新、Index デプロイの所要時間は、今回の Index 規模 (商品点数=ベクトルデータ数 4万程度) でも1時間程度要する。
以下、Index更新、Indexデプロイ/アンデプロイ、処理状況問い合わせ 等の実装サンプルです。
「EventArc + Vertex AI Vector Search」の情報は少なかった(特に Operation ID で処理状態問い合わせについて)ので、以下のサンプルはわりと貴重かもしれません。
import logging
from dataclasses import dataclass
from google.api_core import operation as gac_operation
from google.api_core.exceptions import (AlreadyExists, FailedPrecondition,
InternalServerError, NotFound,
TooManyRequests)
from google.cloud.aiplatform.matching_engine import MatchingEngineIndexEndpoint
from google.longrunning import operations_pb2
from tenacity import (after_log, retry, retry_if_exception_type,
stop_after_attempt, wait_exponential_jitter)
from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel
logger = getLogger(__name__)
class AvailableDeployedIndexNotFound(Exception):
pass
@dataclass
class SimilarProduct:
product_id: str
distance: float
class SimilarProductSearch:
"""類似商品検索"""
def __init__(self, region: str, index_name: str, index_endpoint_name: str) -> None:
self.region = region
self.index_name = index_name
self.index_endpoint_name = index_endpoint_name
self.index_service_client = aiplatform_v1.IndexServiceClient(
client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"}
)
self.index_endpoint_service_client = aiplatform_v1.IndexEndpointServiceClient(
client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"}
)
self.deploy_index_name = "main"
@retry(
wait=wait_exponential_jitter(initial=1, max=10),
stop=stop_after_attempt(5),
after=after_log(logger, logging.WARN),
)
def _check_operation_running(self, operation: gac_operation.Operation) -> None:
if not operation.running():
raise Exception(f"opration is not running. operation: {operation}")
def request_update_index(self, gcs_uri_embeddings_objects_included: str) -> None:
"""index更新要求を送信する"""
req = aiplatform_v1.UpdateIndexRequest(
index=aiplatform_v1.Index(
name=self.index_name,
metadata={"contentsDeltaUri": gcs_uri_embeddings_objects_included},
)
)
res = self.index_service_client.update_index(req)
self._check_operation_running(res)
def is_completed_updating_index(self, operation_id: str) -> bool:
"""index更新が完了しているかどうかを確認する"""
op = self.index_service_client.get_operation(
request=operations_pb2.GetOperationRequest(name=operation_id)
)
if op is None:
raise Exception(f"operation is not found. operation_id: {operation_id}")
return op.done
def request_deploy_index(self) -> bool:
"""indexデプロイ要求を送信する
Returns:
bool: True=デプロイ成功, False=既にデプロイされている等の理由で
"""
req = aiplatform_v1.DeployIndexRequest(
index_endpoint=self.index_endpoint_name,
deployed_index=aiplatform_v1.DeployedIndex(
id=self.deploy_index_name,
display_name=self.deploy_index_name,
index=self.index_name,
dedicated_resources=aiplatform_v1.DedicatedResources(
machine_spec=aiplatform_v1.MachineSpec(
machine_type="e2-standard-16",
),
min_replica_count=1,
max_replica_count=10,
),
),
)
try:
res = self.index_endpoint_service_client.deploy_index(req)
except AlreadyExists:
# 既にデプロイされている場合はエラーを無視
logger.warning(
f"index is already deployed. index: {self.index_name}", exc_info=True
)
return False
self._check_operation_running(res)
return True
def is_completed_deploying_index(self, operation_id: str) -> bool:
"""indexデプロイが完了しているかどうかを確認する"""
op = self.index_endpoint_service_client.get_operation(
request=operations_pb2.GetOperationRequest(name=operation_id)
)
if op is None:
raise Exception(f"operation is not found. operation_id: {operation_id}")
return op.done
def request_undeploy_index(self):
"""indexデプロイ解除要求を送信する"""
try:
# deployed index idはVertex AI内部で動的に採番されるものでり、生成時に指定したIDとは異なるため、
# 取得可能なdeployed indexはすべてundeployする
endpoint = self.index_endpoint_service_client.get_index_endpoint(
request=aiplatform_v1.GetIndexEndpointRequest(
name=self.index_endpoint_name,
)
)
deployed_indexes = endpoint.deployed_indexes
if len(deployed_indexes) == 0:
logger.warning(
f"deployed_indexes must exists at least one in {endpoint.name}. maybe deployment in progress or deployed index have deleted."
)
return
for deployed_index in deployed_indexes:
operation = self.index_endpoint_service_client.undeploy_index(
aiplatform_v1.UndeployIndexRequest(
index_endpoint=self.index_endpoint_name,
deployed_index_id=deployed_index.id,
)
)
operation.result(timeout=300)
logger.info(f"index {deployed_index.id} is undeployed")
except (FailedPrecondition, NotFound):
logger.warning(
f"maybe index has already undeployed from index endpoint: {self.index_endpoint_name}"
)
@retry(
wait=wait_exponential_jitter(initial=1, max=10),
stop=stop_after_attempt(5),
retry=retry_if_exception_type((TooManyRequests, InternalServerError)), # noqa: F821
after=after_log(logger, logging.WARN), # noqa: F821
)
def find_similar_products(
self, product_id_list: list[str], suggest_count: int = 20
) -> dict[str, list[SimilarProduct]]:
"""指定された商品(product_id)の類似商品をベクトル検索から取得する
Returns: dict[str, list[SimilarProduct]] keyはproduct_id_listで指定されたid, valueは類似商品のリスト
"""
endpoint = self.index_endpoint_service_client.get_index_endpoint(
request=aiplatform_v1.GetIndexEndpointRequest(
name=self.index_endpoint_name,
)
)
if len(endpoint.deployed_indexes) == 0:
m = f"deployed_indexes must exists at least one in {endpoint.name}. maybe deployment in progress or deployed index have deleted."
logger.warning(m)
raise AvailableDeployedIndexNotFound(m)
deployed_index = endpoint.deployed_indexes[0]
queries = [
aiplatform_v1.FindNeighborsRequest.Query(
datapoint=aiplatform_v1.IndexDatapoint(datapoint_id=id),
neighbor_count=suggest_count,
)
for id in product_id_list
]
request = aiplatform_v1.FindNeighborsRequest(
index_endpoint=self.index_endpoint_name,
deployed_index_id=deployed_index.id,
queries=queries,
return_full_datapoint=False,
)
endpoint = MatchingEngineIndexEndpoint(
index_endpoint_name=self.index_endpoint_name
)
match_service_client = aiplatform_v1.MatchServiceClient(
client_options={
"api_endpoint": endpoint.gca_resource.public_endpoint_domain_name
}
)
try:
response = match_service_client.find_neighbors(request)
except NotFound:
# 類似商品が見つからない場合は空リストを返す
logger.warning(f"similar products not found: {product_id_list}")
return {id: [] for id in product_id_list}
return {
nn.id: [
SimilarProduct(
product_id=n.datapoint.datapoint_id, distance=n.distance
)
for n in nn.neighbors
# 類似商品群から元商品は除外
if n.datapoint.datapoint_id != nn.id
]
for nn in response.nearest_neighbors
}
余談
類似商品検索の精度向上のため、ベクトル表現の生成において、タスクタイプや生成元の商品情報フォーマット等いくつか工夫をしていますが、それについては別の機会にまとめたいと思います。
まとめ
本記事では、Vertex AI Vector Search と EventArc を組み合わせて「必要時のみ」類似情報検索を稼働する手法を紹介しました。
日次・週次などの定期トリガーからIndex更新~デプロイを非同期かつ自動で行い、最終的に代替品候補を提供した後はIndex Endpointを停止することで、コストを大幅に抑えた運用が可能になります。
是非参考にしてみてください。
参考資料
- Vertex AI Vector Search
- Index
- Index Endpoint
- Operation ID
イオングループで、一緒に働きませんか?
イオングループでは、エンジニアを積極採用中です。少しでもご興味もった方は、キャリア登録やカジュアル面談登録などもしていただけると嬉しいです。
皆さまとお話できるのを楽しみにしています!