MLflowのリポジトリに興味深いサンプルが。Databricksでウォークスルーします。
MLflowでは、Retrieval-Augmented Generation (RAG)を構成するための高度なフレームワークを提供しています。RAGは収集モデル(ユーザーの質問に基づきドキュメントのチャンクを選択、ランキングするモデル)の強みと生成モデルを組み合わせる最先端のアプローチです。文脈的に適切で一貫性のあるレスポンスを提供するためにテキストを検索、生成する能力を組み合わせることで、既存のドキュメントを参照するテキストを生成することができます。RAGは、コンテキストとなるドキュメントの検索でリトリーバを活用し、この新しいアプローチはさまざまなNLPタスクに革新をもたらしています。
多くの場合、我々はパフォーマンスを比較、判断するために、RAGモデルにおけるこのリトリーバシステムを評価できるようにしたいと考えます。リトリーバシステムを評価するためには、まずドキュメントに対する質問のテストセットが必要となります。これらの質問は多岐に渡り、適切で一貫性があるものではなくてなんりません。手動で質問を生成しようとすると、ドキュメントを理解し、それらに対する質問を考え出すために多くの時間を費やさなくてはならないため、困難なものとなる場合があります。
このテストセットの質問の生成にLLMを活用することで、このプロセスをシンプルなものにしたいと考えています。このチュートリアルでは、質問の生成方法と、質問の多様性と適切性の評価方法をウォークスルーします。
Step 1: パッケージのインストールとロード
コストを削減するためにLLMのレスポンスをキャッシュするいくつかのユーティリティ関数を定義します。次のセルの実装の詳細に関してはスキップしても構いません。
%pip install beautifulsoup4 langchain openai pandas seaborn scikit-learn
import json
import os
# コスト削減のためにLLMレスポンスのキャッシュを作成
import threading
# データ分析と可視化
import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
# スクレーピング
import requests
import seaborn as sns
from bs4 import BeautifulSoup
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
class Cache:
def __init__(self, persist_path, cache_loading_fn):
"""
cache_loading_fnは任意のシリアライズ可能な引数を受け取り、シリアライズされた値を返却する関数でなくてはなりません。
value = cache_loading_fn(**kwargs)
例えば、openai.chat.completions.create(...)に対しては、cache_loading_fnは以下のようなものであるべきです:
def cache_loading_fn(**kwargs):
result = openai.chat.completions.create(**kwargs)
return result.to_dict_recursive()
"""
self._cache = self._get_or_create_cache_dict(persist_path)
self._persist_path = persist_path
self._cache_loading_fn = cache_loading_fn
self._cache_lock = threading.Lock()
@classmethod
def _get_or_create_cache_dict(cls, persist_path):
if os.path.exists(persist_path):
# ファイルが存在した場合には、JSON文字列としてdictにロード
with open(persist_path) as f:
cache = json.load(f)
else:
# ファイルがない場合には空のdictを作成
cache = {}
return cache
def _save_to_file(self):
with open(self._persist_path, "w") as file:
json.dump(self._cache, file)
def _update_cache(self, key, value):
with self._cache_lock:
self._cache[key] = value
self._save_to_file()
def get_from_cache_or_load_cache(self, **kwargs):
key = json.dumps(kwargs)
with self._cache_lock:
value = self._cache.get(key, None)
if value is None:
value = self._cache_loading_fn(**kwargs)
self._update_cache(key, value)
else:
print("Loaded from cache")
return value
def chat_completion_create_fn(**kwargs):
#return openai.chat.completions.create(**kwargs)
return openai.ChatCompletion.create(**kwargs)
def cached_openai_ChatCompletion_create(**kwargs):
cache = kwargs.pop("cache")
return cache.get_from_cache_or_load_cache(**kwargs)
def embeddings_embed_documents_fn(**kwargs):
chunk = kwargs.get("chunk")
return embeddings.embed_documents([chunk])
def cached_langchain_openai_embeddings(**kwargs):
cache = kwargs.pop("cache")
return cache.get_from_cache_or_load_cache(**kwargs)
注意
chat_completion_create_fn
では呼び出しメソッドを変更しています。
Step 2: OpenAIキーの設定
質問生成システムは任意のLLMを用いて構成することができます。ここではOpenAIを使うので、APIキーが必要となります。
openai.api_key = dbutils.secrets.get("demo-token-takaaki.yayoi", "openai_api_key")
os.environ["OPENAI_API_KEY"] = openai.api_key
# その他の設定
# 再現可能な結果を得るためにシードを選択
SEED = 2023
# コスト削減のために、LLM呼び出しによるレスポンスを永続化するためのパスを選択
CACHE_PATH = "_cache.json"
EMBEDDINGS_CACHE_PATH = "_embeddings_cache.json"
# スクレーピング処理の際実行を避けるために、スクレーピングしたドキュメントを保存するためのパスを選択
SCRAPPED_DATA_PATH = "mlflow_docs_scraped.csv"
# 生成したデータセットを保存するためのパスを選択
OUTPUT_DF_PATH = "question_answer_source.csv"
ユーティリティ関数
cache = Cache(CACHE_PATH, chat_completion_create_fn)
embeddings_cache = Cache(EMBEDDINGS_CACHE_PATH, embeddings_embed_documents_fn)
Step 3: チャンクサイズの決定
CHUNK_SIZE = 1500
Step 4: ドキュメントデータの準備
MLflowウェブサイトからドキュメントをスクレーピング
page = requests.get("https://mlflow.org/docs/latest/index.html")
soup = BeautifulSoup(page.content, "html.parser")
mainLocation = "https://mlflow.org/docs/latest/"
header = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11",
"Accept-Language": "en-US,en;q=0.8",
"Connection": "keep-alive",
}
data = []
for a_link in soup.find_all("a"):
document_url = mainLocation + a_link["href"]
page = requests.get(document_url, headers=header)
soup = BeautifulSoup(page.content, "html.parser")
file_to_store = a_link.get("href")
if soup.find("div", {"class": "rst-content"}):
data.append(
[
file_to_store,
soup.find("div", {"class": "rst-content"}).text.replace("\n", " "),
]
)
df = pd.DataFrame(data, columns=["source", "text"])
df.to_csv(SCRAPPED_DATA_PATH, index=False)
df = pd.read_csv(SCRAPPED_DATA_PATH)
ドキュメントのサブセットを選択し、チャンクに分割
# デモなので、データセットから5つの人気のMLflowドキュメントページをピックアップします
mask = df["source"].isin(
{
"tracking.html",
"models.html",
"model-registry.html",
"search-runs.html",
"projects.html",
}
)
sub_df = df[mask]
# ドキュメントをチャンクに分割
text_splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, separator=" ")
def get_chunks(input_row):
new_rows = []
chunks = text_splitter.split_text(input_row["text"])
for i, chunk in enumerate(chunks):
new_rows.append({"chunk": chunk, "source": input_row["source"], "chunk_index": i})
return new_rows
expanded_df = pd.DataFrame(columns=["chunk", "source", "chunk_index"])
for index, row in sub_df.iterrows():
new_rows = get_chunks(row)
expanded_df = pd.concat([expanded_df, pd.DataFrame(new_rows)], ignore_index=True)
expanded_df.head(3)
# コスト削減のために、それぞれのドキュメントの最初の3つのチャンクをピックアップします
# より多くのチャンクで質問を生成するには、iloc[]の最初と最後のインデックスを変更します
start, end = 0, 3
filtered_df = (
expanded_df.groupby("source").apply(lambda x: x.iloc[start:end]).reset_index(drop=True)
)
filtered_df.head(3)
filtered_df["chunk"][0]
'Documentation MLflow Model Registry MLflow Model Registry The MLflow Model Registry component is a centralized model store, set of APIs, and UI, to collaboratively manage the full lifecycle of an MLflow Model. It provides model lineage (which MLflow experiment and run produced the model), model versioning, model aliasing, model tagging, and annotations. Table of Contents Concepts Model Registry Workflows UI Workflow Register a Model Find Registered Models Deploy and Organize Models API Workflow Adding an MLflow Model to the Model Registry Deploy and Organize Models with Aliases and Tags Fetching an MLflow Model from the Model Registry Serving an MLflow Model from Model Registry Promoting an MLflow Model across environments Adding or Updating an MLflow Model Descriptions Renaming an MLflow Model Listing and Searching MLflow Models Deleting MLflow Models Registering a Model Saved Outside MLflow Registering an Unsupported Machine Learning Model Deprecated: Using Model Stages Migrating from Stages New model deployment tools Migrating models away from stages Concepts The Model Registry introduces a few concepts that describe and facilitate the full lifecycle of an MLflow Model. ModelAn MLflow Model is created from an experiment or run that is logged with one of the model flavor’s mlflow..log_model() methods. Once logged, this model can then be registered with the Model Registry. Registered ModelAn MLflow Model can be registered with the Model Registry. A registered'
Step 5: 質問の生成
以下のプロンプトでは、それぞれ指定されたチャンクに対する質問を生成し、人間による検証を容易にするために質問に対する回答を生成することをLLMに指示しています。また、構造化されたフォーマットで結果を返却するように指示しています。
この例では、質問の生成にOpenAIのgpt-3.5-turboモデルを使用していますが、あなたのユースケースに適したLLMで置き換えることができます。
def get_raw_response(content):
prompt = f"""Please generate a question asking for the key information in the given paragraph.
Also answer the questions using the information in the given paragraph.
Please ask the specific question instead of the general question, like
'What is the key information in the given paragraph?'.
Please generate the answer using as much information as possible.
If you are unable to answer it, please generate the answer as 'I don't know.'
The answer should be informative and should be more than 3 sentences.
Paragraph: {content}
Please call the submit_function function to submit the generated question and answer.
"""
messages = [{"role": "user", "content": prompt}]
submit_function = {
"name": "submit_function",
"description": "Call this function to submit the generated question and answer.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question asking for the key information in the given paragraph.",
},
"answer": {
"type": "string",
"description": "The answer to the question using the information in the given paragraph.",
},
},
"required": ["question", "answer"],
},
}
return cached_openai_ChatCompletion_create(
messages=messages,
model="gpt-3.5-turbo",
functions=[submit_function],
function_call="auto",
temperature=0.0,
seed=SEED,
cache=cache,
)
def generate_question_answer(content):
if content is None or len(content) == 0:
return "", "N/A"
response = get_raw_response(content)
try:
func_args = json.loads(response["choices"][0]["message"]["function_call"]["arguments"])
question = func_args["question"]
answer = func_args["answer"]
return question, answer
except Exception as e:
return str(e), "N/A"
queries = []
get_raw_response(filtered_df["chunk"][0])
<OpenAIObject chat.completion id=chatcmpl-9QP4TQ7uyHmXpmyieujnFODCxOlhx at 0x7f38db0f4630> JSON: {
"id": "chatcmpl-9QP4TQ7uyHmXpmyieujnFODCxOlhx",
"object": "chat.completion",
"created": 1716079929,
"model": "gpt-3.5-turbo-0125",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": null,
"function_call": {
"name": "submit_function",
"arguments": "{\"question\":\"What is the purpose of the MLflow Model Registry component?\",\"answer\":\"The MLflow Model Registry component serves as a centralized model store, set of APIs, and UI to collaboratively manage the full lifecycle of an MLflow Model. It provides features such as model lineage, versioning, aliasing, tagging, and annotations. Users can use the Model Registry to manage and organize models, deploy them, and track their evolution across different environments.\"}"
}
},
"logprobs": null,
"finish_reason": "function_call"
}
],
"usage": {
"prompt_tokens": 488,
"completion_tokens": 102,
"total_tokens": 590
},
"system_fingerprint": null
}
# リクエストがレート制限を受ける場合がありますので、既存の結果を失うことなしにこのセルを再実行することができます。
n = len(filtered_df)
for i, row in filtered_df.iterrows():
chunk = row["chunk"]
question, answer = generate_question_answer(chunk)
print(f"{i+1}/{n}: {question}")
queries.append(
{
"question": question,
"answer": answer,
"chunk": chunk,
"chunk_id": row["chunk_index"],
"source": row["source"],
}
)
Loaded from cache
1/15: What is the purpose of the MLflow Model Registry component?
2/15: What is the purpose of Model Registry in MLflow and what are the components of a registered model in Model Registry?
3/15: How can you categorize and label registered models and model versions in MLflow?
4/15: What is the key concept that makes MLflow Models powerful and allows deployment tools to understand the model?
5/15: What are the different flavors defined in the MLmodel file for a model saved using mlflow.sklearn?
6/15: What files are automatically logged for environment recreation when a model is logged in MLflow?
7/15: What is an MLflow Project and how is it used to package data science code?
8/15: What are the key considerations for specifying project environments and entry points in MLflow?
9/15: What are the supported project environments in MLflow and how do they differ from each other?
10/15: How can you search MLflow runs based on metrics, params, tags, dataset information, and run metadata?
11/15: What does the code above do with MLflow runs and dataset information?
12/15: What is the purpose of writing search queries in MLflow?
13/15: What is the purpose of MLflow Tracking and what does it allow you to log and visualize?
14/15: How can you organize your runs into experiments using MLflow Tracking APIs?
15/15: How can you track datasets associated with model training events in MLflow?
LLMが質問の生成に失敗することがあります。エラーが起きていないかどうかを確認するために上のデータを検証することができます。エラーが起きている場合には、エラーレコードを除外します。
result_df = pd.DataFrame(queries)
result_df = result_df[result_df["answer"] != "N/A"]
def add_to_output_df(result_df=pd.DataFrame({})):
"""
この関数はresult_dfのレコードをOUTPUT_DF_PATHに保存されている既存レコードに追加し、
重複する行を除外し、新たなコレクションをOUTPUT_DF_PATHに保存します。
"""
if os.path.exists(OUTPUT_DF_PATH):
all_result_df = pd.read_csv(OUTPUT_DF_PATH)
else:
all_result_df = pd.DataFrame({})
all_result_df = (
pd.concat([all_result_df, result_df], ignore_index=True)
.drop_duplicates()
.sort_values(by=["source", "chunk_id"])
.reset_index(drop=True)
)
all_result_df.to_csv(OUTPUT_DF_PATH, index=False)
return all_result_df
all_result_df = add_to_output_df(result_df)
all_result_df.head(3)
生成した質問の品質解析 (オプション)
さまざまなプロンプトで生成された質問の品質を比較したいのであれば、手動かつまとめて質問の品質を解析することができます。2つの次元で質問を評価したいものとします - 多様性と適切性です。
https://github.com/mlflow/mlflow/blob/master/examples/llms/question_generation/question_answer_source.csv は56の質問で事前に生成されたデータセットです。このセクションをすぐにスタートしたい場合には、このノートブックの残りを実行するために、これをダウンロードし、OUTPUT_DF_PATH
のパスで指定してロードすることができます。
注意
生成された質問の品質を解析するために確立された手法はないので、これは生成された質問がどれだけ多岐に渡り、適切なものであるのかに関する洞察を得ることのできる一つのアプローチと考えてください。
all_result_df = add_to_output_df()
質問の多様性の評価
質問がドキュメントのコンテンツの大部分をカバーして欲しいと考えているので、質問の多様性は重要です。さらに、さまざまな形態の質問でリトリーバを評価できるようになりたいと考えています。難しい質問や簡単な質問を確保したいと考えています。これら全ての分析は困難なので、質問の長さと潜在空間のエンべディングを通じて分析することにしました。
長さ
長さで質問がどれだけ多岐に渡っているのかの感触を得ることができます。ある質問は長く、他のものは理解しやすいものとなるでしょう。また、生成された質問における問題の特定に役立ちます。
# 長さ
questions = all_result_df["question"].to_list()
question_len = pd.DataFrame([len(q) for q in questions], columns=["length"])
question_len.hist(bins=5)
plt.title("Histogram of Question Lengths")
plt.xlabel("Question Length")
plt.ylabel("Frequency")
plt.show()
可視化に加えて、より厳密なパーセンタイルの値を確認したいと思います。
# パーセンタイルの値の計算
p10 = int(question_len["length"].quantile(0.10))
p90 = int(question_len["length"].quantile(0.90))
print("p10-p90 range is", p90 - p10)
p10-p90 range is 43
通常より長い質問がいくつかありますが、問題なさそうです。
[q for q in questions if len(q) > 100]
['What is the purpose of Model Registry in MLflow and what are the components of a registered model in Model Registry?',
'What is the key concept that makes MLflow Models powerful and allows deployment tools to understand the model?',
'How can you search MLflow runs based on metrics, params, tags, dataset information, and run metadata?']
潜在空間
潜在空間のエンべディングには、質問に関するセマンティックな情報が含まれています。これは、2つの質問における意味論的な違いや多様性の評価に活用することができます。このためには、高次元の空間を低次元の空間にマッピングする必要があります。可視化のために、2次元空間にエンべディングをマッピングするために、PCAとTSNEを活用します。
質問がどれだけ多岐に渡っているのかの可視化を行えるように5つのベンチマークのクエリーを追加します。これらの質問の最初の4つは意味論的には類似しており、すべてはMLflowについて質問していますが、最後のものは異なっており、RAGに関して質問しています。
benchmark_questions = [
"What is MLflow?",
"What is MLflow about?",
"What is MLflow Tracking?",
"What is MLflow Evaluation?",
"Why is RAG so popular?",
]
questions_to_embed = questions + benchmark_questions
TSNEの計算処理の複雑性からsklearnで推奨されているように、エンべディングの次元を2次元に削減するためにTNSEを適用する前に、エンべディングの次元を10次元に削減するためにPCAを適用します。
# エンべディングの適用
embeddings = OpenAIEmbeddings()
question_embeddings = embeddings.embed_documents(questions_to_embed)
# 10次元に削減するためにエンべディングにPCAを適用
pca = PCA(n_components=10)
question_embeddings_reduced = pca.fit_transform(question_embeddings)
# 2次元に削減するためにエンべディングにTSNEを適用
tsne = TSNE(n_components=2, random_state=SEED)
lower_dim_embeddings = tsne.fit_transform(question_embeddings_reduced)
質問のセマンティクスを表現する2次元のエンべディングを得られたので、散布図で可視化して、ベンチマーク質問と生成された質問の差異を明らかにします。
labels = np.concatenate(
[
np.full(len(lower_dim_embeddings) - len(benchmark_questions), "generated"),
np.full(len(benchmark_questions), "benchmark"),
]
)
data = pd.DataFrame(
{"x": lower_dim_embeddings[:, 0], "y": lower_dim_embeddings[:, 1], "label": labels}
)
sns.scatterplot(data=data, x="x", y="y", hue="label")
散布図におけるオレンジの点を参照すると、他よりも離れた地点にある点が1つ存在します。これはRAGに関するユニークなベンチマークの質問です。このプロットによって、生成された質問の多様性に対する感触を得ることができます。
ドキュメントの適切性の評価
検討すべき別の重要な軸は、指定しているドキュメントに対して質問がどれだけ適切なものかということです。LLMによって生成された質問が実際に指定されたテキストを参照しているのか、不適切な質問の幻覚を見ているのかを理解したいと考えています。まず、それらのドキュメントのチャンクに対して適切な質問なのかどうかを手動チェックします。そして、定量的に解析できるように適正性の指標を定義します。
ドキュメントの適切性手動チェック
質問がドキュメントに適したものかどうかを手動で定性的にチェックします。
all_result_df.sample(3)
エンべディングのコサイン類似度
チャンクとクエリーのエンべディングは、同じ潜在空間に配置されるので、リトリーバは質問のエンべディングに類似したチャンクのエンべディングを抽出します。このため、リトリーバの適切性はこの潜在空間におけるエンべディングの距離によって定義されます。
コサイン類似度はベクトルの類似性の指標であり、チャンクとクエリーの間のエンべディングの距離を特定するために活用することができます。質問とチャンクが類似していると1に、異なる場合には0に近づく距離の指標です。
適切性を計測するためにコサイン類似度のスコアを直接活用します。
embedded_queries = all_result_df.copy()
embedded_queries["chunk_emb"] = all_result_df["chunk"].apply(
lambda x: np.squeeze(cached_langchain_openai_embeddings(chunk=x, cache=embeddings_cache))
)
embedded_queries["question_emb"] = all_result_df["question"].apply(
lambda x: np.squeeze(cached_langchain_openai_embeddings(chunk=x, cache=embeddings_cache))
)
def cossim(x, y):
return np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))
embedded_queries["cossim"] = embedded_queries.apply(
lambda row: cossim(row["question_emb"], row["chunk_emb"]), axis=1
)
相対的な適切性でそれぞれの質問をスコアリングしたあとで、生成された質問全体を評価することができます。
scores = embedded_queries["cossim"].to_list()
plt.hist(scores, bins=5)
スコアの低いものがいくつか存在します。確認してみましょう。
#mask = embedded_queries["cossim"] < 0.75
mask = embedded_queries["cossim"] < 0.82
lower_cossim = embedded_queries[mask]
for i, row in lower_cossim.iterrows():
print(f"Question: {i}")
print(row["question"])
print("Chunk:")
print(row["chunk"])
print("cossim:")
print(row["cossim"])
Question: 11
What is the purpose of writing search queries in MLflow?
Chunk:
context=dataset_context[i]) The code above creates 10 MLflow runs with different metrics, params, tags and dataset information. After successful execution, if you return to the MLflow UI in your browser, you should find all of these runs under the experiment “search-run-guide”, as shown by the following screenshot: In real-world production deployments of MLflow, it’s common to have thousands or even hundreds of thousands of runs. In such cases, it’s important to be able to filter and search for runs based on specific criteria. Search Query Example In order to filter your MLflow runs, you will need to write search queries, which are pseudo-SQL conditions expressed in a distinct syntax. To showcase this functionality, let’s look at the below code examples. import mlflow all_runs = mlflow.search_runs(search_all_experiments=True) print(all_runs) Output run_id ... tags.mlflow.user 0 5984a3488161440f92de9847e846b342 ... michael.berk 1 41160f238a5841998dda263794b26067 ... michael.berk 2 babe221a676b4fa4b204f8240f2c4f14 ... michael.berk 3 45eb4f02c5a1461aa6098fa550233be6 ... michael.berk 4 1c7c459486c44b23bb016028aee1f153 ... michael.berk 5 4453f59f1ab04491bb9582d8cba5f437 ... michael.berk 6 22db81f070f6413588641c8c343cdd72 ... michael.berk 7 c3680e37d0fa44eb9c9fb7828f6b5481 ... michael.berk 8 67973142b9c0470d8d764ada07c5a988 ... michael.berk 9 59853d5f17f946218f63de1dc82de07b ... michael.berk [10 rows x 19 columns] Second, let’s try filtering the runs for our really bad models:
cossim:
0.8165016632559196
これらの適切性の低い質問を手動で調査することで、いくつかのチャンクには情報が少なく、あるいは主にコードで構成されていることがわかり、生成された質問があまり役に立っていないということになります。必要に応じてこれらを除外します。
こちらに続きます。