はじめに
表題の通りのことをしたかったのですが、1日探し回って、GPTに質問したりしましたが、結局自分がほしいものはなく、自分で実装したので(GPTの助け借りてですが)、メモ程度ですが共有しておきます。
(写真は、Google ImageFXで、「cute cat who avoid RAG (retrieval augmented generation)'s embedding rate limit in langchain by retry and wait cleverly. express avoiding state comically.」というプロンプトで作りました。)
目的
langchainで多数の大きなPDFファイルをベクターストアにアップロードしたい。
このようなことはほぼいつも起こるニーズだと思います。
もちろん自分でuploaderを実装しないで、Azure OpenAI StudioとかのGUIでやれば、内部でよしなにやってくれるので、表題のような問題は起きないのですが、自分で実装するとこのような問題に遭遇します。
ちなみにrate limitはToken Per Minit (TPM)で表され、私が使っているAzureOpenAIではrate limitはこちらのようになっています。なので、rate limitに引っかかっても、1分待てばまた次の処理ができます。そのときに、これまでの続きから行うというのがポイントです。
先行事例
単純にrate limitを上げてやることはまず考えられます。例えば私が使っているAzureOpenAIではこちらに従ってrate limitを変えれそうです。しかし、大きなファイルを複数uploadすれば、やがて引き上げたrate limitに到達してしまうでしょう。
こちらの方は、embeddingを自分で行い、その度に60秒待っています。
しかし、langchainのtutorialなどで紹介されているvectorstoreでは、documentかchunkを与えれば、embeddingはvectorstoreが自動で行うので、それを分解するのは面倒だな、という思いでやめました。しかし、time.sleepで待つところは参考にさせていただきました。
また、こちらの方は1ページずつvectorestoreに突っ込んで、rate limitでxceptionを受け取ったら60秒待つ、という手法をとっています。
しかし、記事の中ではloader.load_and_split()を使っていますが、こちらもlangchainのtutorialではloader.load()なので、splitまで一緒にやるのはちょっと気持ち悪いし、どうせchunkに区切るのに、1ページを単位にして、さらにchunkに区切って処理する必要もないかな、と思い、採用を見送りました。しかし、exceptionを受け取ってリトライする方法は参考にさせていただきました。
rate limitを回避する方法の考え
splitしたchunkでループし、vectorstoreにchunkを突っ込んでいきます。その際、rate limitに引っかかったらexceptionを受け取ってリトライします。そして、だんだんリトライ回数を増やしていき、最後には60秒待つようにします。
リトライで待つ秒数の設定は2のn乗で増やしていきます。
はじめのリトライを1秒とすると、5回のリトライで合計62秒になる(下記参照)ので、リトライ回数は5回でいいです。
maxretry=5
totalwait=0
for i in range(1,maxretry+1):
totalwait += 2**i
print(f"totalwait={totalwait}")
totalwait=62
rate limitを回避する方法の実装
ズバリ、こちらの関数でいいでしょう。
importは適切にして、何かのクラスのメンバー関数として使えます。
def _create_vectorstore(self):
"""
Create vector store.
This function can avoid rate limit error
"""
#create empty vectorstore
self.embedding = AzureOpenAIEmbeddings(
model="text-embedding-3-large",
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
openai_api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
)
self.vectorstore = InMemoryVectorStore(self.embedding)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
)
chunks = text_splitter.split_documents(self.docs)
for chunknum, chunk in enumerate(chunks):
print(f"Processing chunk {chunknum} of {len(chunks)}")
max_retry = 5 # by setting 5, finally it will wait for 60 seconds.
retry_delay = 1 #[sec]. need to refresh every chunk loop.
for attempt in range(max_retry):
try:
self.vectorstore.add_documents([chunk])
except Exception as e:
if attempt < max_retry:
print(f"Error embedding chunk. will wait {retry_delay} seconds: {e}")
time.sleep(retry_delay)
retry_delay *= 2
else:
raise e
self.vectorstore.save_local(self.config.get("vectorstore_path"))
終わりに
世の中意外と基本的なことが書かれてなかったりするもんなのですね。GPTも質問と回答が無限ループになったりするし。まあ私の検索の仕方が足りないだけかもしれませんが。
この記事がさくっと検索されて誰かのお役に立てれば幸いです。
アヘアへ、アディオス!