Azure へのデプロイ(本記事の続き)はこちら
Azure 環境の RAG システム構築 ローカル開発手順~デプロイ②
https://qiita.com/KUROMAGOORO/items/9f4a942e4bee651bf850
概要
- Azure 環境でのアプリ開発~デプロイの流れをまとめました(①ではローカル環境開発まで)
- 各サービスと接続する事を優先したので「①情報取得」でデータをいろいろな所から集めたり、「③生成結果保存」が複数発生します
- 実施には偉い権限(所有者以上)が必要です
-
ローカル環境での接続例
各サービスのデプロイ
- 以下サービスをデプロイしていきます
- ローカル環境開発から実施するので「AppService」は後の手順でデプロイします
リソースグループ
仮想ネットワーク
- サービスエンドポイントに「Microsoft.CognitiveServices」を追加します(AzureOpenAI用)
ストレージアカウント
Azure Database for MySQL Flexible Server
Azure OpenAI
Azure AI Document Intelligence
AISearch
-
https://qiita.com/KUROMAGOORO/items/2967f8fe822294710079#azure-ai-search-%E3%81%AE%E4%BD%9C%E6%88%90
を参考にして「システム割当マネージドID」の設定をします
データベース構築・登録
AISearch
- https://qiita.com/KUROMAGOORO/items/2967f8fe822294710079 などを参考にして適当なデータを登録します
- インデクサーに取り込むテキスト
Blob ストレージ
MySQL
アプリ構築
- 適当なアプリを作成します
- DefaultAzureCredential() を使用しているため、アプリ起動前に az login をしておきます
上記認証では、キーの代わりに設定したロールの権限を使用してアクセスします。
- azure mysql はデフォルトで require_secure_transport = ON なので、以下証明書をダウンロードしてアプリの適当な場所に配備してコネクタ作成時に利用します
https://learn.microsoft.com/ja-jp/azure/mysql/flexible-server/how-to-connect-tls-ssl#download-the-public-ssl-certificate
.env
AISEARCH_ENDPOINT = "https://XXX.search.windows.net"
AISEARCH_KEY = "KEYKEYKEY"
AISEARCH_INDEX = "vector-1733938719844"
BLOB_URL = "https://XXX.blob.core.windows.net"
CONTAINER_NAME_INPUT = "input"
CONTAINER_NAME_OUTPUT = "output"
MYSQL_HOST = "XXX.mysql.database.azure.com"
MYSQL_USER = "rrr"
MYSQL_PASSWORD = "KEYKEYKEY"
MYSQL_DATABASE = "rag"
AZURE_OPENAI_ENDPOINT = "https://XXX.openai.azure.com/"
AZURE_OPENAI_API_VERSION = "2024-07-01-preview"
requirements.txt
gradio==5.8.0
azure-search-documents==11.5.2
PyMySQL==1.1.1
azure-storage-blob==12.24.0
azure-identity==1.19.0
openai==1.57.2
python-dotenv==1.0.1
app.py
from action.rag import rag
import gradio as gr
from dotenv import load_dotenv
load_dotenv()
demo = gr.Interface(
fn=rag,
inputs=["text"],
outputs=["text"],
allow_flagging="never"
)
demo.launch()
action.rag.py
import task.retrieve as ret
import task.generate as gen
import task.register as reg
def rag(input: str):
ret_result = ret._run(input)
gen_result = gen._run(input, ret_result)
reg._run(gen_result)
return gen_result
task.retrieve.py
import os
# aisearch から取得
def aisearch(text: str):
from azure.search.documents import SearchClient
from azure.search.documents.models import QueryType, VectorizableTextQuery
from azure.core.credentials import AzureKeyCredential
credential = AzureKeyCredential(os.getenv("AISEARCH_KEY"))
search_client = SearchClient(endpoint=os.getenv("AISEARCH_ENDPOINT"),
index_name=os.getenv("AISEARCH_INDEX"),
credential=credential)
vector_query = VectorizableTextQuery(text=text, k_nearest_neighbors=50, fields="chunk_vector", exhaustive=True)
results = search_client.search(
search_text = text,
query_type = QueryType.SEMANTIC,
top = 10,
semantic_configuration_name = "semantic-configuration",
vector_queries = [vector_query],
select = "title, chunk"
)
# 1件だけなので適当に取得する
for result in results:
print(f"aisearch contents: {result['chunk']}")
return result["chunk"]
# blob から取得
def download_blob():
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
account_url = os.getenv("BLOB_URL")
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url, credential=credential)
blob_client = blob_service_client.get_blob_client(container=os.getenv("CONTAINER_NAME_INPUT"), blob="blob_input.txt")
downloader = blob_client.download_blob(max_concurrency=1, encoding='UTF-8')
blob_text = downloader.readall()
print(f"Blob contents: {blob_text}")
return blob_text
# mysql から取得
def select_mysql():
import pymysql.cursors
# データベースに接続
connection = pymysql.connect(host=os.getenv("MYSQL_HOST"),
user=os.getenv("MYSQL_USER"),
password=os.getenv("MYSQL_PASSWORD"),
database=os.getenv("MYSQL_DATABASE"),
cursorclass=pymysql.cursors.DictCursor,
ssl_ca='ssl/DigiCertGlobalRootCA.crt.pem')
with connection:
with connection.cursor() as cursor:
# データ読み込み
sql = "SELECT `input_text` FROM `input`"
cursor.execute(sql)
result = cursor.fetchone()
print(f"mysql contents: {result['input_text']}")
return result['input_text']
def _run(text: str):
aisearch_text = aisearch(text)
blob_text = download_blob()
mysql_text = select_mysql()
return f"{aisearch_text}.{blob_text}.{mysql_text}"
task.generate.py
def llm(input: str, retrieve_text: str):
import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI
token_provider = get_bearer_token_provider(
DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
client = AzureOpenAI(
api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
azure_ad_token_provider=token_provider,
)
message = f"""
# 質問
{input}
# ソース
{retrieve_text}
"""
chat_completion = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "あなたは「ソース」の情報を使ってユーザの質問に答える親切なアシスタントです。"},
{"role": "user", "content": message}
]
)
print(f"llm generate contents: {chat_completion.choices[0].message.content}")
return chat_completion.choices[0].message.content
def _run(input: str, retrieve_text: str):
generate_text = llm(input, retrieve_text)
return generate_text
task.register.py
import os
# blob から取得
def upload_blob(text):
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
account_url = os.getenv("BLOB_URL")
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url, credential=credential)
blob_client = blob_service_client.get_blob_client(container=os.getenv("CONTAINER_NAME_OUTPUT"), blob="blob_output.txt")
blob_client.upload_blob(text.encode(), blob_type="BlockBlob", overwrite=True)
# mysql から取得
def insert_mysql(text: str):
import pymysql.cursors
# データベースに接続
connection = pymysql.connect(host=os.getenv("MYSQL_HOST"),
user=os.getenv("MYSQL_USER"),
password=os.getenv("MYSQL_PASSWORD"),
database=os.getenv("MYSQL_DATABASE"),
cursorclass=pymysql.cursors.DictCursor,
ssl_ca='ssl/DigiCertGlobalRootCA.crt.pem')
with connection:
with connection.cursor() as cursor:
# レコードを挿入
sql = "INSERT INTO `output` (`output_text`) VALUES (%s)"
cursor.execute(sql, (text))
# コミットしてトランザクション実行
connection.commit()
def _run(text: str):
upload_blob(text)
insert_mysql(text)