0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksにおけるマルチモーダルRAG

Last updated at Posted at 2025-05-16

こちらで紹介されているリポジトリのコードを動かします

リポジトリはこちら。

翻訳版はこちらです。

DatabricksにおけるマルチモーダルRAG

このデモでは、Databricksのベクトル検索を使用してテキストで画像検索を行う方法を紹介します。このデモの終わりまでに、COCOデータセットからの画像埋め込みを含むベクトル検索インデックスを作成し、テキストを使用してこれらの画像を検索できるようになります。

このデモで使用するリソースは以下の通りです:

  1. COCO 2017 画像データセット
  2. Nomic-embed-text-v1.5 および Nomic-embed-vision-v1.5 埋め込みモデル
  3. Databricks ベクトル検索

ステップ1から3を既に実行しましたか? 依存関係をインストールする必要がありますが、ステップ4にスキップして問題なく使用できます!

概要

このデモでは、Databricksのベクトル検索を使用してテキストで画像検索を行う方法を紹介します。このデモの終わりまでに、COCOデータセットからの画像埋め込みを含むベクトル検索インデックスを作成し、テキストを使用してこれらの画像を検索できるようになります。

このデモで使用するリソースは以下の通りです:

  1. COCO 2017 画像データセット
  2. Nomic-embed-text-v1.5 および Nomic-embed-vision-v1.5 埋め込みモデル
  3. Databricks ベクトル検索

ステップ1から3を既に実行しましたか? 依存関係をインストールする必要がありますが、ステップ4にスキップして問題なく使用できます!

ステップ0: 依存関係のインストール

コンピュートタイプに応じて適切なセルを実行してください。最後のGradioを動かす場合には、サーバレスではなくDBR MLランタイムのクラスターを使いましょう。

  • DBR ML 16.x+: 下のセルを実行
  • サーバーレスコンピュート: 2つ下のセルを実行

DBR MLランタイム

%pip install --upgrade databricks-vectorsearch transformers pillow matplotlib gradio
dbutils.library.restartPython()

サーバレス

%pip install --upgrade databricks-vectorsearch torch transformers pillow matplotlib einops
dbutils.library.restartPython()
import os
import requests
import zipfile
import time
import torch
import torch.nn.functional as F
import numpy as np
import pandas as pd

from tqdm import tqdm
from PIL import Image
from databricks.vector_search.client import VectorSearchClient
from transformers import AutoModel, AutoProcessor, AutoTokenizer, AutoImageProcessor
from pyspark.sql.types import ArrayType, FloatType, StringType, StructType, StructField
from pyspark.sql.functions import input_file_name, regexp_extract, regexp_replace, col

これらを任意の場所または上記のウィジェットに変更してください

dbutils.widgets.text("catalog_name", "austin_choi_demo_catalog", "Data UC Catalog")
dbutils.widgets.text("schema_name", "demo_data", "Data UC Schema")
dbutils.widgets.text("table_name", "image_data_table", "Data UC Table")
dbutils.widgets.text("volume_name", "image_data", "Data Volume Table")
dbutils.widgets.text("embedding_table_name", "image_data_embedding", "Data Embedding Table")
dbutils.widgets.text("index_name", "image_data_index", "Data Index Table")

以下のように設定しました。

Screenshot 2025-05-16 at 11.50.32.png

このデモで使用するカタログ/スキーマ/テーブルをウィジェットまたは以下で変更してください

catalog_name = dbutils.widgets.get("catalog_name")
schema_name = dbutils.widgets.get("schema_name")
image_table_name = dbutils.widgets.get("table_name")
embedding_table_name = dbutils.widgets.get("embedding_table_name")
index_name = dbutils.widgets.get("index_name")
volume_name = dbutils.widgets.get("volume_name")
vector_search_endpoint_name = "one-env-shared-endpoint-1" # Make sure you have one. Go to Compute to create one if you do not have one
spark.sql(
f"""
    CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}
"""
spark.sql(
f"""
    CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_name}
"""
)

ステップ1: 画像をダウンロードする

COCOデータセットはよくラベル付けされているため、何をダウンロードするかは主にあなた次第です。このデモの目的のために、ラベルや他のメタデータがない画像がテキストを使用してどのように検索されるかを強調したかったので、ラベルのない画像をいくつかダウンロードします。

検証セットには5Kの画像が含まれており、これはデータセットが持つ数十万の画像の小さなサブセットです。

dest_dir = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"

def download_file(url, destination):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    block_size = 1024
    
    with open(destination, 'wb') as f, tqdm(
            total=total_size, unit='B', unit_scale=True) as pbar:
        for data in response.iter_content(block_size):
            f.write(data)
            pbar.update(len(data))

# 検証画像をダウンロード
val_url = "http://images.cocodataset.org/zips/val2017.zip"
val_zip = os.path.join(dest_dir, "val2017.zip")
download_file(val_url, val_zip)

# ファイルを解凍
with zipfile.ZipFile(val_zip, 'r') as zip_ref:
    zip_ref.extractall(dest_dir)

# zipファイルを削除
os.remove(val_zip)

print(f"COCO 2017 検証データセットが {dest_dir} にダウンロードされました")
100%|██████████| 816M/816M [00:51<00:00, 15.8MB/s] 
COCO 2017 検証データセットが /Volumes/takaakiyayoi_catalog/multi_modal/image_data にダウンロードされました

画像がボリュームに保存されました。

Screenshot 2025-05-16 at 11.52.11.png

次に、ボリュームからファイルを取得し、クリーンアップを行い、それらをDeltaテーブルに保存します

dest_dir = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/val2017"
image_df = spark.read.format("binaryFile").option("pathGlobFilter", "*.jpg").load(dest_dir)
image_df = image_df.withColumn('path', regexp_replace(col('path'), '^dbfs:', ''))
image_df.write.mode('overwrite').saveAsTable(f"{catalog_name}.{schema_name}.{image_table_name}")

image_df.show(10)
+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|/Volumes/takaakiy...|2025-05-16 02:10:38|696529|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:16:58|514818|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:10:59|483490|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:13:41|428992|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:14:01|423483|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:16:54|421559|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:11:09|417109|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:14:45|410304|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:16:28|404790|[FF D8 FF E0 00 1...|
|/Volumes/takaakiy...|2025-05-16 02:17:23|403707|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 10 rows

画像データがテーブルとして保存されました。

Screenshot 2025-05-16 at 11.53.14.png

#ステップ2: モデルをダウンロードする

Huggingfaceのtransformerライブラリを使用して、nomicエンベディングモデルを迅速にダウンロードします。彼らのビジョンモデルとテキストモデルはhuggingface上で別々のリポジトリですが、同じ埋め込み空間を共有しているため、マルチモーダルRAG検索に使用できます。ただし、画像とテキストのエンベディングを生成するには正しいモデルを使用する必要があります。

GPUを使っているかどうかをチェックし、使える場合には使う

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
Using device: cpu

トランスフォーマーライブラリを使用して、Huggingfaceからモデルを迅速にダウンロードします

processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5")
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True)

この関数は、モデルカードに記載されているコードに基づいてエンベディングを生成するのに役立ちます

コードはこちら: https://huggingface.co/nomic-ai/nomic-embed-vision-v1.5

def generate_image_embedding(image_path):
    try:
        image = Image.open(image_path)
        inputs = processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = vision_model(**inputs)
        embedding = outputs.last_hidden_state
        normalized = F.normalize(embedding[:, 0], p=2, dim=1)
        return normalized
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return None

ファイルパスを取得してエンベディングを生成する

# このデモでは5000件すべてを実行する必要はありません。Cocoには数万枚の画像がありますが、すべてを実行しようとするとこのデモは非常に時間がかかります
image_df = spark.table(f"{catalog_name}.{schema_name}.{image_table_name}").limit(500) 

# 処理するパスを収集します - データセットが大きい場合は、より効率的なアプローチを検討してください
image_paths = image_df.select("path").collect()
path_list = [row["path"] for row in image_paths]
image_data = []

# 本番環境のヒント: エンベディングモデルを提供し、バッチ推論にはAIクエリを使用してください。画像が数千枚ある場合、これは効率的ではありません
for i, image_path in enumerate(path_list):
    image_id = i
    embedding = generate_image_embedding(image_path)
    if embedding is not None:
        flatten_embedding = embedding.flatten()
        image_data.append({
            "image_id": image_id,
            "filepath": image_path,
            "embedding": flatten_embedding,
        })

print(f"{len(image_data)}枚の画像のエンベディングを生成しました")
500枚の画像のエンベディングを生成しました

生成されたエンベディングをDeltaテーブルに保存し、ベクトル検索のために変更データフィードを有効にする

import pandas as pd

# 画像データを含むDataFrameを作成
image_df_new = pd.DataFrame([{
    "image_id": item["image_id"],
    "filepath": item["filepath"],
    "embedding": item["embedding"].tolist()
} for item in image_data])

# pandas DataFrameをSpark DataFrameに変換
spark_df = spark.createDataFrame(image_df_new)

# DataFrameをDeltaテーブルとして保存
delta_table_path = f"{catalog_name}.{schema_name}.{embedding_table_name}"
spark_df.write.format("delta").mode("overwrite").saveAsTable(delta_table_path)
%sql
ALTER TABLE identifier(CONCAT(:catalog_name||'.'||:schema_name||'.'||:embedding_table_name)) SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Screenshot 2025-05-16 at 11.55.37.png

ステップ3: ベクトル検索インデックスの設定

画像のエンベディングが揃ったので、これらをベクトル検索インデックスに入れてベクトル検索を行います。以下でプログラム的に作成するか、UIを使用してこれを行うことができます。

UIでは、上記で作成したエンベディングを含むデルタテーブルに移動し、「ベクトル検索インデックスを作成」をクリックします。

このステップを完了したら、いつでも戻ってきて、ステップ4に進み、ベクトル検索エンドポイントをデモすることができます。

# ベクター検索インデックスを作成
vs_client = VectorSearchClient()

# エンドポイントの設定を定義
endpoint_name = vector_search_endpoint_name
delta_table_path = f"{catalog_name}.{schema_name}.{embedding_table_name}"
delta_index_name = f"{catalog_name}.{schema_name}.{index_name}"

create_delta_sync_index_and_waitを使ってエンドポイントの準備ができるまで待っています。

# ベクター検索インデックスを作成

# vs_client.create_delta_sync_index(
#     index_name=delta_index_name,
#     endpoint_name=endpoint_name,
#     pipeline_type="TRIGGERED",
#     primary_key="image_id",
#     embedding_vector_column="embedding",
#     source_table_name=delta_table_path,
#     embedding_dimension=len(image_data[0]["embedding"])
# )

vs_client.create_delta_sync_index_and_wait(
    index_name=delta_index_name,
    endpoint_name=endpoint_name,
    pipeline_type="TRIGGERED",
    primary_key="image_id",
    embedding_vector_column="embedding",
    source_table_name=delta_table_path,
    embedding_dimension=len(image_data[0]["embedding"])
)

print(f"ベクター検索インデックス '{index_name}' がエンドポイント '{endpoint_name}' に作成されました")
ベクター検索インデックス 'image_data_index' がエンドポイント 'one-env-shared-endpoint-1' に作成されました

Screenshot 2025-05-16 at 11.56.52.png

ステップ4: テキストエンベディングモデルの設定

次に、テキストクエリをエンベディングに変換するために、テキストエンベディングモデルをダウンロードする必要があります。テキストエンベディングモデルをダウンロードするために、同様のプロセスに従います。次に、ベクトル検索インデックスをクエリするためのヘルパー関数を設定します。

# ベクター検索インデックスを作成
vs_client = VectorSearchClient()

# エンドポイントの設定を定義
endpoint_name = vector_search_endpoint_name
delta_table_path = f"{catalog_name}.{schema_name}.{embedding_table_name}"
delta_index_name = f"{catalog_name}.{schema_name}.{index_name}"
from transformers import AutoModel, AutoTokenizer

# テキストモデルとトークナイザーをロード
text_model = AutoModel.from_pretrained('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True)
text_model.eval()
text_tokenizer = AutoTokenizer.from_pretrained("nomic-ai/nomic-embed-text-v1.5")

エンベディングを生成する関数。このコードはモデルカードにもあります

Huggingfaceモデルカード: https://huggingface.co/nomic-ai/nomic-embed-text-v1.5

def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[0]
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

def generate_text_embedding(query_text):
    encoded_input = text_tokenizer(query_text, padding=True, truncation=True, return_tensors='pt')

    with torch.no_grad():
      model_output = text_model(**encoded_input)
    text_embeddings = mean_pooling(model_output, encoded_input['attention_mask'])
    text_embeddings = F.layer_norm(text_embeddings, normalized_shape=(text_embeddings.shape[1],))
    text_embeddings = F.normalize(text_embeddings, p=2, dim=1)
    text_flatten_embedding = text_embeddings.flatten()
    return text_flatten_embedding.tolist()
  
def search_images_by_text(query_text, top_k=5):
    # クエリテキストのエンベディングを生成
    query_embedding = generate_text_embedding(query_text)

    index = vs_client.get_index(endpoint_name=vector_search_endpoint_name, index_name=delta_index_name)

    # ハイブリッド検索を使用する場合は、以下の行のコメントを解除
    # results = index.similarity_search(num_results=3, columns=["image_id", "filepath"], query_text=query_text, query_vector=query_embedding, query_type="hybrid")
    results = index.similarity_search(num_results=5, columns=["image_id", "filepath"], query_vector=query_embedding)
    
    return results

ステップ5: 試してみよう!

画像をクエリする準備ができました!

一つ注意点:Nomicモデルはクエリの前にタスク指示プレフィックスを期待します。いくつかの例としては、search_document: <your text here>clustering: <your text here>があります。正確な検索結果を得るためには、このプレフィックスを追加する必要があります。画像検索を行うためには、search_query: <your text here>を提供する必要があります。

以下のクエリーは「人々と森っぽいなにか」です。

question = 'search_query: something foresty with people'
search_results = search_images_by_text(question)

画像を構築するためにmatplotlibのpltを使用します

import matplotlib.pyplot as plt

# ベクトル検索結果からファイルパスを取得
file_paths = [path[1] for path in search_results['result']['data_array']]

# いくつかのサンプル画像を表示
fig, axes = plt.subplots(1, len(file_paths), figsize=(15, 5))
for i, path in enumerate(file_paths):
    img = Image.open(path)
    axes[i].imshow(img)
    axes[i].set_title(f"Image {i+1}")
    axes[i].axis('off')
plt.tight_layout()
plt.show()

download.png

悪くない!

異なるテキストクエリを試して、どの画像が引き出されるかを確認してみてください!また、ベクトル検索エンドポイントでnum_resultを調整することで、引き出される画像の数を調整することもできます。

Gradioとの連携

正直、この使い方は知りませんでした。こちらはサーバレスコンピュートでは動かないので、DBR MLのクラスターで試してください。

import gradio as gr
from PIL import Image
import matplotlib.pyplot as plt2
from io import BytesIO

# この関数を追加して、ノートブックと互換性のあるスコープを作成します
def search_and_display_images(question):
    """
    ノートブックの正確なパターンを使用して画像検索を実行します
    """
    # 元のコードの正確なパターンを使用します
    print("実際の関数を実行中")
    search_results2 = search_images_by_text(question)
    
    # ベクトル検索結果からファイルパスを取得
    file_paths = [path[1] for path in search_results2['result']['data_array']]
    
    # matplotlibを使用して画像を含む図を作成します(ノートブックと同様)
    fig, axes = plt2.subplots(1, len(file_paths), figsize=(15, 5))
    
    # 画像が1つだけの場合の処理
    if len(file_paths) == 1:
        img = Image.open(file_paths[0])
        axes.imshow(img)
        axes.set_title(f"Image 1")
        axes.axis('off')
    else:
        # 複数の画像
        for i, path in enumerate(file_paths):
            img = Image.open(path)
            axes[i].imshow(img)
            axes[i].set_title(f"Image {i+1}")
            axes[i].axis('off')
    
    plt2.tight_layout()
    
    # 図をバイトバッファに保存
    buf = BytesIO()
    plt2.savefig(buf, format='png')
    buf.seek(0)
    
    # Gradio用にPIL画像に変換
    plot_img = Image.open(buf)
    
    # メモリを解放するために図を閉じる
    plt2.close(fig)
    
    return plot_img

# シンプルなGradioインターフェース
with gr.Blocks() as demo:
    gr.Markdown("# 画像検索インターフェース")
    
    with gr.Row():
        # 入力テキストボックス
        query_input = gr.Textbox(
            label="検索クエリ",
            placeholder="検索クエリを入力してください(例: search_query: people having fun outside)",
            value="search_query: people having fun outside"  # ノートブックと同様にデフォルト値を設定
        )
    
    with gr.Row():
        # 検索をトリガーするボタン
        search_button = gr.Button("画像を検索")
    
    with gr.Row():
        # 出力画像(すべての画像を含むmatplotlibの図を表示)
        output_image = gr.Image(label="検索結果", type="pil")
    
    # 例のクエリ
    gr.Examples(
        examples=[
            "search_query: people having fun outside", 
            "search_query: dogs in the park", 
            "search_query: sunset beach"
        ],
        inputs=query_input
    )
    
    # ボタンを検索関数に接続
    search_button.click(
        fn=search_and_display_images,
        inputs=[query_input],
        outputs=[output_image]
    )
    
    # Enterキーを押して検索も可能にする
    query_input.submit(
        fn=search_and_display_images,
        inputs=[query_input],
        outputs=[output_image]
    )

demo.launch(share=True)

ノートブック上にGradioのGUIが表示されます。

Screenshot 2025-05-16 at 12.00.32.png
Screenshot 2025-05-16 at 12.01.02.png
Screenshot 2025-05-16 at 12.02.50.png

日本語でも動かないことはないですが、英語ほどの精度ではない感じ。日本語のマルチモーダルエンベディングモデルでも試してみたいところ。

Screenshot 2025-05-16 at 12.03.35.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?