はじめに
Oracle Database 23aiが利用可能になりました。新機能としてVector型のデータを持てるようになり、ベクトル型を使ったベクトル検索が可能になっています。
そこで、既存のデータに対してベクトル検索を行うには、データをベクトル化してDBに保存する必要があります。しかし、社内の既存データは膨大で、一度に大量のデータをDB内でベクトル化しようとするとある程度の時間がかかってしまいますし、本番環境のDBのリソースを使いベクトル化を行えないケースもあります。
そこで、今回は数十万行の大規模データを別の仮想マシン上でベクトル化して、そのベクトルデータをOracle Database 23aiに保存する方法を考えてみます。
Oracle Database 23aiとは
Oracle Database 23aiではAIに重点を置いており、AIを使って生成されたベクトルデータを保存して効率的にオブジェクトの類似性を検索できるAI Vector Searchという機能が追加されています。ドキュメント、画像、ビデオ、サウンドなど様々なオブジェクトをベクトル化することで、それらの間の類似度を検索できるようになります。Oracle Database 23aiでは、この類似度検索とビジネスデータの検索を簡単に組み合わせることができます。
Oracle Database 23aiでVector Searchを試してみたい方はOCI Tutorialをご確認ください。
OCI Generative AIとは
OCI Generative AIはOracle Cloud Infrastructureの事前学習済みのフルマネージドな生成AIのサービスです。詳しくはこちらの記事をご覧ください。
OCI Generative AIには4つの機能があります。今回はテキスト埋め込みのモデルcohere.embed-multilingual-v3.0
を使います。
機能名 | モデル |
---|---|
チャット・モデル |
cohere.command-r-16k , meta.llama-3-70b-instruct
|
テキスト生成 |
cohere.command , cohere.command-light , meta.llama-2-70b-chat
|
テキスト要約 | cohere.command |
テキスト埋め込み |
cohere.embed-english-v3.0 , cohere.embed-multilingual-v3.0 ,cohere.embed-english-light-v3.0 , cohere.embed-multilingual-light-v3.0
|
アーキテクチャ
今回は以下のようなアーキテクチャを考えます。Base DatabaseのSource Tableからデータをベクトル化し、Target Tableに保存します。
前提
- Oracle Cloud Infrastructureの環境
- ネットワークリソースの構築
- Virtual Machineの構築
- Oracle Base Database 23aiの構築
- Chicago or Frankfurt Regionのサブスクライブ
- Virtual Machineの環境構築
- Python環境
- oci-sdkが利用できる環境
手順
- 事前準備
- Source Table作成
- Source データのInsert
- Target Tableの作成
- OCI Generative AIを使ってEmbedしたデータの保存
- Vector Search
- Table削除
0. 事前準備
OCIの環境構築
上記のアーキテクチャを構築します。
Oracle Database 23aiの構築
今回はデータソースと保存先としてOracle Database 23aiを使います。
Oracle Database 23aiの構築方法は、OCI Tutorialの「Oracle Database編 - Base Database Service (BaseDB) を使ってみよう」か、「Oracle Database - Oracle AI Vector Searchを使ってみよう」を参考にしてください。
Virtual Machineの構築
今回はPythonの実行環境としてOCI上のVirtual Machineを使用します。
こちらのOCI Tutorial「その3 - インスタンスを作成する」を参考にVirtual Machineを作成してください。また、追加でPythonをインストールして実行環境を作成してください。Python 3.11.0rc1で動作確認済みです。
$ python --version
Python 3.11.0rc1
GitリポジトリのClone
今回使うソースコードはこのGitHubに公開しています。
ご自身の環境にGitリポジトリをCloneします。
$ git clone https://github.com/sh-sho/embed_insert_100k_data_kit.git
この記事ではsrc
ディレクトリで作業を行います。src
に移動します。
$ cd embed_insert_100k_data_kit/src/
$ tree -a
.
├── .env.example
├── 01.create_source_table.py
├── 02.insert_data.py
├── 03.create_target_vector_table.py
├── 04.embed_insert_texts.py
├── 05.execute_vector_search.py
├── 06.drop_table.py
├── requirements.txt
├── table_detail.py
└── utils.py
次にPythonのライブラリをインストールします。requirements.txt
に必要なライブラリが記載されているので以下のコマンドでインストールをします。
$ pip install -r requirements.txt
Vector Searchを行うためpython-oracledb
のバージョンは2.2.1以上のものを使用してください。
環境変数の設定
次に環境変数を設定します。
.env.example
ファイルをコピーして.env
ファイルを作成します。
$ cp .env.example .env
.env
の内容をご自身の環境に合わせて書き換えます。
UN=username
PW=password
DSN=dsn
OCI_CONFIG_FILE=~/.oci/config
OCI_COMPARTMENT_ID=ocid1.compartment.oc1..aaaaaaaaxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
CSV_DIRECTORY_PATH=/tmp_data
CONFIG_PROFILE=CHICAGO
CONFIG_PROFILE
はOCI Generative AIをCallする際に使用するのでChicago (or Frankfurt) RegionのProfile名を指定してください。
以上が事前準備です。
1. Source Table作成
Source Tableを作成します。実行するPythonのコードは01.create_source_table.py
です。Table名はSOURCE_TABLE
で、以下のような構成です。
Column名 | Data Type | Size |
---|---|---|
PRODUCT_NAME | VARCHAR2 | 100 BYTE |
PRODUCT_CODE | VARCHAR2 | 100 BYTE |
PRODUCT_SEARCH | VARCHAR2 | 100 BYTE |
具体的な処理の内容としては、oracledbでコネクションを張り、SQL文でTableのcreateを行っています。
with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
with connection.cursor() as cursor:
cursor.execute(f"""
CREATE TABLE {td.source_table}
(
{td.name_column} VARCHAR2(100 BYTE),
{td.code_column} VARCHAR2(100 BYTE),
{td.source_column} VARCHAR2(100 BYTE)
)
""")
connection.commit()
01.create_source_table.py
のコードを実行すると、以下のような出力がされます。
$ python 01.create_source_table.py
Start Creating Table
End Creating Table
2. Source データのInsert
SourceとなるデータをTableにInsertします。実行するPythonのコードは02.insert_data.py
です。
今回はサンプルとして以下のようなデータを10万行Insertします。
Product 1, Product Code 1 , Search Keyword 1
Product 2, Product Code 2 , Search Keyword 2
...
Product 100000, Product Code 100000 , Search Keyword 100000
具体的な処理は、まずSource Tableの内容をDeleteし、sample_data
に10万行の配列データとして作成し、Source TableにInsertしています。詳細は02.insert_data.py
のコードをご確認ください。
with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
with connection.cursor() as cursor:
cursor.execute(f"""
DELETE from {td.source_table}
""")
sample_data = [("Product " + str(i), "Product Code " + str(i) ,"Search Keyword " + str(i)) for i in range(1, 10001)]
insert_sql = f"""
INSERT INTO {td.source_table} ({td.name_column}, {td.code_column}, {td.source_column})
VALUES (:1, :2, :3)
"""
cursor.setinputsizes(None, oracledb.STRING)
cursor.executemany(insert_sql, sample_data)
connection.commit()
02.insert_data.py
を実行すると以下の内容が出力されます。
$ python 02.insert_data.py
Start Insert Data
End Insert Data
テーブルを確認すると100,000データが保存されています。
SQL> SELECT COUNT(*) FROM SOURCE_TABLE;
COUNT(*)
----------
100000
3. Target Tableの作成
Target Tableを作成します。実行するPythonのコードは03.create_target_vector_table.py
です。
Column名 | Data Type | Size |
---|---|---|
NAME | VARCHAR2 | 100 BYTE |
CODE | VARCHAR2 | 100 BYTE |
SOURCE_COLUMN | VARCHAR2 | 100 BYTE |
VECTOR_COLUMN | VECTOR |
Source Tableと同様にSQL文でTableを作成するコードです。Vector型のカラムが追加されたテーブルを新しく作成しています。
with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
with connection.cursor() as cursor:
create_table_sql = f"""
CREATE TABLE {td.target_table}
(
{td.name_column} VARCHAR2(100 BYTE),
{td.code_column} VARCHAR2(100 BYTE),
{td.source_column} VARCHAR2(100 BYTE),
{td.vector_column} VECTOR
)
"""
cursor.execute(create_table_sql)
connection.commit()
作成が完了すると以下のような内容が出力されます。
$ python 03.create_target_vector_table.py
Create VECTOR type Table.
4. OCI Generative AIを使ってEmbedしたデータの保存
この記事のメインとなる10万行のデータをEmbedしてTarget Tableに保存します。
1行ずつEmbedしてTableをUpdateする方法では、かなりの時間がかかってしまいます。私の環境では約60時間かかりました。
そこで今回は速度を向上させるために以下の処理を行いました。
実行するコードは04.embed_insert_texts.py
のコードです。
各処理について順番に説明していきます。
OCI Generative AIを使ってTextデータをまとめてEmbed
TextデータをEmbedしCSVに保存している処理のコードを以下に記載しています。
def transform_and_dump_to_csv(batch: np.ndarray) -> None:
"""Transform and load data to CSV"""
response = embed_texts(batch[str.lower(td.source_column)].tolist())
batch[td.vector_column] = response
to_csv(batch=batch)
まずEmbedの処理から説明します。
TextデータをChicago RegionのOCI Generative AIを使ってEmbedしています。
今回はAPIキー認証でアクセスしており、cohere.embed-multilingual-v3.0
を使っています。
OCI Generative AIのEmbedのモデルでは、最大96セットのデータをまとめてEmbed可能です。
OCI Generative AIのModelの種類や制限は以下のドキュメントをご確認ください。
https://docs.oracle.com/en-us/iaas/Content/generative-ai/pretrained-models.htm
以下にTextデータをEmbedしているコードを記載しています。
def embed_texts(texts: np.ndarray) -> list:
"""Embed texts"""
texts_vector = utils.embed_documents(texts)
return texts_vector
実際にEmbeddingの処理をしているコードはutils.py
に記載しています。
def embed_documents(texts: List[str]) -> List[List[float]]:
embed_text_response = generative_ai_inference_client.embed_text(
embed_text_details=oci.generative_ai_inference.models.EmbedTextDetails(
inputs=texts,
serving_mode=oci.generative_ai_inference.models.OnDemandServingMode(
model_id="cohere.embed-multilingual-v3.0"),
compartment_id=OCI_COMPARTMENT_ID,
input_type="SEARCH_DOCUMENT"))
embeddings = embed_text_response.data.embeddings
return embeddings
メモリ効率化のためバッチでデータをCSVに出力
EmbedしたデータをDBに保存することを考えます。Tableに対して1行ずつInsertを行うと非常に時間がかかってしまいます。そこで今回はデータをまとめてInsertします。
Cohereの場合1024次元の数値データとなり、10万行すべてのEmbedしたデータをメモリに展開することは現実的ではありません。そこで、一度EmbedしたデータをCSVに出力した後、TableにInsert処理を行います。
以下にEmbedしたデータをCSVに出力するコードを記載しています。96セットずつCSVファイルが作成されます。
def to_csv(batch: np.ndarray) -> None:
"""Dump data to CSV"""
pd.DataFrame(batch).to_csv(f"{CSV_DIRECTORY_PATH}/insert-data-{time.time()}.csv", header=False, index=False)
マルチスレッド処理
以下のコードでEmbedとCSVへの出力の処理をマルチスレッドで行います。
使用しているCPU数に応じてNO_OF_PROCESSORS
の値が決まり、2スレッド以上の場合は並行処理が行われます。
with mp.Pool(NO_OF_PROCESSORS) as mappool:
mappool.starmap(transform_and_dump_to_csv, zip(batches))
CSVをまとめてTableにBulk Insert
最後にCSVのデータをTarget TableにBulk Insertします。
以下のコードでCSVからデータを取得しています。Target Tableには10,000データずつInsertされます。
def bulk_insert() -> None:
"""Bulk insert to sink table"""
files = glob.glob(f"{CSV_DIRECTORY_PATH}/*.csv")
insert_data = []
for file in files:
with open(file, "r") as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
for line in csv_reader:
insert_data.append(tuple(line))
if (len(insert_data)) >= 10_000:
flush(data=insert_data)
insert_data = []
if insert_data:
flush(data=insert_data)
以下のコードでCSVから取得したデータをTarget TableにInsertしています。
def flush(data: list) -> None:
"""Flush the on-memory data"""
with pool.acquire() as connection:
connection.autocommit = True
with connection.cursor() as cursor:
cursor.setinputsizes(None, oracledb.DB_TYPE_VECTOR)
insert_sql = f"""
INSERT INTO {td.target_table} ({td.name_column}, {td.code_column}, {td.source_column}, {td.vector_column})
VALUES(:1, :2, :3, :4)
"""
try:
cursor.executemany(statement=insert_sql, parameters=data, batcherrors=True, arraydmlrowcounts=True)
print(f"Insert rows: {len(cursor.getarraydmlrowcounts())}")
except oracledb.DatabaseError as e:
finalizer(exception=e, cursor=cursor, connection=connection)
except KeyboardInterrupt as e:
finalizer(exception=e, cursor=cursor, connection=connection)
最後に、これらの処理の関数を順番に実行します。
上記で説明した処理のほかにCSVを保存するディレクトリの作成、削除やInsertしたデータを確認する処理を行っています。
if __name__ == "__main__":
start_time = time.time()
auth_check()
csv_dir_check()
batches = extract_all_data()
print(f"batches {batches}")
with mp.Pool(NO_OF_PROCESSORS) as mappool:
mappool.starmap(transform_and_dump_to_csv, zip(batches))
bulk_insert()
data_checks()
end_time = time.time()
execution_time = end_time - start_time
print("Data vectorization process is complete.")
print("Total Run Time ", execution_time)
delete_dir()
処理が問題なく終了すると以下のように、ベクトル化した数値のデータの一部と合計の処理時間が出力されます。
$ python 04.embed_save_texts.py
Auth Success
Status:200
...
.0295257568359375, -0.045745849609375, -0.0303802490234375, -0.0028781890869140625, -0.006397247314453125, 0.0224609375, 0.067626953125, -0.04595947265625]))]
Data vectorization process is complete.
Total Run Time 199.31680464744568
success delete csv directory
私の2OCPU(4スレッド)の環境では、10万行のデータは約200秒でEmbedとInsertの処理が完了しています。
参考までに、100万行で行った場合は約2,000秒でした。
SQL> SELECT COUNT(*) FROM TARGET_TABLE;
COUNT(*)
----------
1000000
$ python 04.embed_insert_texts.py
...
76141357421875, 0.039642333984375]))]
Data vectorization process is complete.
Total Run Time 1963.9059801101685
success delete csv directory
5. Vector Search
簡単なベクトル検索を行います。
実行するPythonのコードは05.execute_vector_search.py
です。
このコードでは実行時に入力された文章をEmbedし、04.embed_insert_texts.py
で作成したベクトル列の値とベクトルの距離が近いものを検索します。
def query_text(text: str):
embed_text = array.array('f', utils.embed_documents([text])[0])
print(f"Start Vector Search")
try:
with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
with connection.cursor() as cursor:
cursor.setinputsizes(oracledb.DB_TYPE_VECTOR)
select_sql = f"""
SELECT
{td.name_column},
{td.source_column},
VECTOR_DISTANCE({td.vector_column}, :1, COSINE) as distance
FROM
{td.target_table}
ORDER BY distance
FETCH FIRST 3 ROWS ONLY
"""
cursor.execute(select_sql, [embed_text])
print(f"============Results============")
index = 1
for row in cursor:
print(f"{index}: {row}")
index += 1
connection.commit()
except Exception as e:
print("Error Vector Search:", e)
print(f"End Vector Search")
if __name__ == "__main__":
args = sys.argv
query = args[1]
print(f"Search Text:{query}")
query_text(query)
実行すると以下の結果が出力されます。
$ python 05.execute_vector_search.py '23'
Search Text:23
Start Vector Search
============Results============
1: ('Product 23', 'Search Keyword 23', 0.19899375827306565)
2: ('Product 233', 'Search Keyword 233', 0.24061140639163003)
3: ('Product 231', 'Search Keyword 231', 0.25913973326091844)
End Vector Search
今回の場合、実行時に23
を入力しています。この23
がEmbedされ23
に近い値として、Search Keyword 23
, Search Keyword 233
, Search Keyword 231
が検索されています。
6. Table削除
最後に以下のコードで作成したTableを削除できます。実行するコードは06.drop_table.py
です。
with oracledb.connect(user=UN, password=PW, dsn=DSN) as connection:
with connection.cursor() as cursor:
cursor.execute(f"""
DROP TABLE {td.source_table}
""")
print(f"Drop Table {td.source_table}")
cursor.execute(f"""
DROP TABLE {td.target_table}
""")
print(f"Drop Table {td.target_table}")
connection.commit()
削除が完了すると以下のような内容が出力されます。
$ python 06.drop_table.py
Start Drop Table
Drop Table SOURCE_TABLE
Drop Table TARGET_TABLE
End Drop Table
終わりに
今回は、Oracle Database 23aiでVector型のデータが扱えるようになったので、10万行のデータをベクトル化してTableに保存してみました。ベクトル検索を取り入れる上で、既存の大量のデータをベクトルデータにEmbedし如何に効率的に保存できるかは重要な観点だと思います。