はじめに
Knowledge Bases for Amazon Bedrock では S3, Confluence, Sharepoint, Salesforce, ウェブページなどのデータソースをサポートしています。
データはドキュメントという単位で管理されドキュメントには付随するメタデータも定義することが可能です。
最もシンプルなデータの投入方法としては S3 に直接ファイルを投入し同期することです。その際メタデータを Lambda で自動生成するフックを活用しメタデータ作成を自動化することも可能です。
一方で Knowledge Bases for Amazon Bedrock に投入したいデータがテーブルデータの場合(例: データベースやログサービスからエクスポートした CSV データ)の場合、CSV ファイルをそのままアップロードした場合はファイルが一つのドキュメントとして管理されます。通常の CSV であればファイルごとでも要件として問題ないですが、レコードごとに検索をしたい場合(例: 記事やログなど長文テキストがある場合等)は要件に合いません。そのため適切な処理を施して投入する必要があります。
この記事ではテーブルデータをレコード単位で適切なメタデータを作成しながら Knowledge Bases for Amazon Bedrock にインポートするためのサンプルスクリプトを紹介します。
2 つのテーブルデータのインポート方法
現状 Knowledge Bases for Amazon Bedrock にテーブルデータをインポートする方法は2通りあります。
- Advanced Parsing (CSV Parse 機能) によるインポート (2024/07/10 に追加された新機能)
- ファイル変換によるインポート
Advanced Parsing (CSV Parse 機能) は、CSV ファイルを S3 にアップロードし、Knowledge Bases 側でファイルを行ごとにパースしてコンテンツとメタデータを作成する手法です。Knowledge Bases から返される URL は CSV ファイルとなります。この手法のメリットは、CSV ファイルの準備が簡単にできる点にあります。一方で注意点として、Knowledge Bases から返されるのは CSV ファイルの URL とチャンク(デフォルト300トークン)なので、レコードが小さい場合は問題ありませんがレコードが大きい場合に元データのレコード全体を見るには CSV をダウンロードしたり、S3 に対してクエリを行ったり別途処理が必要になります。
この処理を軽減する方法としては、同時に追加された新機能の Hierarchical Parsing を組み合わせることで、設定した親チャンクの大きさ(最大 8192 トークン)までレコードを返すことができます。注意点としては、1件のレスポンスが大きくなるため、検索結果件数が多いと全体のレスポンスサイズが大きくなります。
一方のファイル変換は、CSV ファイルを自前のスクリプトで行ごとにファイル(テキストファイルなど)とメタデータファイルに変換し、S3 にアップロードする手法です。Knowledge Bases から返される URL は変換したファイルとなります。この手法のメリットは、アプリ側で個別のレコードのファイルを取得し、テキストファイルをページ内で表示したりブラウザタブで開いたりと、柔軟な表示が可能になる点です。注意点は、ファイル変換のスクリプトを用意する必要がある点と、大量の小さいファイルが生成された場合に S3 リクエスト数が増え、インポート時間がかかる可能性があることです。
どちらの場合も Knowldge Bases のクォータである1つのファイルの最大サイズ 50 MB の制約が適用されます。
手法 | メリット | 注意点 |
---|---|---|
Advanced Parsing | - CSVファイルの準備が簡単 | - ダウンロードは CSV 単位 - レコード全体の取得には別途処理が必要 |
Advanced Parsing + Hierarchical Parsing |
- レコード全体(設定した親チャンクの大きさまで)を返せる - CSVファイルの準備が簡単 |
- ダウンロードは CSV 単位 - 検索結果件数が多いとレスポンスサイズが大きくなる |
ファイル変換 | - 個別のファイルのレコード全体を取得可能 (テキストファイルをページ内で表示/ブラウザタブで表示) | - ファイル変換のスクリプトが必要 - 大量の小さいファイルが生成された場合 S3 リクエスト数が増えインポート時間がかかる可能性がある |
RAG として利用する上ではファイルのダウンロードはそこまで考えなくてもいいので Advanced Parsing + Hierarchical Parsing で多くのユーズケースには対応できます。レコードが大きく元データのレコードを別個に参照したい場合はファイル変換も検討すると良いでしょう。
それでは具体的な手順の紹介を行います。
インポート手順
以下の検証では Kaggle の News Category Dataset を CSV 化したものをサンプルデータとして利用しています。
必要に応じて書き換えてご利用ください。
Advanced Parsing によるインポート
まずは Advanced Parsing によるインポートの手順です。
まずは必要な初期化を行います。データを S3 に投入するため、Knowledge Bases と同期する S3 バケットとパスを指定します。
import csv
import os
import json
from datetime import datetime
import boto3
import hashlib
# 環境変数からAWS認証情報を読み込む
session = boto3.Session()
# S3クライアントを作成する
s3_client = session.client('s3')
# CSVファイルのパス
csv_file = 'data.csv'
# S3バケット名
bucket_name = 'xxxxxx'
# テキストファイルとメタデータファイルを保存するS3プレフィックス
prefix = 'news-csv/'
続いて CSV ファイルの整形を行います。特にメタデータとして扱う列を修正します。
Knowledge Bases のメタデータは string, number, boolean, string[] しかサポートしていないため、日付は年・月・日に分割する必要があります。
import pandas as pd
# CSVファイルを読み込む
df = pd.read_csv(csv_file, parse_dates=['date'])
# 日付列を年、月、日に分割
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
# 新しいCSVファイルに書き出す
processed_file = "data_processed.csv"
# only save first 1000
df = df.head(1000)
df.to_csv(processed_file, index=False)
続いてメタデータファイルを作成します。CSV の Advanced Parsing に必要な JSON スキーマはこちらのドキュメントで定義されています。
メタデータファイルを作成したら CSV とともに S3 にアップロードします。
metadata = {
"metadataAttributes": {
"source": "News Category Dataset",
},
"documentStructureConfiguration": {
"type": "RECORD_BASED_STRUCTURE_METADATA",
"recordBasedStructureMetadata": {
"contentFields": [
{
"fieldName": "short_description"
}
],
"metadataFieldsSpecification": {
"fieldsToInclude": [
{
"fieldName": "year"
},
{
"fieldName": "month"
},
{
"fieldName": "day"
}
],
}
}
}
}
metadata_file = f"{processed_file}.metadata.json"
with open(metadata_file, "w") as f:
json.dump(metadata, f)
s3_client.upload_file(processed_file, bucket_name, f"{prefix}{processed_file}")
s3_client.upload_file(metadata_file, bucket_name, f"{prefix}{metadata_file}")
これでデータを S3 に投入することができました。Knowldge Base for Amazon Bedrock でデータを投入した S3 を指定し同期を行うことでデータの取り込みが行われます。
ファイル変換によるインポート
続いてはファイル変換によるインポートの手順です。
まずは必要な初期化を行います。データを S3 に投入するため、Knowledge Bases と同期する S3 バケットとパスを指定します。
import csv
import os
import json
from datetime import datetime
import boto3
import hashlib
# 環境変数からAWS認証情報を読み込む
session = boto3.Session()
# S3クライアントを作成する
s3_client = session.client('s3')
# CSVファイルのパス
csv_file = 'data.csv'
# S3バケット名
bucket_name = 'xxxxxx'
# テキストファイルとメタデータファイルを保存するS3プレフィックス
prefix = 'news/'
続いて、CSV からレコードをループして読み取り、コンテンツの中身となるドキュメントとメタデータに分けます。
この際、メタデータ は string, number, boolean, string[] しかサポートしていないため、日付は年・月・日に分割する必要があります。
# CSVファイルを読み込む
count = 0
with open(csv_file, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
# 検証では最初の 1000 件のみ
count += 1
if count > 1000:
break
# 以下処理の例
# タイトルと本文からテキストファイルを作成
title = row['headline']
content = row['short_description']
title_hash = hashlib.sha256(title.encode('utf-8')).hexdigest()
text_file = f"{title_hash}.txt"
with open(text_file, 'w', encoding='utf-8') as f:
f.write(f"{title}\n\n{content}")
# 日付からメタデータファイルを作成
date_str = row['date']
date = datetime.strptime(date_str, '%Y-%m-%d')
metadata = {
'metadataAttributes': {
'title': title,
'year': date.year,
'month': date.month,
'day': date.day
}
}
metadata_file = f"{text_file}.metadata.json"
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f)
# テキストファイルとメタデータファイルをS3にアップロード
s3_client.upload_file(text_file, bucket_name, f"{prefix}{text_file}")
s3_client.upload_file(metadata_file, bucket_name, f"{prefix}{metadata_file}")
# 一時ファイルを削除
os.remove(text_file)
os.remove(metadata_file)
これでデータを S3 に投入することができました。Knowldge Base for Amazon Bedrock でデータを投入した S3 を指定し同期を行うことでデータの取り込みが行われます。
検証
続いてインポートしたデータが適切にレコードごとに検索可能か、そしてメタデータでフィルタリングすることができるか検証します。
まず Knowledge Bases のクライアントを作成します。
import json
import boto3
knowledgeBaseId = "XXXXXXXX"
client = boto3.client('bedrock-agent-runtime', region_name="us-west-2")
続いて通常の検索リクエストを実行します。
# 通常の検索リクエスト
query = "How many people got COVID?"
response = client.retrieve(
knowledgeBaseId=knowledgeBaseId,
retrievalQuery={
'text': query
},
retrievalConfiguration={
'vectorSearchConfiguration': {
'numberOfResults': 5,
'overrideSearchType': 'HYBRID', # KB でハイブリッド検索が有効の場合
}
}
)
print(response["retrievalResults"])
print([item["metadata"]["year"] for item in response["retrievalResults"]])
複数の記事データがレコードの単位でヒットしました。そしてヒットした記事の公開年は 2022 と 2021 のものがヒットしました。
[{'content': {'text': 'The Biggest COVID Issues People Bring Up In Therapy Mental health experts reveal the common pandemic-related struggles people discuss in sessions, plus advice on how to cope.'}, 'location': {'s3Location': {'uri':
...
'x-amz-bedrock-kb-chunk-id': '1%3A0%3Ag_LRlpEB-kCaZt0rv0-7'}, 'score': 0.40674815}]
[2022.0, 2022.0, 2022.0, 2022.0, 2021.0]
続いて、同じ検索キーワードで今度は公開年によるフィルタリングを行います。
# 日付でのフィルタ付きの検索リクエスト
query = "How many people got COVID?"
response = client.retrieve(
knowledgeBaseId=knowledgeBaseId,
retrievalQuery={
'text': query
},
retrievalConfiguration={
'vectorSearchConfiguration': {
'numberOfResults': 5,
'overrideSearchType': 'HYBRID', # KB でハイブリッド検索が有効の場合
'filter': {
'lessThan': {
'key': 'year',
'value': 2022
}
},
}
}
)
print(response["retrievalResults"])
print([item["metadata"]["year"] for item in response["retrievalResults"]])
ヒットした記事を見ると lessThan 2022
と設定したため 2022 年の記事はヒットせず 2021 年の記事のみヒットしました。メタデータによるフィルタも正しく機能しています。
[{'content': {'text': "How To Fade The Dark Spots On Your Skin That Summer Left Behind Whether you've got melasma or other discoloration caused by the sun, these products can help."}, 'location': {'s3Location':
...
'x-amz-bedrock-kb-chunk-id': '1%3A0%3AJfLUlpEB-kCaZt0rLFLK'}, 'score': 0.3974835867806897}]
[2021.0, 2021.0, 2021.0, 2021.0, 2021.0]
まとめ
本記事では Knowledge Bases のドキュメントとメタデータの管理方法についての説明、そしてテーブルデータをインポートする際の2つの手法のメリット・デメリットそして手順についてご紹介しました。同様の要件の際には参考にしていただければ幸いです。