3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

大規模データセットのベクトル化と検索をOracle Database 23aiで試してみた

Last updated at Posted at 2024-06-07

はじめに

Oracle Database 23aiが利用可能になりました。新機能としてVector型のデータを持てるようになり、ベクトル型を使ったベクトル検索が可能になっています。
そこで、既存のデータに対してベクトル検索を行うには、データをベクトル化してDBに保存する必要があります。しかし、社内の既存データは膨大で、一度に大量のデータをDB内でベクトル化しようとするとある程度の時間がかかってしまいますし、本番環境のDBのリソースを使いベクトル化を行えないケースもあります。
そこで、今回は数十万行の大規模データを別の仮想マシン上でベクトル化して、そのベクトルデータをOracle Database 23aiに保存する方法を考えてみます。

Oracle Database 23aiとは

Oracle Database 23aiではAIに重点を置いており、AIを使って生成されたベクトルデータを保存して効率的にオブジェクトの類似性を検索できるAI Vector Searchという機能が追加されています。ドキュメント、画像、ビデオ、サウンドなど様々なオブジェクトをベクトル化することで、それらの間の類似度を検索できるようになります。Oracle Database 23aiでは、この類似度検索とビジネスデータの検索を簡単に組み合わせることができます。
Oracle Database 23aiでVector Searchを試してみたい方はOCI Tutorialをご確認ください。

OCI Generative AIとは

OCI Generative AIはOracle Cloud Infrastructureの事前学習済みのフルマネージドな生成AIのサービスです。詳しくはこちらの記事をご覧ください。

OCI Generative AIには4つの機能があります。今回はテキスト埋め込みのモデルcohere.embed-multilingual-v3.0を使います。

機能名 モデル
チャット・モデル cohere.command-r-16k, meta.llama-3-70b-instruct
テキスト生成 cohere.command, cohere.command-light, meta.llama-2-70b-chat
テキスト要約 cohere.command
テキスト埋め込み cohere.embed-english-v3.0, cohere.embed-multilingual-v3.0,
cohere.embed-english-light-v3.0, cohere.embed-multilingual-light-v3.0

アーキテクチャ

今回は以下のようなアーキテクチャを考えます。Base DatabaseのSource Tableからデータをベクトル化し、Target Tableに保存します。

image.png

前提

  • Oracle Cloud Infrastructureの環境
    • ネットワークリソースの構築
    • Virtual Machineの構築
    • Oracle Base Database 23aiの構築
    • Chicago or Frankfurt Regionのサブスクライブ
  • Virtual Machineの環境構築
    • Python環境
    • oci-sdkが利用できる環境

手順

  1. 事前準備
  2. Source Table作成
  3. Source データのInsert
  4. Target Tableの作成
  5. OCI Generative AIを使ってEmbedしたデータの保存
  6. Vector Search
  7. Table削除

0. 事前準備

OCIの環境構築

上記のアーキテクチャを構築します。

Oracle Database 23aiの構築

今回はデータソースと保存先としてOracle Database 23aiを使います。
Oracle Database 23aiの構築方法は、OCI Tutorialの「Oracle Database編 - Base Database Service (BaseDB) を使ってみよう」か、「Oracle Database - Oracle AI Vector Searchを使ってみよう」を参考にしてください。

Virtual Machineの構築

今回はPythonの実行環境としてOCI上のVirtual Machineを使用します。
こちらのOCI Tutorial「その3 - インスタンスを作成する」を参考にVirtual Machineを作成してください。また、追加でPythonをインストールして実行環境を作成してください。Python 3.11.0rc1で動作確認済みです。

$ python --version
Python 3.11.0rc1

GitリポジトリのClone

今回使うソースコードはこのGitHubに公開しています。

ご自身の環境にGitリポジトリをCloneします。

$ git clone https://github.com/sh-sho/embed_insert_100k_data_kit.git

この記事ではsrcディレクトリで作業を行います。srcに移動します。

$ cd embed_insert_100k_data_kit/src/
$ tree -a
.
├── .env.example
├── 01.create_source_table.py
├── 02.insert_data.py
├── 03.create_target_vector_table.py
├── 04.embed_insert_texts.py
├── 05.execute_vector_search.py
├── 06.drop_table.py
├── requirements.txt
├── table_detail.py
└── utils.py

次にPythonのライブラリをインストールします。requirements.txtに必要なライブラリが記載されているので以下のコマンドでインストールをします。

$ pip install -r requirements.txt

Vector Searchを行うためpython-oracledbのバージョンは2.2.1以上のものを使用してください。

環境変数の設定

次に環境変数を設定します。
.env.exampleファイルをコピーして.envファイルを作成します。

$ cp .env.example .env

.envの内容をご自身の環境に合わせて書き換えます。

UN=username
PW=password
DSN=dsn
OCI_CONFIG_FILE=~/.oci/config
OCI_COMPARTMENT_ID=ocid1.compartment.oc1..aaaaaaaaxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
CSV_DIRECTORY_PATH=/tmp_data
CONFIG_PROFILE=CHICAGO

CONFIG_PROFILEはOCI Generative AIをCallする際に使用するのでChicago (or Frankfurt) RegionのProfile名を指定してください。

以上が事前準備です。

1. Source Table作成

Source Tableを作成します。実行するPythonのコードは01.create_source_table.pyです。Table名はSOURCE_TABLEで、以下のような構成です。

Column名 Data Type Size
PRODUCT_NAME VARCHAR2 100 BYTE
PRODUCT_CODE VARCHAR2 100 BYTE
PRODUCT_SEARCH VARCHAR2 100 BYTE

具体的な処理の内容としては、oracledbでコネクションを張り、SQL文でTableのcreateを行っています。

with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
   with connection.cursor() as cursor:
       cursor.execute(f"""
           CREATE TABLE {td.source_table}
           (	
               {td.name_column} VARCHAR2(100 BYTE),
               {td.code_column} VARCHAR2(100 BYTE),
               {td.source_column} VARCHAR2(100 BYTE)
           )
       """)
   connection.commit()

01.create_source_table.pyのコードを実行すると、以下のような出力がされます。

$ python 01.create_source_table.py
Start Creating Table
End Creating Table

2. Source データのInsert

SourceとなるデータをTableにInsertします。実行するPythonのコードは02.insert_data.pyです。

今回はサンプルとして以下のようなデータを10万行Insertします。

Product 1, Product Code 1 , Search Keyword 1
Product 2, Product Code 2 , Search Keyword 2
...
Product 100000, Product Code 100000 , Search Keyword 100000

具体的な処理は、まずSource Tableの内容をDeleteし、sample_dataに10万行の配列データとして作成し、Source TableにInsertしています。詳細は02.insert_data.pyのコードをご確認ください。

with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
   with connection.cursor() as cursor:
       cursor.execute(f"""
           DELETE from {td.source_table}
       """)
       sample_data = [("Product " + str(i), "Product Code " + str(i) ,"Search Keyword " + str(i)) for i in range(1, 10001)]
       insert_sql = f"""
           INSERT INTO {td.source_table} ({td.name_column}, {td.code_column}, {td.source_column})
           VALUES (:1, :2, :3)
       """
       cursor.setinputsizes(None, oracledb.STRING)
       cursor.executemany(insert_sql, sample_data)
   connection.commit()

02.insert_data.pyを実行すると以下の内容が出力されます。

$ python 02.insert_data.py 
Start Insert Data
End Insert Data

テーブルを確認すると100,000データが保存されています。

SQL> SELECT COUNT(*) FROM SOURCE_TABLE;

  COUNT(*)
----------
    100000

3. Target Tableの作成

Target Tableを作成します。実行するPythonのコードは03.create_target_vector_table.pyです。

Column名 Data Type Size
NAME VARCHAR2 100 BYTE
CODE VARCHAR2 100 BYTE
SOURCE_COLUMN VARCHAR2 100 BYTE
VECTOR_COLUMN VECTOR

Source Tableと同様にSQL文でTableを作成するコードです。Vector型のカラムが追加されたテーブルを新しく作成しています。

with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
   with connection.cursor() as cursor:
       create_table_sql = f"""
           CREATE TABLE {td.target_table}
           (
               {td.name_column} VARCHAR2(100 BYTE),
               {td.code_column} VARCHAR2(100 BYTE),
               {td.source_column} VARCHAR2(100 BYTE),
               {td.vector_column} VECTOR
           )
       """
       cursor.execute(create_table_sql)
    connection.commit()

作成が完了すると以下のような内容が出力されます。

$ python 03.create_target_vector_table.py 
Create VECTOR type Table.

4. OCI Generative AIを使ってEmbedしたデータの保存

この記事のメインとなる10万行のデータをEmbedしてTarget Tableに保存します。
1行ずつEmbedしてTableをUpdateする方法では、かなりの時間がかかってしまいます。私の環境では約60時間かかりました。
そこで今回は速度を向上させるために以下の処理を行いました。

実行するコードは04.embed_insert_texts.pyのコードです。
各処理について順番に説明していきます。

OCI Generative AIを使ってTextデータをまとめてEmbed

TextデータをEmbedしCSVに保存している処理のコードを以下に記載しています。

def transform_and_dump_to_csv(batch: np.ndarray) -> None:
    """Transform and load data to CSV"""
    response = embed_texts(batch[str.lower(td.source_column)].tolist())
    batch[td.vector_column] = response
    to_csv(batch=batch)

まずEmbedの処理から説明します。
TextデータをChicago RegionのOCI Generative AIを使ってEmbedしています。
今回はAPIキー認証でアクセスしており、cohere.embed-multilingual-v3.0を使っています。
OCI Generative AIのEmbedのモデルでは、最大96セットのデータをまとめてEmbed可能です。
OCI Generative AIのModelの種類や制限は以下のドキュメントをご確認ください。
https://docs.oracle.com/en-us/iaas/Content/generative-ai/pretrained-models.htm

以下にTextデータをEmbedしているコードを記載しています。

def embed_texts(texts: np.ndarray) -> list:
    """Embed texts"""
    texts_vector = utils.embed_documents(texts)
    return texts_vector

実際にEmbeddingの処理をしているコードはutils.pyに記載しています。

def embed_documents(texts: List[str]) -> List[List[float]]:
    embed_text_response = generative_ai_inference_client.embed_text(
        embed_text_details=oci.generative_ai_inference.models.EmbedTextDetails(
            inputs=texts,
            serving_mode=oci.generative_ai_inference.models.OnDemandServingMode(
                model_id="cohere.embed-multilingual-v3.0"),
            compartment_id=OCI_COMPARTMENT_ID,
            input_type="SEARCH_DOCUMENT"))

    embeddings = embed_text_response.data.embeddings
    return embeddings

メモリ効率化のためバッチでデータをCSVに出力

EmbedしたデータをDBに保存することを考えます。Tableに対して1行ずつInsertを行うと非常に時間がかかってしまいます。そこで今回はデータをまとめてInsertします。

Cohereの場合1024次元の数値データとなり、10万行すべてのEmbedしたデータをメモリに展開することは現実的ではありません。そこで、一度EmbedしたデータをCSVに出力した後、TableにInsert処理を行います。

以下にEmbedしたデータをCSVに出力するコードを記載しています。96セットずつCSVファイルが作成されます。

def to_csv(batch: np.ndarray) -> None:
    """Dump data to CSV"""
    pd.DataFrame(batch).to_csv(f"{CSV_DIRECTORY_PATH}/insert-data-{time.time()}.csv", header=False, index=False)

マルチスレッド処理

以下のコードでEmbedとCSVへの出力の処理をマルチスレッドで行います。
使用しているCPU数に応じてNO_OF_PROCESSORSの値が決まり、2スレッド以上の場合は並行処理が行われます。

with mp.Pool(NO_OF_PROCESSORS) as mappool:
    mappool.starmap(transform_and_dump_to_csv, zip(batches))

CSVをまとめてTableにBulk Insert

最後にCSVのデータをTarget TableにBulk Insertします。
以下のコードでCSVからデータを取得しています。Target Tableには10,000データずつInsertされます。

def bulk_insert() -> None:
    """Bulk insert to sink table"""
    files = glob.glob(f"{CSV_DIRECTORY_PATH}/*.csv")
    insert_data = []
    for file in files:
        with open(file, "r") as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=",")
            for line in csv_reader:
                insert_data.append(tuple(line))
                if (len(insert_data)) >= 10_000:
                    flush(data=insert_data)
                    insert_data = []
    if insert_data:
        flush(data=insert_data)

以下のコードでCSVから取得したデータをTarget TableにInsertしています。

def flush(data: list) -> None:
    """Flush the on-memory data"""
    with pool.acquire() as connection:
        connection.autocommit = True
        with connection.cursor() as cursor:
            cursor.setinputsizes(None, oracledb.DB_TYPE_VECTOR)

            insert_sql = f"""
                INSERT INTO {td.target_table} ({td.name_column}, {td.code_column}, {td.source_column}, {td.vector_column}) 
                VALUES(:1, :2, :3, :4)
                """
            try:
                cursor.executemany(statement=insert_sql, parameters=data, batcherrors=True, arraydmlrowcounts=True)
                print(f"Insert rows: {len(cursor.getarraydmlrowcounts())}")
            except oracledb.DatabaseError as e:
                finalizer(exception=e, cursor=cursor, connection=connection)
            except KeyboardInterrupt as e:
                finalizer(exception=e, cursor=cursor, connection=connection)

最後に、これらの処理の関数を順番に実行します。
上記で説明した処理のほかにCSVを保存するディレクトリの作成、削除やInsertしたデータを確認する処理を行っています。

if __name__ == "__main__":
    start_time = time.time()
    auth_check()
    csv_dir_check()
    batches = extract_all_data()
    print(f"batches {batches}")
    
    with mp.Pool(NO_OF_PROCESSORS) as mappool:
        mappool.starmap(transform_and_dump_to_csv, zip(batches))
    
    bulk_insert()
    data_checks()
    end_time = time.time()
    execution_time = end_time - start_time
    print("Data vectorization process is complete.")
    print("Total Run Time ", execution_time)
    delete_dir()

処理が問題なく終了すると以下のように、ベクトル化した数値のデータの一部と合計の処理時間が出力されます。

$ python 04.embed_save_texts.py
Auth Success
Status:200
...
.0295257568359375, -0.045745849609375, -0.0303802490234375, -0.0028781890869140625, -0.006397247314453125, 0.0224609375, 0.067626953125, -0.04595947265625]))]
Data vectorization process is complete.
Total Run Time  199.31680464744568
success delete csv directory

私の2OCPU(4スレッド)の環境では、10万行のデータは約200秒でEmbedとInsertの処理が完了しています。

参考までに、100万行で行った場合は約2,000秒でした。

SQL> SELECT COUNT(*) FROM TARGET_TABLE; 

  COUNT(*)
----------
   1000000
$ python 04.embed_insert_texts.py 
...
76141357421875, 0.039642333984375]))]
Data vectorization process is complete.
Total Run Time  1963.9059801101685
success delete csv directory

5. Vector Search

簡単なベクトル検索を行います。

実行するPythonのコードは05.execute_vector_search.pyです。
このコードでは実行時に入力された文章をEmbedし、04.embed_insert_texts.pyで作成したベクトル列の値とベクトルの距離が近いものを検索します。

def query_text(text: str):
    embed_text = array.array('f', utils.embed_documents([text])[0])

    print(f"Start Vector Search")
    try:
        with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
            with connection.cursor() as cursor:
                cursor.setinputsizes(oracledb.DB_TYPE_VECTOR)
                select_sql = f"""
                    SELECT
                        {td.name_column},
                        {td.source_column},
                        VECTOR_DISTANCE({td.vector_column}, :1, COSINE) as distance
                    FROM
                        {td.target_table}
                    ORDER BY distance
                    FETCH FIRST 3 ROWS ONLY
                """
                cursor.execute(select_sql, [embed_text])

                print(f"============Results============")
                index = 1
                for row in cursor:
                    print(f"{index}: {row}")
                    index += 1

            connection.commit()
    except Exception as e:
        print("Error Vector Search:", e)
        
    print(f"End Vector Search")

if __name__ == "__main__":
    args = sys.argv
    query = args[1]
    print(f"Search Text:{query}")
    query_text(query)

実行すると以下の結果が出力されます。

$ python 05.execute_vector_search.py '23'
Search Text:23
Start Vector Search
============Results============
1: ('Product 23', 'Search Keyword 23', 0.19899375827306565)
2: ('Product 233', 'Search Keyword 233', 0.24061140639163003)
3: ('Product 231', 'Search Keyword 231', 0.25913973326091844)
End Vector Search

今回の場合、実行時に23を入力しています。この23がEmbedされ23に近い値として、Search Keyword 23, Search Keyword 233, Search Keyword 231が検索されています。

6. Table削除

最後に以下のコードで作成したTableを削除できます。実行するコードは06.drop_table.pyです。

with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
    with connection.cursor() as cursor:

        cursor.execute(f"""
            DROP TABLE {td.source_table}
        """)
        print(f"Drop Table {td.source_table}")
        
        cursor.execute(f"""
            DROP TABLE {td.target_table}
        """)
        print(f"Drop Table {td.target_table}")
        
        connection.commit()

削除が完了すると以下のような内容が出力されます。

$ python 06.drop_table.py 
Start Drop Table
Drop Table SOURCE_TABLE
Drop Table TARGET_TABLE
End Drop Table

終わりに

今回は、Oracle Database 23aiでVector型のデータが扱えるようになったので、10万行のデータをベクトル化してTableに保存してみました。ベクトル検索を取り入れる上で、既存の大量のデータをベクトルデータにEmbedし如何に効率的に保存できるかは重要な観点だと思います。

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?