こちらの続きです。
はじめに
初回記事より。
Databricks Intelligence Platformはいわずと知れた(?)強力なデータ+AIプラットフォームです。個人的には(そして本業的にも)、その中でもデータパイプライン構築機能はお気に入りです。
この良さを少しでも伝えられるように、具体的なサンプルを用いたデータパイプライン構築の記事を書くことに今回チャレンジします。
前回はQiitaの記事データを加工整形するパイプラインをSpark宣言型パイプラインを用いて作成しました。
今回はそのデータをベクトルストアから利用する活用編です。
5. Qiita記事をベクトルストアに入れて利用する
Spark宣言型パイプラインで作成したテーブルから、Mosaic AI Vector Searchを使ってベクトルインデックスを作成します。
ベクトルインデックスが何に利用できるか、などは以下のサイトが役に立つかと思います。
(Databricks公式サイトは便利な用語集サイトがいっぱい)
5.1 ベクトルインデックス作成用ノートブックを作る
vectorstoreフォルダを作成し、ベクトルストア作成用ノートブックを用意します。
ノートブック名はvectorstore_creationとしておきます。
まずは必要なモジュールをインストール。
%pip install -qq databricks-vectorsearch
%restart_python
後々Jobの中でcatalogやschema等の設定をパラメータで受け取れるようにウィジットを設定します。
dbutils.widgets.text("vs_endpoint", "default_vs_endpoint")
vs_endpoint = dbutils.widgets.get("vs_endpoint")
dbutils.widgets.text("catalog", "workspace")
catalog = dbutils.widgets.get("catalog")
dbutils.widgets.text("schema", "")
schema = dbutils.widgets.get("schema")
dbutils.widgets.text("source_table", "")
source_table = dbutils.widgets.get("source_table")
dbutils.widgets.text("index_name", "")
index_name = dbutils.widgets.get("index_name")
print(dbutils.widgets.getAll())
まず、Mosaic AI ベクトルストアのエンドポイントを作成します。
既に存在する場合はスキップされます。
詳しくはこちらから。
from databricks.vector_search.client import VectorSearchClient
try:
client = VectorSearchClient()
client.create_endpoint_and_wait(
name=vs_endpoint,
endpoint_type="STANDARD",
verbose=True,
)
# エラーハンドリングは手抜きです。きちんと実装しましょう。
except Exception as e:
# 既にエンドポイントが存在する場合
if "already exists" in str(e):
pass
else:
print(e)
pass
最後に、Spark宣言型パイプランで作成したテーブルを利用して、ベクトルインデックスを作成します。
今回はテーブル内のen_summarized_bodyという記事を要約+英訳したカラムを利用してベクトル化します。
また、ベクトル化に利用する埋め込みモデルはDatabricks標準で提供されているdatabricks-bge-large-enを利用しています。
(この埋め込みモデルは日本語対応していないため、パイプライン中に英訳処理を追加しました)
import time
def exists_index(client, index_name):
indexes = client.list_indexes(vs_endpoint)
vector_indexes = indexes.get("vector_indexes")
if not vector_indexes:
return False
for index in vector_indexes:
if index.get("name") == index_name:
return True
return False
def wait_for_deletion(client, index_name):
for _ in range(10):
if not exists_index(client, index_name):
return
time.sleep(2)
raise Exception("Timeout waiting for index deletion")
try:
client = VectorSearchClient()
# VectorIndexがすでに存在している場合は削除する
if exists_index(client, f"{catalog}.{schema}.{index_name}"):
print(f"index {catalog}.{schema}.{index_name} is already existing.")
print(f"Delete current index: {catalog}.{schema}.{index_name}")
client.delete_index(
index_name=f"{catalog}.{schema}.{index_name}",
endpoint_name=vs_endpoint,
)
wait_for_deletion(client, f"{catalog}.{schema}.{index_name}")
print(f"Deleted index {catalog}.{schema}.{index_name}")
# Indexの作成
index = client.create_delta_sync_index_and_wait(
endpoint_name=vs_endpoint,
source_table_name=f"{catalog}.{schema}.{source_table}",
index_name=f"{catalog}.{schema}.{index_name}",
pipeline_type="TRIGGERED",
primary_key="id",
embedding_model_endpoint_name="databricks-bge-large-en",
embedding_source_column="en_summarized_body",
columns_to_sync=[
"id",
"body_head_8000",
"created_at",
"updated_at",
"title",
"url",
"organization_url_name",
"summarized_body",
"en_summarized_body",
"author_id",
"author_name",
"tag_names",
"imported_at",
],
)
except Exception as e:
# 手抜き。ちゃんと実装すること。
print(e)
pass
5.2 ベクトルインデックスを作成用Job定義を作る
また、このノートブックを実行するためのJob定義をアセットバンドルのリソースとして作成します。
Vectorstoreエンドポイントやインデックステーブル名は固定で設定しました。
# ベクトルストア作成用Job
resources:
jobs:
qiita_vectorstore_setup_job:
name: qiita_vectorstore_setup_job
parameters:
- name: catalog
default: ${var.catalog}
- name: schema
default: ${var.schema}
tasks:
- task_key: vectorstore_setup
notebook_task:
notebook_path: ../src/qiita_items_pipeline_etl/vectorstore/vectorstore_creation.ipynb
base_parameters:
vs_endpoint: "default_vs_endpoint"
source_table: "qiita_items"
index_name: "qiita_items_index"
source: WORKSPACE
environments:
- environment_key: default
spec:
environment_version: "4"
5.3 セットアップJobのデプロイ&実行
作成したアセットバンドルをデプロイしましょう。
うまくいけば新しいJobが作成されます。実行しましょう。
Jobが実行されるとインデックスが作成されます。
5.4 Sync用処理の追加
今回のベクトルインデックスは、元テーブルとの同期を「トリガー」モードに設定しています。
これは、明示的に同期を指示しない限りインデックスが更新されません。
不便なので、データパイプライン中に同期処理を組み込みましょう。
まず、同期処理を実行するためのノートブックindex_syncを作成します。
まずは必要なパッケージをインストール。
%pip install -qq databricks-vectorsearch
%restart_python
こちらも後程Jobから呼び出すため、ウィジットを設定します。
dbutils.widgets.text("catalog", "workspace")
catalog = dbutils.widgets.get("catalog")
dbutils.widgets.text("schema", "")
schema = dbutils.widgets.get("schema")
dbutils.widgets.text("index_name", "")
index_name = dbutils.widgets.get("index_name")
print(dbutils.widgets.getAll())
インデックスの同期処理を実行します。
from databricks.vector_search.client import VectorSearchClient
client = VectorSearchClient()
index = client.get_index(
index_name=f"{catalog}.{schema}.{index_name}",
)
index.sync()
パイプラインのJob定義もこのノートブックを最後に呼び出すように更新します。
# Qiita Itemsの取得・加工パイプライン
resources:
jobs:
qiita_items_pipeline_job:
name: qiita_items_pipeline_job
trigger:
periodic:
interval: 1
unit: DAYS
parameters:
- name: catalog
default: ${var.catalog}
- name: schema
default: ${var.schema}
tasks:
- task_key: qiita_items_ingestion
notebook_task:
notebook_path: ../src/qiita_items_pipeline_etl/ingestions/qiita_items_source.ipynb
base_parameters:
output_schema: ${var.src_schema}
output_table: ${var.src_items}
page_from: ${var.qiita_page_from}
page_to: ${var.qiita_page_to}
per_page: ${var.qiita_per_page}
query: "tag:databricks"
source: WORKSPACE
- task_key: refresh_pipeline
depends_on:
- task_key: qiita_items_ingestion
pipeline_task:
pipeline_id: ${resources.pipelines.qiita_items_pipeline_etl.id}
full_refresh: false
# 以下が追加部分
- task_key: index_sync
depends_on:
- task_key: refresh_pipeline
notebook_task:
notebook_path: ../src/qiita_items_pipeline_etl/vectorstore/index_sync.ipynb
base_parameters:
index_name: "qiita_items_index"
source: WORKSPACE
environments:
- environment_key: default
spec:
environment_version: "4"
こちらもデプロイして実行しましょう。
5.5 インデックスを利用する
作成したインデックスをPlaygroundから利用してみましょう。
ツールとしてインデックスを利用するように設定します。
例えば、以下のように問い合わせると、インデックスを使って結果が表示されます。
参照したデータも表示されますね。
今回はここまで。次回に続きます
次回はもう一度パイプライン構築に戻り、より分析に適したデータモデルを構築します。
その上で、Metricks Viewの作成やGenieでの活用を予定しています。




