Fetch RAG-Evaluation-Dataset-JA
git clone https://huggingface.co/datasets/allganize/RAG-Evaluation-Dataset-JA
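If git is not available, the same files can also be pulled with the huggingface_hub library; a minimal sketch (assumes pip install huggingface_hub):
from huggingface_hub import snapshot_download
snapshot_download(
    repo_id="allganize/RAG-Evaluation-Dataset-JA",
    repo_type="dataset",
    local_dir="RAG-Evaluation-Dataset-JA",
)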
Download the documents
downloads.py
# %%
#!pip install pandas
#!pip install requests
#!mkdir data
# %%
import pandas as pd
df = pd.read_csv("RAG-Evaluation-Dataset-JA/documents.csv")
df
# %%
import os
import requests
for index, row in df.iterrows():
    url = row['url']
    file_name = row['file_name']
    print(url)
    file = f"data/{file_name}"
    if os.path.exists(file):
        continue
    try:
        buffer = requests.get(url=url, timeout=10)
        with open(file=file, mode="wb") as f:
            f.write(buffer.content)
    except Exception as e:
        print(f"{e}")
# %%
Register the documents in Milvus
milvus.py
# %%
#!pip install langchain_huggingface
#!pip install langchain
#!pip install langchain_milvus
#!pip install langchain_community
#!pip install pypdf
# %%
import glob
file_paths = glob.glob(pathname="data/*.pdf")
# %%
from langchain_community.document_loaders import PyPDFLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_milvus import Milvus
# %%
embedding = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
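# A local file uri makes langchain_milvus run Milvus Lite (embedded), so no separate Milvus server is required.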
uri = "RAG-Evaluation-Dataset-JA.db"
connection_args = {"uri": uri}
vector_store = Milvus(embedding_function=embedding, connection_args=connection_args, drop_old=True, auto_id=True)
# %%
for file_path in file_paths:
    print(file_path)
    try:
        loader = PyPDFLoader(file_path=file_path)
        documents = loader.load_and_split()
        vector_store.add_documents(documents=documents)
        print(f"{file_path}: {len(documents)} chunks added")
    except Exception as e:
        print(f"{file_path}: {e}")
        continue
documents
# %%
Build the RAG with Azure OpenAI GPT-3.5 Turbo
templates.py
# %%
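# The prompt is intentionally kept in Japanese to match the dataset; it reads: "Instructions: answer the question based on the given context. Context: {context} Question: {question} Answer:"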
template = """# 指示:
与えられた文脈にもとづいて質問に回答してください。
# 文脈:
{context}
# 質問:
{question}
# 回答:
"""
rag.py
# %%
#!pip install azure.identity
#!pip install langchain_openai
# %%
from dotenv import load_dotenv
load_dotenv(override=True)
# %%
import os
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET")
AZURE_OPENAI_ENDPOINT = os.environ.get('AZURE_OPENAI_ENDPOINT')
AZURE_OPENAI_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")
API_VERSION = os.environ.get("API_VERSION")
# %%
from langchain_huggingface import HuggingFaceEmbeddings
embedding = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
# %%
from langchain_milvus import Milvus
URI = "RAG-Evaluation-Dataset-JA.db"
connection_args = {"uri": URI}
vector_store = Milvus(embedding_function=embedding, connection_args=connection_args, auto_id=True)
# %%
from azure.identity import ClientSecretCredential, get_bearer_token_provider
credential = ClientSecretCredential(
    tenant_id=AZURE_TENANT_ID,
    client_id=AZURE_CLIENT_ID,
    client_secret=AZURE_CLIENT_SECRET
)
# %%
scopes = "https://cognitiveservices.azure.com/.default"
azure_ad_token_provider = get_bearer_token_provider(credential, scopes)
# %%
from langchain_openai import AzureChatOpenAI
temperature = 0
max_tokens = 4096
llm = AzureChatOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_version=API_VERSION,
    azure_deployment=AZURE_OPENAI_DEPLOYMENT_NAME,
    azure_ad_token_provider=azure_ad_token_provider,
    temperature=temperature,
    max_tokens=max_tokens,
)
# %%
from templates import template
from langchain_core.prompts import PromptTemplate
prompt_template = PromptTemplate.from_template(template=template)
Generate the answers
rag_generated_text.py
# %%
from rag import vector_store, prompt_template, llm
# %%
import pandas as pd
filepath = "RAG-Evaluation-Dataset-JA/rag_evaluation_result.csv"
df = pd.read_csv(filepath_or_buffer=filepath).iloc[:20]
df
# %%
from unicodedata import normalize
generated_text = []
contexts = []
for index, row in df.iterrows():
    print(index)
    question = row.question
    reference_text = row.target_answer
    context = ""
    try:
        results = vector_store.similarity_search(query=question)
        # print(results)
        for rank, result in enumerate(results):
            context += f"# {rank+1}:"
            context += "\n"
            context += normalize("NFKD", result.page_content).replace("\n", "")
            context += "\n"
        response = llm.invoke(input=prompt_template.format(context=context, question=question))
        generated_text.append(response.content)
    except Exception as e:
        print(f"{e}")
        generated_text.append("")
    # keep the retrieved context for this row so it can be attached to the dataframe below
    contexts.append(context)
# %%
df['generated_text'] = generated_text
df
# %%
df['context'] = contexts
df
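evaluate_risk.py below reads rag_generated_text.csv, so the generated answers and contexts need to be written out at this point; a minimal sketch, assuming the column layout shown above:
# %%
df.to_csv("rag_generated_text.csv", index=False)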
Create the detached prompt template
create_detached_prompt_template.py
# %%
#!pip install ibm_aigov_facts_client
#!pip install tabulate
# %%
from dotenv import load_dotenv
load_dotenv(override=True)
# %%
import os
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")
# %%
from ibm_aigov_facts_client import AIGovFactsClient
from ibm_aigov_facts_client import CloudPakforDataConfig
creds = CloudPakforDataConfig(
    service_url=CPD_URL,
    username=CPD_USERNAME,
    api_key=CPD_API_KEY
)
facts_client = AIGovFactsClient(
    cloud_pak_for_data_configs=creds,
    container_id=PROJECT_ID,
    container_type="project",
    disable_tracing=True
)
# %%
from ibm_aigov_facts_client import DetachedPromptTemplate, PromptTemplate
AZURE_OPENAI_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")
detached_information = DetachedPromptTemplate(
    prompt_id="rag-azure-openai-gpt-35-turbo",
    model_id=AZURE_OPENAI_DEPLOYMENT_NAME,
    model_provider="azure-ml-openai",
    model_name=AZURE_OPENAI_DEPLOYMENT_NAME,
    model_url=AZURE_OPENAI_ENDPOINT,
    prompt_url="prompt_url",
    prompt_additional_info={"model_owner": "Microsoft"}
)
# %%
from templates import template
prompt_template = PromptTemplate(
    input=template,
    prompt_variables={"context": "", "question": ""},
    input_prefix="",
    output_prefix="",
)
# %%
response = facts_client.assets.create_detached_prompt(
    name="rag-azure-openai-gpt-35-turbo",
    model_id=AZURE_OPENAI_DEPLOYMENT_NAME,
    task_id="retrieval_augmented_generation",
    detached_information=detached_information,
    description="RAG based on Azure OpenAI GPT-3.5-turbo",
    prompt_details=prompt_template)
# %%
response
# %%
Set up the detached prompt template
execute_detached_prompt_template.py
# %%
#!pip install ibm_watson_openscale
#!pip install "pandas<=2.1.9"
# %%
from dotenv import load_dotenv
load_dotenv(override=True)
# %%
import os
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")
PROMPT_TEMPLATE_ASSET_ID = os.environ.get("PROMPT_TEMPLATE_ASSET_ID")
# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)
wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
print(data_mart_id)
print(wos_client.version)
# %%
from ibm_watson_openscale.base_classes import ApiRequestFailure
try:
    wos_client.wos.add_instance_mapping(
        service_instance_id=data_mart_id,
        project_id=PROJECT_ID
    )
except ApiRequestFailure as arf:
    if arf.response.status_code == 409:
        # Instance mapping already exists
        pass
    else:
        raise arf
# %%
wos_client.data_marts.show()
# %%
from ibm_metrics_plugin.metrics.llm.utils.constants import LLMTextMetricGroup, LLMCommonMetrics, LLMRAGMetrics, RetrievalQualityMetric
supporting_monitors = {
    "generative_ai_quality": {
        "parameters": {
            "min_sample_size": 10,
            "metrics_configuration": {
            }
        }
    }
}
import json
print(json.dumps(supporting_monitors, indent=2, ensure_ascii=False))
# %%
language_code = "ja"
response = wos_client.wos.execute_prompt_setup(
    prompt_template_asset_id=PROMPT_TEMPLATE_ASSET_ID,
    label_column="reference_text",
    operational_space_id="development",
    problem_type="retrieval_augmented_generation",
    input_data_type="unstructured_text",
    data_input_locale=[language_code],
    generated_output_locale=[language_code],
    project_id=PROJECT_ID,
    context_fields=["context"],
    question_field="question",
    supporting_monitors=supporting_monitors,
    background_mode=True)
response.result.to_dict()
# %%
response = wos_client.wos.get_prompt_setup(
    prompt_template_asset_id=PROMPT_TEMPLATE_ASSET_ID,
    project_id=PROJECT_ID)
response.result.to_dict()
# %%
Evaluation
evaluate_risk.py
# %%
from dotenv import load_dotenv
load_dotenv(override=True)
# %%
import os
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")
# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)
wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
print(data_mart_id)
print(wos_client.version)
# %%
SUBSCRIPTION_ID = os.environ.get("SUBSCRIPTION_ID")
SUBSCRIPTION_ID
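If SUBSCRIPTION_ID has not been recorded in .env yet, the subscriptions known to this OpenScale instance can be listed and the ID picked out; a sketch:
# %%
wos_client.subscriptions.show()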
# %%
wos_client.monitor_instances.show(target_target_id=SUBSCRIPTION_ID)
# %%
monitor_definition_id = "mrm"
target_target_id = SUBSCRIPTION_ID
response = wos_client.monitor_instances.list(
    data_mart_id=data_mart_id,
    monitor_definition_id=monitor_definition_id,
    target_target_id=target_target_id,
    project_id=PROJECT_ID)
response.result.to_dict()
# %%
mrm_monitor_instance_id = response.result.to_dict()["monitor_instances"][0]["metadata"]["id"]
mrm_monitor_instance_id
# %%
import pandas as pd
df = pd.read_csv("rag_generated_text.csv")
df
# %%
df.rename(columns={
    "question": "question",
    "target_answer": "reference_text",
    "context": "context",
    "generated_text": "generated_text"
}, inplace=True)
df
# %%
import csv
df[["context", "question", "generated_text", "reference_text"]]
# %%
df[["context", "question", "generated_text", "reference_text"]].to_csv("test.csv", index=False, quoting=csv.QUOTE_ALL)
# %%
test_data_path = "test.csv"
test_data_set_name = "data"
content_type = "multipart/form-data"
response = wos_client.monitor_instances.mrm.evaluate_risk(
    monitor_instance_id=mrm_monitor_instance_id,
    test_data_set_name=test_data_set_name,
    test_data_path=test_data_path,
    content_type=content_type,
    body=None,
    project_id=PROJECT_ID,
    includes_model_output=True,
    background_mode=True
)
response.result.to_dict()
# %%
response = wos_client.monitor_instances.mrm.get_risk_evaluation(
    monitor_instance_id=mrm_monitor_instance_id,
    project_id=PROJECT_ID
)
response.result.to_dict()
# %%
monitor_definition_id = "generative_ai_quality"
response = wos_client.monitor_instances.list(
    data_mart_id=data_mart_id,
    monitor_definition_id=monitor_definition_id,
    target_target_id=target_target_id,
    project_id=PROJECT_ID
)
response.result.to_dict()
# %%
gaiq_monitor_instance_id = response.result.to_dict()["monitor_instances"][0]["metadata"]["id"]
gaiq_monitor_instance_id
# %%
wos_client.monitor_instances.show_metrics(
    monitor_instance_id=gaiq_monitor_instance_id,
    project_id=PROJECT_ID
)
# %%
response = wos_client.data_sets.list(
    target_target_id=SUBSCRIPTION_ID,
    target_target_type="subscription",
    type="gen_ai_quality_metrics"
)
response.result.to_dict()
# %%
gaiq_data_set_id = response.result.to_dict()["data_sets"][0]["metadata"]["id"]
gaiq_data_set_id
# %%
wos_client.data_sets.show_records(data_set_id = gaiq_data_set_id)
# %%
Promote the detached prompt template to production
Set up the promoted detached prompt template
execute_detached_prompt_template_production.py
# %%
from dotenv import load_dotenv
load_dotenv(override=True)
# %%
import os
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")
SPACE_ID = os.environ.get("SPACE_ID")
PROMPT_TEMPLATE_ASSET_ID = os.environ.get("PROMPT_TEMPLATE_ASSET_ID_PRODUCTION")
DEPLOYMENT_ID = os.environ.get("DEPLOYMENT_ID")
# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)
wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
print(data_mart_id)
print(wos_client.version)
# %%
from ibm_watson_openscale.base_classes import ApiRequestFailure
try:
    wos_client.wos.add_instance_mapping(
        service_instance_id=data_mart_id,
        space_id=SPACE_ID  # <- this is the key point: map the deployment space, not a project
    )
except ApiRequestFailure as arf:
    if arf.response.status_code == 409:
        # Instance mapping already exists
        pass
    else:
        raise arf
# %%
supporting_monitors = {
    "generative_ai_quality": {
        "parameters": {
            "min_sample_size": 10,
            "metrics_configuration": {
            }
        }
    }
}
# %%
language_code = "ja"
response = wos_client.wos.execute_prompt_setup(
    prompt_template_asset_id=PROMPT_TEMPLATE_ASSET_ID,
    space_id=SPACE_ID,
    deployment_id=DEPLOYMENT_ID,
    label_column="reference_text",
    operational_space_id="production",
    problem_type="retrieval_augmented_generation",
    input_data_type="unstructured_text",
    data_input_locale=[language_code],
    generated_output_locale=[language_code],
    context_fields=["context"],
    question_field="question",
    supporting_monitors=supporting_monitors,
    background_mode=True)
response.result.to_dict()
# %%
response = wos_client.monitor_instances.mrm.get_prompt_setup(
    prompt_template_asset_id=PROMPT_TEMPLATE_ASSET_ID,
    deployment_id=DEPLOYMENT_ID,
    space_id=SPACE_ID)
response.result.to_dict()
# %%
Implement payload logging to watsonx.governance
payload.py
# %%
from dotenv import load_dotenv
load_dotenv(override=True)
# %%
import os
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")
SPACE_ID = os.environ.get("SPACE_ID")
SUBSCRIPTION_ID = os.environ.get("SUBSCRIPTION_ID_PRODUCTION")
# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)
wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
data_mart_id
# %%
wos_client.version
# %%
from ibm_watson_openscale.supporting_classes.enums import *
data_set_id = None
response = wos_client.data_sets.list(
    type=DataSetTypes.PAYLOAD_LOGGING,
    target_target_id=SUBSCRIPTION_ID,
    target_target_type=TargetTypes.SUBSCRIPTION)
response.result.to_dict()
# %%
data_set_id = response.result.to_dict()['data_sets'][1]['metadata']['id']
data_set_id
# %%
#import json
#import pandas as pd
#with open(file="hap_low_pii_low.json", mode="r", encoding="utf-8") as f:
#    data = json.load(f)
#dataset = pd.json_normalize(data=data)
#dataset
#request_body = []
#for index, row in dataset.iterrows():
#    print(index)
#    question = row.question
#    context = row.context
#    generated_text = row.generated_text
#    tmp = {
#        "request": {
#            "parameters": {
#                "template_variables": {
#                    "context": context,
#                    "question": question
#                }
#            }
#        },
#        "response": {
#            "results": [
#                {
#                    "generated_text": generated_text
#                }
#            ]
#        }
#    }
#    request_body.append(tmp)
# %%
#print(json.dumps(request_body, indent=2, ensure_ascii=False))
# %%
#response = wos_client.data_sets.store_records(
#    data_set_id=data_set_id,
#    request_body=request_body,
#    background_mode=True)
#response.result.to_dict()
# %%
#wos_client.data_sets.get_records_count(data_set_id=data_set_id)
# %%
Build the RAG web application
app.py
# %%
#!pip install gradio
# %%
import gradio as gr
from rag import vector_store, prompt_template, llm
from unicodedata import normalize
from payload import wos_client, data_set_id
# %%
import pandas as pd
df = pd.read_csv("RAG-Evaluation-Dataset-JA/rag_evaluation_result.csv")
df
# %%
import os
existed_files = []
files = df['target_file_name'].unique()
for file in files:
    path = f"data/{file}"
    if os.path.exists(path=path):
        existed_files.append(file)
# %%
df = df[df["target_file_name"].isin(existed_files)].copy()
# %%
def rag(question):
    context = ""
    try:
        results = vector_store.similarity_search(query=question)
        for index, result in enumerate(results):
            source = result.metadata['source']
            page = result.metadata['page'] + 1
            context += f"# {index+1}/{source}/{page}:"
            context += "\n"
            context += normalize("NFKD", result.page_content).replace("\n", "")
            context += "\n"
        response = llm.invoke(input=prompt_template.format(context=context, question=question))
        generated_text = response.content
        request_body = [{
            "request": {
                "parameters": {
                    "template_variables": {
                        "context": context,
                        "question": question
                    }
                }
            },
            "response": {
                "results": [
                    {
                        "generated_text": generated_text
                    }
                ]
            }
        }]
        response = wos_client.data_sets.store_records(
            data_set_id=data_set_id,
            request_body=request_body,
            background_mode=True)
        response.result.to_dict()
    except Exception as e:
        generated_text = ""
        context = ""
        return generated_text, context
    return generated_text, context
# %%
def get_reference_text(selected_question):
    reference_text = df[df["question"] == selected_question]["target_answer"].iloc[0]
    target_file_name = df[df["question"] == selected_question]["target_file_name"].iloc[0]
    target_page_no = df[df["question"] == selected_question]["target_page_no"].iloc[0]
    return reference_text, target_file_name, target_page_no
# %%
with gr.Blocks() as app:
    question = gr.Dropdown(choices=df['question'].tolist(), label="question")
    reference_text = gr.Textbox(label="reference_text", value=df.iloc[0]["target_answer"])
    target_file_name = gr.Textbox(label="target_file_name", value=df.iloc[0]["target_file_name"])
    target_page_no = gr.Textbox(label="target_page_no", value=df.iloc[0]["target_page_no"])
    button = gr.Button()
    generated_text = gr.Textbox(label="generated_text")
    context = gr.Textbox(label="context")
    question.change(fn=get_reference_text, inputs=[question], outputs=[reference_text, target_file_name, target_page_no])
    button.click(fn=rag, inputs=[question], outputs=[generated_text, context])
    question.change(fn=rag, inputs=[question], outputs=[generated_text, context])
# %%
app.launch(server_name="0.0.0.0")
#app.launch(server_name="0.0.0.0", server_port=8080)
Run the web application
python app.py
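Gradio serves the UI on port 7860 by default; the commented-out launch call switches it to port 8080.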