はじめに
本記事では、私がプライベートで開発中のAIエージェントを利用した簡易的なWebアプリケーションを、 MLOps/LLMOps の両面を意識してDocker Compose上に構築した事例をご紹介します。
※AIエージェントの動作ロジックやLLMの詳細実装は今回触れませんが、実装の参考としてご覧いただければと思います。
1. MLOps LLMOpsとは何か?
MLOpsやLLMOpsとはそれぞれ一言で以下のように表すことが出来ます。
- MLOps: MLOps は、機械学習モデルの「学習・評価・本番運用・監視」などのライフサイクルを一貫して管理するための取り組みです。モデルのバージョン管理、データパイプラインの管理、継続的学習などを含みます。
- LLMOps: LLMOps は、大規模言語モデル (LLM) や生成AIを使ったアプリケーションを開発・運用する際に特化したプロセスを扱います。具体的には、以下のような要素が含まれます。
- プロンプト管理(複数バージョンの管理やテンプレート更新)
- 応答ログ監査(LLMが何をどのように回答したか履歴を保存)
- RAG(Retrieval-Augmented Generation) など外部ドキュメント連携
- LLMのバージョン変更による影響調査やA/Bテスト
今回の構成では、機械学習モデル(クラシックなアルゴリズム) と LLM の両方をAIエージェントで活用する想定です。したがって、それぞれのライフサイクル管理のために MLOps と LLMOps の視点を取り入れています。
2. MLOps/LLMOps 環境のコンテナ構成と各コンポーネントの役割
次に、Docker Composeで構築したコンテナ構成を示します。以下の図はコンポーネント間の連携とデータフローを簡易的にまとめたものです。
構成図
各コンポーネントの役割
コンテナ | 役割 | 処理内容 |
---|---|---|
Streamlit(UIコンテナ) | ユーザーがAIエージェントを操作するためのフロントエンド | - ユーザーが銘柄コードを入力し、FastAPI にリクエストを送信 - 返却された分析結果を可視化 |
FastAPI(APIコンテナ) | AIエージェントの制御 & AI推論エンドポイントの提供 | - Streamlit からのリクエストを受け取る - LLM (Gemini) を呼び出し、プロンプトと応答を MLflow に記録 - MLflow(モデルサービング)を利用して、機械学習モデルの推論を実施 - RAG (ChromaDB) を活用し、関連情報を検索・取得 - 最終的な評価を生成し、Streamlit に返却 |
MLflow(Trackingコンテナ) | MLOps/LLMOps の実験管理 | - MLモデルの学習履歴・評価指標の保存 - LLM (Gemini) のプロンプト & 応答のログ記録 - 各推論リクエストの実験パラメータ・メトリクスを記録 - PostgreSQL に実験データを保存 |
MLflow(Hostingコンテナ) | モデルのホスティング | - FastAPI からのリクエストを受け、機械学習モデルを用いて推論を実行 - バージョン管理されたモデルを MLflow Registry から取得し、推論を提供 - モデルのデプロイ管理(Staging → Production への昇格など) |
ChromaDB(RAG用コンテナ) | RAG (Retrieval-Augmented Generation) のデータストレージ | - 類似検索による外部情報の取得(Yahoo Financeや市場レポートのベクトルデータ) - LLM の回答精度を向上させるための関連データ検索 - FastAPI からの検索クエリを処理し、適切なデータを提供 |
PostgreSQL(データベースコンテナ) | MLflow の実験データやモデルメタデータの保存 | - MLflow の Tracking データを保存 - 各エクスペリメントのバージョン管理を実施 |
Jupyter(学習用コンテナ) | 機械学習モデルの学習・データ分析 | - データの前処理・特徴量エンジニアリング - MLモデルのトレーニング・評価 - 学習済みモデルを MLflow に登録 - 実験の可視化・デバッグ |
各コンポーネント間のデータフロー
データフロー | 処理 | データ内容 |
---|---|---|
Streamlit → FastAPI | ユーザー入力を FastAPI に送信し、AIエージェントを起動 | 入力データ |
FastAPI → MLflow(Tracking) | 各エージェントの実験データ(プロンプト、レスポンス、推論結果)を記録 | - プロンプトのバージョン管理 - Gemini の応答 - モデルの予測結果 - 投資評価(スコア) |
FastAPI → MLflow(Hosting) | 登録済みの機械学習モデルを使って推論を実施 | モデルのinputとなる特徴量 |
FastAPI → ChromaDB | RAG を活用し、検索 | RAGで検索した結果の情報を返却 |
MLflow(Tracking) ↔ PostgreSQL | 実験データの保存 | - MLモデルの評価指標 - プロンプトのバージョン履歴 - 実験のメタデータ |
Jupyter → MLflow(Tracking) | 学習済みモデルの登録 | - モデルのアーティファクト - 学習時のパラメータ - 特徴量の重要度 |
Streamlit ← FastAPI | ユーザーに AIエージェントの分析結果を表示 | AIエージェントのoutput |
その他構成のポイント:
RAG基盤で利用するベクトルDBは以下のような選択肢があるが、今回は軽量でまずは試すことを目的として、ChromaDBを選定した。
とてもざっくりですが、個人的に思っているベクトルDBの特徴は以下の通りです。
ベクトルDB | メリット | デメリット | ChromaDB との違い |
---|---|---|---|
ChromaDB | 軽量・シンプル | スケールしない | ローカル利用に最適 |
FAISS | 超高速, GPU対応 | 永続化なし | 高速だがメモリベース |
Milvus | 大規模データ向け | セットアップが必要 | 分散処理 & 永続化あり |
Weaviate | ハイブリッド検索可 | 運用負荷高め | ベクトル+テキスト検索 |
Pinecone | 完全マネージド | クラウド限定 | SaaS型(運用不要) |
3. MLflowを用いた実験管理とモデルのバージョン管理
3-1. モデル学習 & トラッキング
Jupyterコンテナ上のNotebookで以下のようにmlflowを使い、モデル学習のログやメトリクスを記録します。
import mlflow
import mlflow.sklearn
mlflow.set_tracking_uri("http://mlflow-tracking:5003")
mlflow.set_experiment("AI_Agent_Experiment")
with mlflow.start_run():
# ML モデルの学習 (例: RandomForest)
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("precision_macro", report["macro avg"]["precision"])
mlflow.log_metric("recall_macro", report["macro avg"]["recall"])
mlflow.log_metric("f1_score_macro", report["macro avg"]["f1-score"])
# モデル保存 & Registry 登録
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="Stock_Chart_Classification_Model",
signature=signature
)
その結果、MLFlow上では以下のように設定した内容が表示されます。
(今回の例としたモデルは精度低すぎますねw)
- ポイント:
- mlflow.set_tracking_uri() で Trackingサーバー (mlflow-trackingコンテナ)のURLを指定します。
- registered_model_name によって MLflow Model Registry へ自動的にモデルが登録されます。
- 正式な運用では、モデルの学習パラメータやデータセットのバージョンなどもログすると再現性が高まります。
3-2. Model Registry でステージ管理
MLflowのWeb UIにアクセスし、[Model Registry] 画面でモデルを Staging → Production に昇格させると、本番用アプリから参照するモデルを切り替え可能です。
たとえば「Production」ステージとなったモデルをFastAPIが読み込む実装を行えば、モデル差し替え時にアプリ改修を最小限に抑えられます。
4. LLM 呼び出し・RAG 処理のトラッキング
4-1. LLMのプロンプトと応答のトラッキング
LLM呼び出しに関しても、MLflowの各Runにプロンプトと応答をログしておくと後から監査やデバッグがしやすくなります。以下は一例です。
def track_llm_response(prompt: str, response: str, model_name="gemini-1.5-flash"):
""" LLM のリクエストとレスポンスを MLflow に記録 """
with mlflow.start_run(nested=True): # ✅ 変更: nested=True を統一
# 🔹 Git の情報を MLflow に記録
git_commit = get_git_commit_hash()
git_branch = get_git_branch()
mlflow.set_tag("git_commit", git_commit)
mlflow.set_tag("git_branch", git_branch)
mlflow.set_tag("model_name", model_name)
mlflow.set_tag("task", "llm_analysis")
# 例: llm_handler.py の generate_llm_response 内
snippet_length = 100 # 適宜調整
snippet = prompt[:snippet_length] + ("..." if len(prompt) > snippet_length else "")
# `log_param()` も使用して Parameters に表示させる
mlflow.log_param("git_commit", git_commit)
mlflow.log_param("git_branch", git_branch)
mlflow.log_param("model_name", model_name)
mlflow.log_param("task", "llm_analysis")
mlflow.log_param("prompt_snippet", snippet)
mlflow.log_text(prompt, "prompt.txt")
mlflow.log_text(response, "response.txt")
mlflow.end_run()
def generate_llm_response(
prompt: str,
model_name: str = "gemini-1.5-flash",
prompt_template_version: str = "v1.0",
user_id: str = "anonymous"
):
"""
LLM を呼び出してレスポンスを取得し、MLflow に各種パラメータ/メタ情報を記録する。
引数:
prompt: 実行時に組み立てた最終プロンプト
model_name: 使用するGeminiのモデル名 (例: "gemini-1.5-flash")
prompt_template_version: テンプレートのバージョン (Gitハッシュやv1.0など任意の文字列)
user_id: ユーザIDやセッションIDなど、呼び出し主体を識別できる情報
"""
# --- (1) Gitのコミットハッシュを取得---
try:
commit_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip()
except Exception:
commit_hash = "unknown_commit"
# --- (2) モデル呼び出し ---
gemini_model = genai.GenerativeModel(model_name)
response = gemini_model.generate_content(prompt) # レスポンス本体
# 既存のトラッキングロジックを呼び出し(必要に応じて修正・除去)
track_llm_response(prompt, response.text, model_name)
return response.text
トラッキングのイメージは以下の画面の通りです。
- ポイント:
- nested=True を使うことで、MLモデル推論のRunとは別にLLM専用のRunをネスト管理できます。
- git_commit や git_branchをタグ付けしておけば、どのバージョンのプロンプトテンプレート を使ったか追跡できます。
- 個人情報や機密データが含まれる場合は、テキスト保存時にマスク処理をするなどの配慮が必要です。
4-2. RAG 処理
従来のLLMを補完するため、ChromaDB に外部ドキュメントをベクトル化して格納し、類似検索を行う流れです。
PDFなどを読み込んでチャンク分割し、埋め込みモデルでベクトル化→ChromaDBに投入する例を示します。
# ChromaDB の HTTP クライアント(Docker コンテナの chromadb に接続)
client = chromadb.HttpClient(host="chromadb", port=8000)
# コレクション作成
collections = {
___
}
# 適切な埋め込みモデルを利用
EMBEDDING_MODEL = "models/text-embedding-004" # 高精度な768次元埋め込み
def get_gemini_embedding(text):
"""Gemini APIを使用してテキストの埋め込みを取得"""
try:
response = genai.embed_content(model=EMBEDDING_MODEL, content=text)
if "embedding" in response:
return response["embedding"]
else:
print(f"埋め込み取得失敗(レスポンス不正): {response}")
return None
except Exception as e:
print(f"埋め込みエラー: {str(e)}")
return None
# チャンク化(トークンベースで分割 & オーバーラップ設定)
def split_text(text, chunk_size=300, overlap=50):
"""テキストを適切なチャンクサイズで分割"""
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size, chunk_overlap=overlap, length_function=len
)
return text_splitter.split_text(text)
# PDFのテキストを用途別に登録(チャンク分割対応)
def process_pdf(file_path, category):
"""PDFファイルを読み込んで用途別にChromaDBへ登録(チャンク分割対応)"""
if category not in collections:
print(f"未知のカテゴリ '{category}' を 'other' に変更")
category = "other"
with pdfplumber.open(file_path) as pdf:
full_text = "\n".join([page.extract_text() for page in pdf.pages if page.extract_text()])
if not full_text.strip():
print(f" {file_path} はテキストを抽出できませんでした")
return
# トークンベースで分割
text_chunks = split_text(full_text)
for idx, chunk in enumerate(text_chunks):
embedding = get_gemini_embedding(chunk)
if embedding is None:
print(f" 埋め込み失敗: チャンク {idx+1}/{len(text_chunks)} (ファイル: {file_path})")
continue
doc_id = str(uuid.uuid4()) # ユニークなIDを生成
try:
collections[category].add(
ids=[doc_id],
documents=[chunk],
metadatas=[{"source": file_path, "category": category, "chunk_index": idx}],
embeddings=[embedding]
)
print(f"チャンク {idx+1}/{len(text_chunks)} 登録完了: {file_path} → {category} (ID: {doc_id})")
except Exception as e:
print(f" ChromaDB 登録エラー: {str(e)}")
- ポイント:
- pdfplumberでPDFをテキスト化し、300~500トークン程度でのチャンク分割を行うとベクトル検索の精度・検索速度がバランスしやすいです。
- Dockerイメージにgitやpdfplumberなど必要ライブラリをインストールしておく必要があります。
以下は類似検索部分の例です。クエリに対してLLMで「クエリ拡張」(rewrite)を行い、ベクトル検索で関連度が高いドキュメントを取得します。
# クエリのリライト(LLMを利用)
def rewrite_query(query):
"""LLM を使ってクエリを拡張"""
prompt = f"""
クエリ: {query}
"""
try:
gemini_model = genai.GenerativeModel("gemini-1.5-flash")
response = gemini_model.generate_content(prompt)
if hasattr(response, "text") and response.text:
print(f"クエリ拡張成功: {response.text}")
return response.text.strip()
else:
print(f"クエリ拡張失敗(レスポンス不正): {response}")
return query
except Exception as e:
print(f"クエリ拡張エラー: {str(e)}")
return query
# ChromaDB に問い合わせ(類似度フィルタ適用)
def retrieve_relevant_info(query: str, top_k=3, min_score=0.1):
"""ChromaDB から類似情報を取得(類似度閾値なし)"""
expanded_query = rewrite_query(query) # クエリ拡張
print(f"クエリ拡張後: {expanded_query}")
query_embedding = get_gemini_embedding(expanded_query) # クエリをベクトル化
if query_embedding is None:
print("クエリ埋め込みの取得に失敗しました")
return "クエリの埋め込みに失敗しました。"
# ChromaDB に格納されているデータの埋め込み次元確認
peek_data = collections["daily"].peek()
if "embeddings" in peek_data and len(peek_data["embeddings"]) > 0:
print(f"格納済みデータの埋め込みベクトルの次元: {len(peek_data['embeddings'][0])}")
try:
results = collections["daily"].query(
query_embeddings=[query_embedding],
n_results=top_k,
include=["documents", "metadatas", "distances"]
)
except Exception as e:
print(f"クエリ実行エラー: {str(e)}")
return " クエリの実行に失敗しました。"
if not results.get("documents"):
return " 関連情報なし"
retrieved_docs = []
for i in range(len(results["documents"][0])):
score = results["distances"][0][i]
if score < min_score:
continue # スコアが低すぎる場合は無視
retrieved_docs.append(f" 類似情報 {i+1}: {results['documents'][0][i][:200]}...")
retrieved_docs.append(f" メタデータ: {results['metadatas'][0][i]}")
retrieved_docs.append(f" 類似度スコア: {score:.4f}")
retrieved_docs.append("-" * 50)
return "\n".join(retrieved_docs) if retrieved_docs else "🔍 適切な情報が見つかりませんでした。"
for i in range(len(results["documents"][0])):
retrieved_docs.append(f" 類似情報 {i+1}: {results['documents'][0][i][:200]}...")
retrieved_docs.append(f" メタデータ: {results['metadatas'][0][i]}")
retrieved_docs.append(f" 類似度スコア: {results['distances'][0][i]:.4f}")
retrieved_docs.append("-" * 50)
return "\n".join(retrieved_docs) if retrieved_docs else "🔍 適切な情報が見つかりませんでした。"
- ポイント:
- クエリ拡張や文書チャンク分割など、LLMを絡めた前処理はコストがかかるため、キャッシュ戦略を検討しましょう。
- 2重ループが入らないようにコードの重複には注意(スニペット重複など)。サンプルコードは省略・簡略化が多いので、運用コードではエラー処理を充実させるのが望ましいです。
5. FastAPI + Streamlit を使ったアプリ公開
5-1. FastAPIでのAPIエンドポイント
AIエージェントを呼び出すためのエンドポイント実装例です。StreamlitからのPOSTリクエストを受け取り、AIの推論ロジックを呼び出します。
from fastapi import Request
@app.post("/short_term_analysis")
async def short_term_analysis(request: Request, payload: StockAnalysisRequest):
logging.debug(f"受信したリクエスト: {await request.json()}")
result = run_short_term_analysis(payload.stock_code)
return result```
- ポイント:
- Docker Compose上では、コンテナ名をDNSとして解決して http://fastapi:8000/... のように呼び出せます。
- 外部APIキーやDB接続情報は、環境変数やDocker Secretsを通じて注入し、ソースコードに直書きしないのがベストプラクティスです。
5-2. Streamlit で簡易UI
最後にこのFastAPIをフロンタ側から呼び出すためのStreamlitのソース例です。
シンプルにinputとなるコードをテキストボックスで入力して実行ボタンをクリックすると、入力したコードをFastAPIで定義したAPIに渡すような仕組みです。
import streamlit as st
import requests
stock_code = st.text_input("コード")
if st.button("実行"):
resp = requests.post("http://fastapi:8000/run_ai_agent", json={"code": code})
st.write(resp.json())
- ポイント:
- Docker Composeファイルでdepends_onを設定し、FastAPIが起動済みであることを保証するのが望ましいです。
- リクエストのタイムアウトを設定し、例外時エラー表示できるようにしておくと運用時に安心です。
6. まとめ&今後の展望
MLOps/LLMOps のポイント
- MLflow を活用して、機械学習モデルのバージョン管理 (MLOps) と LLMのプロンプト/応答監査 (LLMOps) を同時に実現。
- GitHub連携やプロンプトのバージョン管理を導入することで、LLMのA/Bテスト や Gitリビジョンごとの変更追跡 が容易。
- RAG (Retrieval-Augmented Generation) を組み合わせることで、外部ドキュメントや社内データをLLMのコンテキストに取り込み、回答精度を高めることが可能。
今後の発展
- CI/CDパイプラインの整備
- 例: GitHub Actions などによるモデルの自動学習、ステージングへのデプロイ、本番昇格までを自動化。
- 本記事ではローカル環境を前提としたため最小限の構成でしたが、運用フェーズでは継続的インテグレーション/デリバリーが必須。
- モニタリング・アラート体制の強化
- Prometheus + Grafana などを用いてモデル推論のレスポンスタイムやエラー率を可視化し、異常検知時にSlackやメールで通知。
- LLM呼び出しにおける応答の品質やトークン消費量を定期的にトラッキングし、コストとクオリティを最適化。
これらの取り組みを段階的に追加・拡張していくことで、単なるローカル検証から本格的な生産環境への移行が円滑になります。プロジェクトの要件やリソースに合わせて優先順位を付けながら進めてみてください。