1.完成したコード
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from llama_index.readers.azstorage_blob import AzStorageBlobReader
from azure.storage.blob import (
BlobServiceClient,
ContainerClient,
generate_container_sas,
ContainerSasPermissions
)
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence.models import (
AnalyzeDocumentRequest,
ContentFormat,
)
from azure.ai.documentintelligence import DocumentIntelligenceClient
import datetime
import re
from llama_index.vector_stores.azureaisearch import AzureAISearchVectorStore,IndexManagement
from azure.search.documents.indexes import SearchIndexClient
from llama_index.core import StorageContext,VectorStoreIndex
from llama_index.core.settings import Settings
from llama_index.core.node_parser import MarkdownElementNodeParser,SentenceSplitter,TokenTextSplitter
import tiktoken
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
import azure.functions as func
from dotenv import load_dotenv
load_dotenv("./scripts/.env")
blob_service_credential=os.environ["BLOB_CRE_KEY"]
blob_credential = AzureKeyCredential(blob_service_credential)
azure_credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(
DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
account_url = os.environ["BLOB_ACCOUNT"]
container_name = os.environ["BLOB_CONTAINER"]
# Azure Document IntelligenceのAPIに接続するためのクライアントを生成する
di_endpoint = os.environ["DOCUMENT_INTELLIGENCE_ENDPOINT"]
di_cre=os.environ["DI_CREDENTIAL_KEY"]
di_credential = AzureKeyCredential(di_cre)
di_client = DocumentAnalysisClient(endpoint=di_endpoint, credential=di_credential)
blob_service_client = BlobServiceClient(account_url=account_url, credential=blob_service_credential)
container_client=ContainerClient(account_url=account_url,container_name=container_name,credential=azure_credential)
search_service_api_key = os.environ["SEARCH_SERVICE_API_KEY"]
search_service_endpoint = os.environ["SEARCH_SERVICE_ENDPOINT"]
search_service_api_version = os.environ["SEARCH_SERVICE_API_VERSION"]
credential = AzureKeyCredential(search_service_api_key)
index_name=os.environ["INDEX_NAME"]
aoai_endpoint = os.environ["AOAI_ENDPOINT"]
aoai_api_version = os.environ["AOAI_API_VERSION"]
index_client = SearchIndexClient(
endpoint=search_service_endpoint,
credential=credential,
)
Settings.embed_model=AzureOpenAIEmbedding(
#設定していないと自動的に002のモデルが適用される
model= os.environ["EMBD_MODEL"],
deployment_name=os.environ["EMBD_DEPLOYMENT"] ,
azure_endpoint=os.environ["AOAI_ENDPOINT"],
api_version=os.environ["AOAI_API_VERSION"],
azure_ad_token_provider=token_provider,)
Settings.llm = AzureOpenAI(
model=os.environ["AOAI_MODEL"],
deployment_name=os.environ["AOAI_DEPLOYMENT"],
azure_ad_token_provider=token_provider,
azure_endpoint=aoai_endpoint,
api_version=aoai_api_version,
)
# インデックスを余計に増やしたくない場合-------------------------------------------------------------------------------------------
def delete_index():
"""
Azure AI Searchのインデックスを削除する
"""
client = SearchIndexClient(search_service_endpoint, azure_credential)
client.delete_index(index_name)
# blobのコンテナのsasを取得する--------------------------------------------------------------------------------------
def create_service_sas_container(container_client: ContainerClient, account_key: str):
start_time = datetime.datetime.now(datetime.timezone.utc)
expiry_time = start_time + datetime.timedelta(days=1)
sas_token = generate_container_sas(
account_name=container_client.account_name,
container_name=container_client.container_name,
account_key=account_key,
permission=ContainerSasPermissions(read=True),
expiry=expiry_time,
start=start_time
)
return sas_token
# nodeparserなど(旧Service_context)をグローバルで設定する-------------------------------------------------------------
text_splitter=TokenTextSplitter(chunk_size=1024,chunk_overlap=20,)
# node_parser=MarkdownElementNodeParser(llm=Settings.llm,)
tokenizer=tiktoken.get_encoding("cl100k_base").encode
Settings.text_splitter=text_splitter
Settings.tokenizer=tokenizer
# Settings.node_parser=node_parser
#blob storageとの接続(のちに関数化検討)-------------------------------------------------------------------------------
reader = AzStorageBlobReader(
account_url = account_url,
container_name = container_name,
credential = blob_service_credential,
)
pages=reader.load_data()
files=reader.list_resources_with_info()
# ファイル名だけとる
doc_name=[]
for file in files.keys():
doc_name.append(file)
documents=[]
# ファイル名ごとのメタデータを設定するループ
for name in doc_name:
# Blob のメタデータを取得
blob_metadata = files.get(name)
url_value=blob_metadata.get("sharepoint_url")
file_path=blob_metadata.get("metadata_storage_path")
# 型変換(keyは環境変数から取得する形にすれば変更が容易)
urldict={"sharepoint_url":url_value}
# DIを使う(のちに関数化検討)
di_client = DocumentIntelligenceClient(
endpoint=di_endpoint, credential=di_credential
)
# diがblobを呼び出すためのsasトークンの発行
sas_token=create_service_sas_container(container_client=container_client,account_key=blob_service_credential)
# urlにトークンを埋め込む
sas_url = f"{file_path}?{sas_token}"
# diによって解析を行う
poller = di_client.begin_analyze_document(
"prebuilt-layout",
AnalyzeDocumentRequest(url_source=sas_url),
output_content_format=ContentFormat.MARKDOWN,
)
di_content=[]
di_result=poller.result()
#NOTE:DIの解析結果をblobをreaderにかけた形に直すためページ分割する
di_content=re.split(r'<!-- PageBreak -->',di_result.content)
# updateに使うためのメタデータの箱
metadata={}
# documentsにDI後のチャンクを入れると同時にメタデータの付与も行う
i=0
for page in pages:
if name==page.metadata.get("file_name"):
page.metadata.update(urldict)
# page_labelがあるかどうかの判定、必ずしも消す必要はない
if len(page.metadata)>2:
del page.metadata["page_label"]
page.text=di_content[i]
documents.append(page)
i+=1
metadata_fields={}
# indexを削除する
delete_index()
# Vectorstoreを定義する
vector_store = AzureAISearchVectorStore(
search_or_index_client=index_client,
filterable_metadata_field_keys=metadata_fields,
index_name=index_name,
index_management=IndexManagement.CREATE_IF_NOT_EXISTS,
id_field_key="id",
chunk_field_key="chunk",
embedding_field_key="text_vector",
embedding_dimensionality=????,#portalと合わせる
metadata_string_field_key="metadata",
doc_id_field_key="doc_id",
language_analyzer="ja.microsoft",#ja.luceneというのもあるがAzureがベースなのでMS標準日本語を使用
# compression_type="binary" # Option to use "scalar" or "binary". NOTE: compression is only supported for HNSW
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents, storage_context=storage_context
)
このコードを複数回実行した際にはインデックスを一度削除して同名のインデックスを作成する関数delete_index()が存在するがそこは、必要に応じて消したりしてほしい。
llmの設定はnode_parserを使う場合以外は不要である。
またMS拡張子のファイルのためのパッケージインストールを忘れずに。
経緯
Llamaindexフレームワーク&AzureAIでローカル環境で動くRAGを実装する(リンク)
で作成したRAGでは上手くメタデータを取得することができなかった。
そのためLlamaindexのフレームワーク上でインデクシングを実装すれば、
RAG側でも同一のフレームワークを用いているからメタデータの取得が容易だと考えた。
作業内容
作業は大きく4つに分かれた。
1.blobストレージとの接続およびファイルの取得
2.既存のインデックスとblob内のファイルを比較してどのファイルがまだインデクシングされていないか判断する
3.インデクシング前にファイルをdocumentintelligenceにかける
4.インデクシングをする
インデクシングのコード自体はRAGを作成する際に、どのような形でインスタンスを作成していたかなどがわかっていたので前処理に時間をかけることとなった。
sasを使うことでDIと接続しているが、readerにかけたあとの形がページごとなので、
DIにかけた後の結果を分割することが必要な点には注意が必要である。
所感
良い点
-
実装にはあまり時間がかからなかった
-
可能な限りllamaindexのAzure AI Searchのドキュメントに準ずる形で作成を行ったことでフレームワークのうちで実装ができた
-
responsemodeやretrievermodeは軽くテストしたものの十分な検証ができたとは言えないため、もっとよりよいモードや組み合わせが見つかるかもしれない
良くなかった点
- blobとindexの比較にファイル名を使っていることが名前が変更されるだけで同一のドキュメントを読ませてしまうので難点として上げられる。
参考になった記事
-
Azure AI Search - LlamaIndex(https://docs.llamaindex.ai/en/stable/examples/vector_stores/AzureAISearchIndexDemo/#create-index-if-it-does-not-exist)
-
【Azure Blob Storage】PythonからBlob・コンテナーを操作してみる #ChatGPT - Qiita(https://qiita.com/kkawano_neko/items/ee6d30881fa016b25964)
-
SASを生成してBlobからファイルをダウンロードする(Python) #Azure - Qiita(https://qiita.com/kkawano_neko/items/366f78cdb9c2c623c749)
-
[Azure Blob ストレージ] PythonでSASトークンを生成して、JavaScriptからファイルをアップロードするサンプル(https://zenn.dev/horitaka/articles/azure-blob-upload)
-
【Azure Document intelligence】- markdownで出力をする方法(https://zenn.dev/headwaters/articles/c167bcde92b9b9)
-
などなど