0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RAG-Evaluation-Dataset-JA+Milvus+Azure OpenAI+watsonx.governanceでRAGを構築と評価&監視@オンプレ

Last updated at Posted at 2025-04-14

RAG-Evaluation-Dataset-JAを取得

git clone https://huggingface.co/datasets/allganize/RAG-Evaluation-Dataset-JA

ドキュメントをダウンロード

downloads.py
# %%
#!pip install pandas
#!pip install requests
#!mkdir data

# %%
import pandas as pd

# List of evaluation documents (file name + download URL) from the dataset.
df = pd.read_csv("RAG-Evaluation-Dataset-JA/documents.csv")
df

# %%
import os
import requests

# Download every document into data/, skipping files that already exist.
for index, row in df.iterrows():
    url = row['url']
    file_name = row['file_name']
    print(url)
    file = f"data/{file_name}"
    if os.path.exists(file):
        continue

    try:
        buffer = requests.get(url=url, timeout=10)
        # Fail on HTTP errors (404/500/...) so an error page is never
        # written to disk as if it were the document.
        buffer.raise_for_status()
        with open(file=file, mode="wb") as f:
            f.write(buffer.content)
    except Exception as e:
        # Best-effort: report which file failed and continue with the rest.
        print(f"{file_name}: {e}")

# %%

ドキュメントをMilvusに登録

milvus.py
# %%
#!pip install langchain_huggingface
#!pip install langchain
#!pip install langchain_milvus
#!pip install langchain_community
#!pip install pypdf

# %%
import glob

# All downloaded PDFs to be indexed.
file_paths = glob.glob(pathname="data/*.pdf")

# %%
from langchain_community.document_loaders import PyPDFLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_milvus import Milvus

# %%
# Multilingual E5 embeddings; Milvus Lite keeps the index in a local .db file.
# drop_old=True rebuilds the collection from scratch on every run.
embedding = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
uri = "RAG-Evaluation-Dataset-JA.db"
connection_args = {"uri": uri}
vector_store = Milvus(embedding_function=embedding, connection_args=connection_args, drop_old=True, auto_id=True)

# %%
# Split each PDF into page-level documents and register them in Milvus.
# A broken/unreadable PDF is reported and skipped instead of aborting the run.
for file_path in file_paths:
    print(file_path)
    try:
        loader = PyPDFLoader(file_path=file_path)
        documents = loader.load_and_split()
        vector_store.add_documents(documents=documents)
        print(f"{file_path}:")
    except Exception as e:
        print(f"{file_path}: {e}")
        continue

# Removed stray `documents` display: it referenced the loop variable after the
# loop and raised NameError when no PDF was processed successfully.
# %%

Azure OpenAI/GPT-35-TurboでRAGの構築

templates.py
# %%
# RAG prompt (Japanese): "Answer the question based on the given context."
# The {context} and {question} placeholders are filled by PromptTemplate.
template = """# 指示:
与えられた文脈にもとづいて質問に回答してください。

# 文脈:
{context}

# 質問:
{question}

# 回答:
"""

# Backward-compatible alias: create_detached_prompte_template.py does
# `from templates import template_rag`, which previously raised ImportError
# because only `template` was defined here.
template_rag = template
rag.py
# %%
#!pip install azure.identity
#!pip install langchain_openai

# %%
from dotenv import load_dotenv

# Load Azure / Azure OpenAI settings from a local .env file.
load_dotenv(override=True)

# %%
import os

# Service-principal credentials and Azure OpenAI endpoint configuration.
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET")
AZURE_OPENAI_ENDPOINT = os.environ.get('AZURE_OPENAI_ENDPOINT')
AZURE_OPENAI_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")
API_VERSION = os.environ.get("API_VERSION")

# %%
from langchain_huggingface import HuggingFaceEmbeddings

# Must match the embedding model used when the Milvus index was built
# (see milvus.py), otherwise similarity search results are meaningless.
embedding = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")

# %%
from langchain_milvus import Milvus

# Reuse the existing local Milvus Lite database (no drop_old here — read-only use).
URI = "RAG-Evaluation-Dataset-JA.db"
connection_args = {"uri": URI}
vector_store = Milvus(embedding_function=embedding, connection_args=connection_args, auto_id=True)

# %%
from azure.identity import ClientSecretCredential, get_bearer_token_provider

# Authenticate to Azure via a service principal (client-credentials flow).
credential = ClientSecretCredential(
    tenant_id=AZURE_TENANT_ID,
    client_id=AZURE_CLIENT_ID,
    client_secret=AZURE_CLIENT_SECRET
)

# %%
# Token provider that fetches/refreshes AAD tokens for the Cognitive Services scope.
scopes = "https://cognitiveservices.azure.com/.default"
azure_ad_token_provider = get_bearer_token_provider(credential, scopes)

# %%
from langchain_openai import AzureChatOpenAI

# temperature=0 for (near-)deterministic answers during evaluation.
temperature = 0
max_tokens = 4096
llm = AzureChatOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_version=API_VERSION,
    azure_deployment=AZURE_OPENAI_DEPLOYMENT_NAME,
    azure_ad_token_provider=azure_ad_token_provider,
    temperature=temperature,
    max_tokens=max_tokens,
)

# %%
from templates import template
from langchain_core.prompts import PromptTemplate

# Prompt with {context} and {question} variables (defined in templates.py).
prompt_template = PromptTemplate.from_template(template=template)

回答の生成

rag_generated_text.py
# %%
from rag import vector_store, prompt_template, llm

# %%
import pandas as pd

# First 20 evaluation rows (question / target_answer pairs).
filepath = "RAG-Evaluation-Dataset-JA/rag_evaluation_result.csv"
df = pd.read_csv(filepath_or_buffer=filepath).iloc[:20]
df

# %%
from unicodedata import normalize

# For each question: retrieve context from Milvus, build the prompt, generate
# an answer. Contexts are collected per row — previously a single `context`
# variable was assigned to the whole column after the loop, so every row got
# only the LAST question's context.
generated_text = []
contexts = []
for index, row in df.iterrows():
    print(index)
    question = row.question
    reference_text = row.target_answer

    context = ""
    try:
        results = vector_store.similarity_search(query=question)
        # `rank` instead of reusing `index`, which shadowed the outer loop variable.
        for rank, result in enumerate(results):
            context += f"# {rank+1}:"
            context += "\n"
            # NFKD-normalize and drop newlines so each chunk is a single line.
            context += normalize("NFKD", result.page_content).replace("\n", "")
            context += "\n"
        response = llm.invoke(input=prompt_template.format(context=context, question=question))
        generated_text.append(response.content)
    except Exception as e:
        # Keep the row (empty answer) so columns stay aligned with df.
        print(f"{e}")
        generated_text.append("")
    contexts.append(context)

# %%
df['generated_text'] = generated_text
df

# %%
df['context'] = contexts
df

detached prompt templateの作成

create_detached_prompte_template.py
# %%
#!pip install ibm_aigov_facts_client
#!pip install tabulate

# %%
from dotenv import load_dotenv

load_dotenv(override=True)

# %%
import os

# Cloud Pak for Data connection settings for the AI Factsheets client.
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")

# %%
from ibm_aigov_facts_client import AIGovFactsClient
from ibm_aigov_facts_client import CloudPakforDataConfig

creds = CloudPakforDataConfig(
    service_url=CPD_URL,
    username=CPD_USERNAME,
    api_key=CPD_API_KEY
)

# Facts client scoped to the project that will own the prompt asset.
facts_client = AIGovFactsClient(
    cloud_pak_for_data_configs=creds,
    container_id=PROJECT_ID,
    container_type="project",
    disable_tracing=True
)

# %%
from ibm_aigov_facts_client import DetachedPromptTemplate, PromptTemplate

AZURE_OPENAI_DEPLOYMENT_NAME = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT")

# Metadata describing the externally hosted (Azure OpenAI) model behind
# the detached prompt template.
detached_information = DetachedPromptTemplate(
    prompt_id="rag-azure-openai-gpt-35-turbo",
    model_id=AZURE_OPENAI_DEPLOYMENT_NAME,
    model_provider="azure-ml-openai",
    model_name=AZURE_OPENAI_DEPLOYMENT_NAME,
    model_url=AZURE_OPENAI_ENDPOINT,
    prompt_url="prompt_url",
    prompt_additional_info={"model_owner": "Microsoft"}
)

# %%
# templates.py exposes the prompt as `template`; the original
# `from templates import template_rag` raised ImportError.
from templates import template as template_rag

prompt_template = PromptTemplate(
    input=template_rag,
    prompt_variables={"context": "", "question": ""},
    input_prefix="",
    output_prefix="",
)

# %%
# Register the detached prompt template asset for RAG evaluation.
response = facts_client.assets.create_detached_prompt(
    name="rag-azure-openai-gpt-35-turbo",
    model_id=AZURE_OPENAI_DEPLOYMENT_NAME,
    task_id="retrieval_augmented_generation",
    detached_information=detached_information,
    description="RAG based on Azure OpenAI GPT-3.5-turbo",
    prompt_details=prompt_template)

# %%
response

# %%

image.png
image.png
image.png

detached prompt templateの設定

execute_detached_prompte_template.py
# %%
#!pip install ibm_watson_openscale
#!pip install "pandas<=2.1.9"

# %%
from dotenv import load_dotenv

load_dotenv(override=True)

# %%
import os

# Cloud Pak for Data connection settings and governance asset identifiers.
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")

PROJECT_ID = os.environ.get("PROJECT_ID")
PROMPT_TEMPLATE_ASSET_ID = os.environ.get("PROMPT_TEMPLATE_ASSET_ID")

# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *

# Authenticate against the on-prem CPD cluster (self-signed certificate,
# hence SSL verification is disabled).
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)

wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
print(data_mart_id)
print(wos_client.version)

# %%
from ibm_watson_openscale.base_classes import ApiRequestFailure

# Map the OpenScale instance to the project; HTTP 409 means the mapping
# already exists and is safe to ignore.
try:
    wos_client.wos.add_instance_mapping(
        service_instance_id=data_mart_id,
        project_id=PROJECT_ID
    )
except ApiRequestFailure as arf:
    if arf.response.status_code == 409:
        # Instance mapping already exists
        pass
    else:
        raise arf

# %%
wos_client.data_marts.show()

# %%
# NOTE(review): these metric constants are imported but never referenced in
# this script — kept (not removed) in case later configuration cells use them.
from ibm_metrics_plugin.metrics.llm.utils.constants import LLMTextMetricGroup, LLMCommonMetrics, LLMRAGMetrics, RetrievalQualityMetric

# Generative AI Quality monitor with default metric configuration.
supporting_monitors = {
    "generative_ai_quality": {
        "parameters": {
            "min_sample_size": 10,
            "metrics_configuration": {
            }
        }
    }
}
import json
print(json.dumps(supporting_monitors, indent=2, ensure_ascii=False))

# %%
# Set up the detached prompt template for evaluation in the development space.
# (Indentation normalized: the original mixed tabs and spaces in this call.)
language_code = "ja"

response = wos_client.wos.execute_prompt_setup(
    prompt_template_asset_id=PROMPT_TEMPLATE_ASSET_ID,
    label_column="reference_text",
    operational_space_id="development",
    problem_type="retrieval_augmented_generation",
    input_data_type="unstructured_text",
    data_input_locale=[language_code],
    generated_output_locale=[language_code],
    project_id=PROJECT_ID,
    context_fields=["context"],
    question_field="question",
    supporting_monitors=supporting_monitors,
    background_mode=True)
response.result.to_dict()

# %%
# Poll the setup status.
response = wos_client.wos.get_prompt_setup(
    prompt_template_asset_id=PROMPT_TEMPLATE_ASSET_ID,
    project_id=PROJECT_ID)
response.result.to_dict()

# %%

image.png

評価

evaluate_risk.py
# %%
from dotenv import load_dotenv

load_dotenv(override=True)

# %%
import os

# Cloud Pak for Data connection settings.
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")
PROJECT_ID = os.environ.get("PROJECT_ID")

# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *

# Authenticate against the on-prem CPD cluster (self-signed certificate).
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)

wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
print(data_mart_id)
print(wos_client.version)

# %%
SUBSCRIPTION_ID = os.environ.get("SUBSCRIPTION_ID")
SUBSCRIPTION_ID

# %%
wos_client.monitor_instances.show(target_target_id=SUBSCRIPTION_ID)

# %%
# Locate the Model Risk Management (MRM) monitor instance for this subscription.
monitor_definition_id = "mrm"
target_target_id = SUBSCRIPTION_ID
response = wos_client.monitor_instances.list(
    data_mart_id=data_mart_id,
    monitor_definition_id=monitor_definition_id,
    target_target_id=target_target_id,
    project_id=PROJECT_ID)
response.result.to_dict()

# %%
mrm_monitor_instance_id = response.result.to_dict()["monitor_instances"][0]["metadata"]["id"]
mrm_monitor_instance_id

# %%
import pandas as pd

# Generated answers produced by rag_generated_text.py.
df = pd.read_csv("rag_generated_text.csv")
df

# %%
# Only target_answer needs renaming — the original rename mapped question,
# context and generated_text onto themselves (no-ops).
df.rename(columns={"target_answer": "reference_text"}, inplace=True)
df

# %%
import csv

df[["context", "question", "generated_text", "reference_text"]]

# %%
# QUOTE_ALL keeps embedded commas/newlines in the text fields intact.
df[["context", "question", "generated_text", "reference_text"]].to_csv("test.csv", index=False, quoting=csv.QUOTE_ALL)

# %%
# Upload the test data and trigger an MRM risk evaluation.
test_data_path = "test.csv"
test_data_set_name = "data"
content_type = "multipart/form-data"

response = wos_client.monitor_instances.mrm.evaluate_risk(
    monitor_instance_id=mrm_monitor_instance_id,
    test_data_set_name=test_data_set_name,
    test_data_path=test_data_path,
    content_type=content_type,
    body=None,
    project_id=PROJECT_ID,
    includes_model_output=True,
    background_mode=True
)
response.result.to_dict()

# %%
# Poll the evaluation status.
response = wos_client.monitor_instances.mrm.get_risk_evaluation(
    monitor_instance_id=mrm_monitor_instance_id,
    project_id=PROJECT_ID
)
response.result.to_dict()

# %%
# Locate the Generative AI Quality monitor instance.
monitor_definition_id = "generative_ai_quality"
response = wos_client.monitor_instances.list(
    data_mart_id=data_mart_id,
    monitor_definition_id=monitor_definition_id,
    target_target_id=target_target_id,
    project_id=PROJECT_ID
)
response.result.to_dict()

# %%
gaiq_monitor_instance_id = response.result.to_dict()["monitor_instances"][0]["metadata"]["id"]
gaiq_monitor_instance_id

# %%
wos_client.monitor_instances.show_metrics(
    monitor_instance_id=gaiq_monitor_instance_id,
    project_id=PROJECT_ID
)

# %%
# Per-record quality metrics data set for the subscription.
response = wos_client.data_sets.list(
    target_target_id=SUBSCRIPTION_ID,
    target_target_type="subscription",
    type="gen_ai_quality_metrics"
)
response.result.to_dict()

# %%
gaiq_data_set_id = response.result.to_dict()["data_sets"][0]["metadata"]["id"]
gaiq_data_set_id
# %%
wos_client.data_sets.show_records(data_set_id = gaiq_data_set_id)

# %%

image.png

Detached prompt templateを本番環境へプロモート

image.png
image.png
image.png
image.png

プロモートしたDetached prompt templateを設定

execute_detached_prompte_template_production.py
# %%
from dotenv import load_dotenv

load_dotenv(override=True)

# %%
import os

CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")

PROJECT_ID = os.environ.get("PROJECT_ID")

SPACE_ID = os.environ.get("SPACE_ID")
# NOTE(review): variable name keeps the original "PROMPTE" spelling so any
# external references keep working; the env var itself is spelled correctly.
PROMPTE_TEMPLATE_ASSET_ID = os.environ.get("PROMPT_TEMPLATE_ASSET_ID_PRODUCTION")
DEPLOYMENT_ID = os.environ.get("DEPLOYMENT_ID")

# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *

# Authenticate against the on-prem CPD cluster (self-signed certificate).
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)

wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
print(data_mart_id)
print(wos_client.version)

# %%
from ibm_watson_openscale.base_classes import ApiRequestFailure

# Map the OpenScale instance to the deployment SPACE (not the project) —
# this is what makes the setup target the production space.
try:
    wos_client.wos.add_instance_mapping(
        service_instance_id=data_mart_id,
        space_id=SPACE_ID     # <- this is the important part!
    )
except ApiRequestFailure as arf:
    if arf.response.status_code == 409:
        # Instance mapping already exists
        pass
    else:
        raise arf

# %%
# Generative AI Quality monitor with default metric configuration.
supporting_monitors = {
    "generative_ai_quality": {
        "parameters": {
            "min_sample_size": 10,
            "metrics_configuration": {
            }
        }
    }
}

# %%
# Set up the promoted prompt template for monitoring in production.
# (Indentation normalized: the original mixed tabs and spaces in this call.)
language_code = "ja"

response = wos_client.wos.execute_prompt_setup(
    prompt_template_asset_id=PROMPTE_TEMPLATE_ASSET_ID,
    space_id=SPACE_ID,
    deployment_id=DEPLOYMENT_ID,
    label_column="reference_text",
    operational_space_id="production",
    problem_type="retrieval_augmented_generation",
    input_data_type="unstructured_text",
    data_input_locale=[language_code],
    generated_output_locale=[language_code],
    context_fields=["context"],
    question_field="question",
    supporting_monitors=supporting_monitors,
    background_mode=True)
response.result.to_dict()

# %%
# Poll the setup status.
response = wos_client.monitor_instances.mrm.get_prompt_setup(
    prompt_template_asset_id=PROMPTE_TEMPLATE_ASSET_ID,
    deployment_id=DEPLOYMENT_ID,
    space_id=SPACE_ID)
response.result.to_dict()

# %%

image.png

watsonx.governanceへのログ送付機能の実装

payload.py
# %%
from dotenv import load_dotenv

load_dotenv(override=True)

# %%
import os

# Cloud Pak for Data connection settings; the production subscription is used
# here so UI requests are logged against the promoted prompt template.
CPD_URL = os.environ.get("CPD_URL")
CPD_USERNAME = os.environ.get("CPD_USERNAME")
CPD_PASSWORD = os.environ.get("CPD_PASSWORD")
CPD_API_KEY = os.environ.get("CPD_API_KEY")

PROJECT_ID = os.environ.get("PROJECT_ID")

SPACE_ID = os.environ.get("SPACE_ID")
SUBSCRIPTION_ID = os.environ.get("SUBSCRIPTION_ID_PRODUCTION")

# %%
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import *
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.supporting_classes import *

# Authenticate against the on-prem CPD cluster (self-signed certificate).
authenticator = CloudPakForDataAuthenticator(
    url=CPD_URL,
    username=CPD_USERNAME,
    password=CPD_PASSWORD,
    disable_ssl_verification=True
)

wos_client = APIClient(
    service_url=CPD_URL,
    authenticator=authenticator,
)
data_mart_id = wos_client.service_instance_id
data_mart_id

# %%
wos_client.version

# %%
from ibm_watson_openscale.supporting_classes.enums import *

# Find the payload-logging data set attached to the production subscription.
data_set_id = None
response = wos_client.data_sets.list(
    type=DataSetTypes.PAYLOAD_LOGGING,
    target_target_id=SUBSCRIPTION_ID,
    target_target_type=TargetTypes.SUBSCRIPTION)
response.result.to_dict()

# %%
# NOTE(review): index [1] picks the SECOND data set in the listing, whereas
# other scripts use [0] — confirm which entry is the intended payload log.
data_set_id = response.result.to_dict()['data_sets'][1]['metadata']['id']
data_set_id

# %%
# The commented-out cells below are a manual example of bulk-loading payload
# records from a JSON file; app.py performs the same store_records call live.
#import json
#import pandas as pd

#with open(file="hap_low_pii_low.json", mode="r", encoding="utf-8") as f:
#    data = json.load(f)
#dataset = pd.json_normalize(data=data)
#dataset

#request_body = []
#for index, row in dataset.iterrows():
#    print(index)
#    question = row.question
#    context = row.context
#    generated_text = row.generated_text
#    tmp = {
#        "request": {
#            "parameters": {
#                "template_variables": {
#                    "context": context,
#                    "question": question
#                }
#            }
#        },
#        "response": {
#            "results": [
#                {
#                    "generated_text": generated_text
#                }
#            ]
#        }
#    }
#    request_body.append(tmp)

# %%
#print(json.dumps(request_body, indent=2, ensure_ascii=False))

# %%
#response = wos_client.data_sets.store_records(
#   data_set_id=data_set_id,
#   request_body=request_body,
#   background_mode=True)
#response.result.to_dict()

# %%
#wos_client.data_sets.get_records_count(data_set_id=data_set_id)

# %%

RAGの構築

app.py
# %%
#!pip install gradio

# %%
import gradio as gr
# NOTE(review): the article publishes the module as "pyload.py" but this
# import expects "payload.py" — confirm the actual file name on disk.
from rag import vector_store, prompt_template, llm
from unicodedata import normalize
from payload import wos_client, data_set_id

# %%
import pandas as pd

# Evaluation questions used to populate the UI dropdown.
df = pd.read_csv("RAG-Evaluation-Dataset-JA/rag_evaluation_result.csv")
df

# %%
import os

# Keep only questions whose source document was actually downloaded to data/.
existed_files = []
files = df['target_file_name'].unique()
for file in files:
    path = f"data/{file}"
    if os.path.exists(path=path):
        existed_files.append(file)

# %%
df = df[df["target_file_name"].isin(existed_files)].copy()
# %%
def rag(question):
    """Answer `question` with RAG and log the exchange to watsonx.governance.

    Retrieves similar chunks from Milvus, generates an answer via Azure
    OpenAI, then stores the request/response pair in the OpenScale
    payload-logging data set.

    Returns:
        (generated_text, context) — both empty strings on any failure.
    """
    context = ""
    try:
        results = vector_store.similarity_search(query=question)
        for index, result in enumerate(results):
            # Label each chunk with its rank, source file and 1-based page.
            source = result.metadata['source']
            page = result.metadata['page'] + 1
            context += f"# {index+1}/{source}/{page}:"
            context += "\n"
            context += normalize("NFKD", result.page_content).replace("\n", "")
            context += "\n"
        response = llm.invoke(input=prompt_template.format(context=context, question=question))
        generated_text = response.content

        # Payload-logging record in the OpenScale request/response format.
        request_body = [{
            "request": {
                "parameters": {
                    "template_variables": {
                        "context": context,
                        "question": question
                    }
                }
            },
            "response": {
                "results": [
                    {
                        "generated_text": generated_text
                    }
                ]
            }
        }]

        response = wos_client.data_sets.store_records(
            data_set_id=data_set_id,
            request_body=request_body,
            background_mode=True)
        response.result.to_dict()

    except Exception as e:
        # Previously the exception was silently discarded; print it so
        # failures are visible in the server console (consistent with the
        # other scripts in this project).
        print(f"{e}")
        generated_text = ""
        context = ""
        return generated_text, context

    return generated_text, context

# %%
def get_reference_text(selected_question):
    """Return (reference answer, source file name, page number) for a question.

    Looks up the first row of the module-level `df` whose "question" column
    equals `selected_question`.
    """
    matched_row = df[df["question"] == selected_question].iloc[0]
    return (
        matched_row["target_answer"],
        matched_row["target_file_name"],
        matched_row["target_page_no"],
    )

# %%
# Wire up the UI: selecting a question shows its reference answer/source and
# triggers generation; the button re-runs generation on demand.
with gr.Blocks() as app:
    question = gr.Dropdown(choices=df['question'].tolist(), label="question")
    reference_text = gr.Textbox(label="reference_text", value=df.iloc[0]["target_answer"])
    target_file_name = gr.Textbox(label="target_file_name", value=df.iloc[0]["target_file_name"])
    target_page_no = gr.Textbox(label="target_page_no", value=df.iloc[0]["target_page_no"])
    button = gr.Button()
    generated_text = gr.Textbox(label="generated_text")
    context = gr.Textbox(label="context")
    question.change(fn=get_reference_text, inputs=[question], outputs=[reference_text, target_file_name, target_page_no])
    button.click(fn=rag, inputs=[question], outputs=[generated_text, context])
    # Changing the question also regenerates immediately, in addition to the button.
    question.change(fn=rag, inputs=[question], outputs=[generated_text, context])

# %%
# Listen on all interfaces so the app is reachable from other hosts.
app.launch(server_name="0.0.0.0")
#app.launch(server_name="0.0.0.0", server_port=8080)

Webアプリケーションの実行

python app.py

image.png

評価@watsonx

image.png

監視@watsonx

image.png
image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?