Pythonから利用した際の忘備録です。
- CDKから
OpenSearchServerless
をDeployする - アプリから
OpenSearchServerless
に接続する -
OpenSearchServerless
に保存するデータの型を設定する -
OpenSearchServerless
にデータを登録する -
OpenSearchServerless
に対してベクトル検索を行う - 料金について
1. CDKでOpenSearchServerless
をDeployする
コレクション
データを保存するためのコレクションを作成する。
oss_collection = aws_oss.CfnCollection(
self,
"RagData",
name="rag-data",
type="VECTORSEARCH",
standby_replicas="DISABLED"
)
- standby_replicas
- 開発用にスタンバイレプリカを無効にする(費用が安くなる)
暗号化ポリシー
データの暗号化のためのポリシーを作成する。必須。
oss_encryption_policy = aws_oss.CfnSecurityPolicy(
self,
"RagDataEncryptionPolicy",
name="rag-data-encryption-policy",
type="encryption",
policy=json.dumps({
"Rules": [
{
"ResourceType": "collection",
"Resource": [f'collection/{oss_collection.name}']
}
],
"AWSOwnedKey": True
})
)
- AWSOwnedKey
- AWSが自動で管理してくれるキーを使う
ネットワークポリシー
アクセス許可するネットワークの設定。必須。
oss_network_policy = aws_oss.CfnSecurityPolicy(
self,
"RagDataNetworkPolicy",
name="rag-data-network-policy",
type="network",
policy=json.dumps([{
"Rules": [
{
"ResourceType": "collection",
"Resource": [
f"collection/{oss_collection.name}"
]
},
{
"ResourceType": "dashboard",
"Resource": [
f"collection/{oss_collection.name}"
]
}
],
"AllowFromPublic": True
}])
)
- AllowFromPublic
- 全てのネットワークから許可
アクセスポリシー
lambdaから使用したかったので、特定のlambdaからのアクセスを許可。
(lambda側のCDK設定でARNをRagLambdaRoleArn
という名称でExportしている)
rag_lambda_role_arn = Fn.import_value("RagLambdaRoleArn")
oss_access_policy = aws_oss.CfnAccessPolicy(
self,
"RagDataAccessPolicy",
name="rag-data-access-policy",
type="data",
policy=json.dumps([
{
"Rules": [
{
"ResourceType": "collection",
"Resource": [f"collection/{oss_collection.name}"],
"Permission": ["aoss:*"]
},
{
"ResourceType": "index",
"Resource": [f"index/{oss_collection.name}/*"],
"Permission": ["aoss:*"]
}
],
"Principal": [rag_lambda_role_arn]
}
])
)
依存関係の設定
コレクションが設定に依存するようにする。
oss_collection.add_dependency(oss_encryption_policy)
oss_collection.add_dependency(oss_access_policy)
oss_collection.add_dependency(oss_network_policy)
2. アプリからOpenSearchServerless
に接続する
opensearchpy
モジュールを利用して接続する。
boto3
のセッションから認証情報を取得して利用できる。
import boto3
from opensearchpy import OpenSearch, AWSV4SignerAuth, RequestsHttpConnection
from openai import OpenAI
session = boto3.Session()
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, session.region_name, 'aoss')
oss_client = OpenSearch(
hosts=[{'host': "xxxx.ap-northeast-1.aoss.amazonaws.com", 'port': 443}],
http_auth=auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
)
- hostsには作成したインスタンスのホスト名を指定する
- (
https://
は不要)
- (
3. OpenSearchServerless
に保存するデータの型を設定する
初めに一度だけやればOK。
oss_mapping = {
"settings": {
"index.knn": True,
},
"mappings": {
"properties": {
"type": {"type": "keyword"}, # フィルタ用
"title": {"type": "text"},
"body": {"type": "text"},
"embedding": {
"type": "knn_vector",
"dimension": 1536
}
}
}
}
oss_client.indices.put_index_template(
name="rag-data",
body={
"index_patterns": ["rag-data"],
"template": oss_mapping,
},
)
4. OpenSearchServerless
にデータを登録する
複数データをまとめて入れる。
インデックスの指定と登録するデータについて、交互に情報を配列に設定してOpenSearchServerless
に送る。
data = [
{
"index": {"_index": "rag-data"},
},
{
"type": "knowledge",
"title": "日本の祝日",
"body": "日本の祝日は、...(以降略)",
"embedding": [1, 2, 3, ...(以降略)] # bodyの内容をベクトル化したもの
}
]
oss_client.bulk(data)
5. OpenSearchServerless
に対してベクトル検索を行う
vec = [1, 2, 3, ...(以降略)] # ベクトルデータ
search_query = {
"query": {
"bool": {
"must": [
{
"knn": {
"embedding": {
"vector": vec,
"k": 3, # 検索するデータの数
}
}
}
],
"filter": [
{"term": {"type": "knowledge"}}, # フィルタ指定
]
}
}
}
response = oss_client.search(index='rag-data', body=search_query)
- typeでフィルタするようにしている
6. 料金について
-
OCU
というコンピューティング単位と、あとストレージやネットワーク等に対して課金される - 現状では常に1
OCU
が消費される模様- (個人で気軽に使うには割と高い)