0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks Free Editionで始めるデータパイプライン作成:Qiita記事の取得から加工・活用まで④ ベクトルストアでの活用

Last updated at Posted at 2025-12-20

こちらの続きです。

はじめに

初回記事より。

Databricks Intelligence Platformはいわずと知れた(?)強力なデータ+AIプラットフォームです。個人的には(そして本業的にも)、その中でもデータパイプライン構築機能はお気に入りです。

この良さを少しでも伝えられるように、具体的なサンプルを用いたデータパイプライン構築の記事を書くことに今回チャレンジします。

前回はQiitaの記事データを加工整形するパイプラインをSpark宣言型パイプラインを用いて作成しました。
今回はそのデータをベクトルストアから利用する活用編です。

5. Qiita記事をベクトルストアに入れて利用する

Spark宣言型パイプラインで作成したテーブルから、Mosaic AI Vector Searchを使ってベクトルインデックスを作成します。

ベクトルインデックスが何に利用できるか、などは以下のサイトが役に立つかと思います。
(Databricks公式サイトは便利な用語集サイトがいっぱい)

5.1 ベクトルインデックス作成用ノートブックを作る

vectorstoreフォルダを作成し、ベクトルストア作成用ノートブックを用意します。
ノートブック名はvectorstore_creationとしておきます。

まずは必要なモジュールをインストール。

vectorstore_creation 1
%pip install -qq databricks-vectorsearch

%restart_python

後々Jobの中でcatalogやschema等の設定をパラメータで受け取れるようにウィジットを設定します。

vectorstore_creation 2
dbutils.widgets.text("vs_endpoint", "default_vs_endpoint")
vs_endpoint = dbutils.widgets.get("vs_endpoint")

dbutils.widgets.text("catalog", "workspace")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("schema", "")
schema = dbutils.widgets.get("schema")

dbutils.widgets.text("source_table", "")
source_table = dbutils.widgets.get("source_table")

dbutils.widgets.text("index_name", "")
index_name = dbutils.widgets.get("index_name")

print(dbutils.widgets.getAll())

まず、Mosaic AI ベクトルストアのエンドポイントを作成します。
既に存在する場合はスキップされます。

詳しくはこちらから。

vectorstore_creation 3
from databricks.vector_search.client import VectorSearchClient

try:
    client = VectorSearchClient()
    client.create_endpoint_and_wait(
        name=vs_endpoint,
        endpoint_type="STANDARD",
        verbose=True,
    )
# エラーハンドリングは手抜きです。きちんと実装しましょう。
except Exception as e:
    # 既にエンドポイントが存在する場合
    if "already exists" in str(e):
        pass
    else:
        print(e)
        pass

最後に、Spark宣言型パイプランで作成したテーブルを利用して、ベクトルインデックスを作成します。

今回はテーブル内のen_summarized_bodyという記事を要約+英訳したカラムを利用してベクトル化します。
また、ベクトル化に利用する埋め込みモデルはDatabricks標準で提供されているdatabricks-bge-large-enを利用しています。
(この埋め込みモデルは日本語対応していないため、パイプライン中に英訳処理を追加しました)

vectorstore_creation 4

import time

def exists_index(client, index_name):
    indexes = client.list_indexes(vs_endpoint)
    vector_indexes = indexes.get("vector_indexes")
    if not vector_indexes:
        return False

    for index in vector_indexes:        
        if index.get("name") == index_name:
            return True
    return False

def wait_for_deletion(client, index_name):
    for _ in range(10):
        if not exists_index(client, index_name):
            return
        time.sleep(2)
    raise Exception("Timeout waiting for index deletion")

try:
    client = VectorSearchClient()

    # VectorIndexがすでに存在している場合は削除する
    if exists_index(client, f"{catalog}.{schema}.{index_name}"):
        print(f"index {catalog}.{schema}.{index_name} is already existing.")
        print(f"Delete current index: {catalog}.{schema}.{index_name}")
        client.delete_index(
            index_name=f"{catalog}.{schema}.{index_name}",
            endpoint_name=vs_endpoint,
        )
        wait_for_deletion(client, f"{catalog}.{schema}.{index_name}")
        print(f"Deleted index {catalog}.{schema}.{index_name}")

    # Indexの作成
    index = client.create_delta_sync_index_and_wait(
        endpoint_name=vs_endpoint,
        source_table_name=f"{catalog}.{schema}.{source_table}",
        index_name=f"{catalog}.{schema}.{index_name}",
        pipeline_type="TRIGGERED",
        primary_key="id",
        embedding_model_endpoint_name="databricks-bge-large-en",
        embedding_source_column="en_summarized_body",
        columns_to_sync=[
            "id",
            "body_head_8000",
            "created_at",
            "updated_at",
            "title",
            "url",
            "organization_url_name",
            "summarized_body",
            "en_summarized_body",
            "author_id",
            "author_name",
            "tag_names",
            "imported_at",
        ],
    )
except Exception as e:
    # 手抜き。ちゃんと実装すること。
    print(e)
    pass

5.2 ベクトルインデックスを作成用Job定義を作る

また、このノートブックを実行するためのJob定義をアセットバンドルのリソースとして作成します。

Vectorstoreエンドポイントやインデックステーブル名は固定で設定しました。

resources/qiita_vectorstore_setup.job.yml
# ベクトルストア作成用Job
resources:
  jobs:
    qiita_vectorstore_setup_job:
      name: qiita_vectorstore_setup_job

      parameters:
        - name: catalog
          default: ${var.catalog}
        - name: schema
          default: ${var.schema}

      tasks:
        - task_key: vectorstore_setup
          notebook_task:
            notebook_path: ../src/qiita_items_pipeline_etl/vectorstore/vectorstore_creation.ipynb
            base_parameters:
              vs_endpoint: "default_vs_endpoint"
              source_table: "qiita_items"
              index_name: "qiita_items_index"
            source: WORKSPACE

      environments:
        - environment_key: default
          spec:
            environment_version: "4"

5.3 セットアップJobのデプロイ&実行

作成したアセットバンドルをデプロイしましょう。

うまくいけば新しいJobが作成されます。実行しましょう。

asset bundle job

Jobが実行されるとインデックスが作成されます。

image.png

5.4 Sync用処理の追加

今回のベクトルインデックスは、元テーブルとの同期を「トリガー」モードに設定しています。
これは、明示的に同期を指示しない限りインデックスが更新されません。
不便なので、データパイプライン中に同期処理を組み込みましょう。

まず、同期処理を実行するためのノートブックindex_syncを作成します。

まずは必要なパッケージをインストール。

index_sync 1
%pip install -qq databricks-vectorsearch

%restart_python

こちらも後程Jobから呼び出すため、ウィジットを設定します。

index_sync 2
dbutils.widgets.text("catalog", "workspace")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("schema", "")
schema = dbutils.widgets.get("schema")

dbutils.widgets.text("index_name", "")
index_name = dbutils.widgets.get("index_name")

print(dbutils.widgets.getAll())

インデックスの同期処理を実行します。

index_sync 3
from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient()
index = client.get_index(
    index_name=f"{catalog}.{schema}.{index_name}",
)

index.sync()

パイプラインのJob定義もこのノートブックを最後に呼び出すように更新します。

resources/qiita_items_pipeline.job.yml
# Qiita Itemsの取得・加工パイプライン

resources:
  jobs:
    qiita_items_pipeline_job:
      name: qiita_items_pipeline_job

      trigger:
        periodic:
          interval: 1
          unit: DAYS

      parameters:
        - name: catalog
          default: ${var.catalog}
        - name: schema
          default: ${var.schema}

      tasks:
        - task_key: qiita_items_ingestion
          notebook_task:
            notebook_path: ../src/qiita_items_pipeline_etl/ingestions/qiita_items_source.ipynb
            base_parameters:
              output_schema: ${var.src_schema}
              output_table: ${var.src_items}
              page_from: ${var.qiita_page_from}
              page_to: ${var.qiita_page_to}
              per_page: ${var.qiita_per_page}
              query: "tag:databricks"
            source: WORKSPACE

        - task_key: refresh_pipeline
          depends_on:
            - task_key: qiita_items_ingestion
          pipeline_task:
            pipeline_id: ${resources.pipelines.qiita_items_pipeline_etl.id}
            full_refresh: false
            
        # 以下が追加部分
        - task_key: index_sync
          depends_on:
            - task_key: refresh_pipeline          
          notebook_task:
            notebook_path: ../src/qiita_items_pipeline_etl/vectorstore/index_sync.ipynb
            base_parameters:
              index_name: "qiita_items_index"
            source: WORKSPACE

      environments:
        - environment_key: default
          spec:
            environment_version: "4"

こちらもデプロイして実行しましょう。

5.5 インデックスを利用する

作成したインデックスをPlaygroundから利用してみましょう。
ツールとしてインデックスを利用するように設定します。

image.png

例えば、以下のように問い合わせると、インデックスを使って結果が表示されます。

image.png

参照したデータも表示されますね。

image.png

今回はここまで。次回に続きます

次回はもう一度パイプライン構築に戻り、より分析に適したデータモデルを構築します。
その上で、Metricks Viewの作成やGenieでの活用を予定しています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?