Private ChatGPT(privateGPT)をローカルデータ/Google Driveで動かす
❗️いいねをつけていただけると励みになります❗️
人気だったら、Githubで公開します。
「Private ChatGPT」(通称 privateGPT)を、ローカルファイルやGoogle Drive上のファイルをデータソースにして動かすStreamlitアプリを作成しました。本記事では、環境構築から主な機能、使い方までを解説します。
目次
- はじめに
- 特長・できること
- 動作環境・事前準備
- 各機能のポイント解説
- SQLiteバージョンチェック・自動修正
- embedchainによるドキュメント取り込み
- Google Drive認証&ファイル操作
- ローカルモード(sandbox)でのファイルアップロード
- チャット機能
- デプロイ方法のヒント
- おわりに
- コード 全文解説
1. はじめに
Private ChatGPTは、自分が所有するPDF/TXT/Wordなどのドキュメントを学習データとしてChatGPTに投げ、社内資料や技術ドキュメントをプライベートに質問できるツールです。本記事で紹介するStreamlitアプリは、
- ローカルディレクトリ(sandbox)上のファイル
- Google Drive上のファイル
を、ボタン操作だけで取り込み、対話形式で質問できる仕組みを備えています。
2. 特長・できること
-
SQLiteバージョンチェック
ChromaDB(ベクトルDB)用のSQLiteが古い場合、自動でpysqlite3-binary
をインストール・置き換え -
embedchain連携
PDFやGoogle Driveのファイルをembedchain.App
クラスで取り込み、ドキュメント検索 -
Google OAuth2 認証
Webブラウザ認証 or コード入力でOAuth2を実装 -
ローカルモード(sandbox)
ローカルにアップロードしたファイルをそのままデータベース化 -
チャットインターフェース
質問入力 → embedchainのquery()
で回答を生成
3. 動作環境・事前準備
- Python 3.8 以上
- pip または pipenv / Poetry
- OpenAI APIキー
- Google Cloud で取得した OAuth クライアントID/Secret
- Streamlit:
pip install streamlit
- embedchain:
pip install embedchain
- Google API クライアント:
pip install google-auth google-auth-oauthlib google-api-python-client
.env
に以下を用意しておきましょう:
OPENAI_API_KEY=sk-…
CLIENT_ID=xxx.apps.googleusercontent.com
CLIENT_SECRET=yyy
REDIRECT_URI=http://localhost:8501/oauth2callback
4. 各機能のポイント解説
4-1. SQLiteバージョンチェック・自動修正
ChromaDBのSQLite要件(3.35.0以上)を満たしていない場合、自動で pysqlite3-binary
をインストールし、標準ライブラリの sqlite3
と置き換えます。
import sys, subprocess, sqlite3, st # Streamlit を st として import
from sqlite3 import sqlite_version_info
def fix_sqlite():
# 現在の SQLite バージョンをチェック
if sqlite_version_info < (3, 35, 0):
st.warning(f"現在のSQLite3バージョンは{sqlite3.sqlite_version}です。ChromaDBには3.35.0以上が必要です。")
st.info("pysqlite3-binaryをインストールしています...")
# pysqlite3-binary のインストール
subprocess.check_call([
sys.executable, "-m", "pip", "install",
"pysqlite3-binary", "--quiet", "--disable-pip-version-check"
])
# sqlite3 モジュールを pysqlite3 で置き換え
import pysqlite3
sys.modules["sqlite3"] = pysqlite3
# 再ロードしてバージョン確認
import sqlite3
st.success(f"SQLite3バージョンが{sqlite3.sqlite_version}に更新されました。")
-
Streamlit Cloud 環境検出:
IS_STREAMLIT_CLOUD
環境変数や/.dockerenv
の有無でローカルとクラウドを判定 - エラー時は警告表示して処理を継続
4-2. embedchainによるドキュメント取り込み
ドキュメントの追加とクエリ実行はシンプルです。
from embedchain import App
# セッションステートに App インスタンスを保持
if 'emb_app' not in st.session_state:
st.session_state.emb_app = App()
emb_app = st.session_state.emb_app
# ローカルモードでPDFファイルを追加
for f in selected_files:
file_path = os.path.join('sandbox', f)
emb_app.add(source=file_path, data_type='pdf_file')
# Google Driveモードでファイルを追加
for file_meta in drive_files:
emb_app.add(source=file_meta['id'], data_type='google_drive_file')
-
data_type
は'pdf_file'
または'google_drive_file'
- 追加後はベクトル化して内部DB(ChromaDB)へ格納
クエリは以下のように実行:
query = st.text_input("質問を入力")
if st.button("送信") and query:
answer = emb_app.query(
query,
citations=False, # 引用リンクは不要な場合
max_tokens=4000 # トークン制限を緩和
)
st.markdown("### 回答")
st.markdown(answer)
4-3. Google Drive認証&ファイル操作
OAuth2フローは Webブラウザ認証/手動入力の2パターン対応。
from google_auth_oauthlib.flow import Flow
SCOPES = [
'https://www.googleapis.com/auth/drive.file',
'https://www.googleapis.com/auth/drive.readonly'
]
# クライアント設定(環境変数から取得)
client_config = {
'web': {
'client_id': st.session_state.google_client_id,
'client_secret': st.session_state.google_client_secret,
'redirect_uris': [st.session_state.google_redirect_uri],
'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
'token_uri': 'https://oauth2.googleapis.com/token'
}
}
# 認証フロー初期化
flow = Flow.from_client_config(
client_config=client_config,
scopes=SCOPES,
redirect_uri=st.session_state.google_redirect_uri
)
# 認証URL取得
auth_url, _ = flow.authorization_url(
access_type='offline',
prompt='consent',
include_granted_scopes='true'
)
st.write(f"[認証用リンクを開く]({auth_url})")
# コード入力後のトークン取得
code = st.text_input("認証コードを貼り付け")
if st.button("トークン取得") and code:
flow.fetch_token(code=code)
creds = flow.credentials
st.session_state.creds = {
'token': creds.token,
'refresh_token': creds.refresh_token,
'token_uri': creds.token_uri,
'client_id': creds.client_id,
'client_secret': creds.client_secret,
'scopes': creds.scopes
}
st.success("Google Drive 認証に成功しました!")
認証後は googleapiclient.discovery.build
で Drive API クライアントを生成し、ファイル一覧やアップロードを実装します。
4-4. ローカルモード(sandbox)でのファイルアップロード
import os
# sandbox ディレクトリを確保
os.makedirs('sandbox', exist_ok=True)
# ファイルアップローダー
uploaded = st.file_uploader("ローカルファイルをアップロード", type=['pdf','txt','docx'])
if uploaded:
save_path = os.path.join('sandbox', uploaded.name)
with open(save_path, 'wb') as f:
f.write(uploaded.getbuffer())
st.success(f"{uploaded.name} を sandbox に保存しました。")
# sandbox 内のファイルリスト取得
def refresh_file():
return os.listdir('sandbox')
sandbox_files = refresh_file()
selected_files = st.multiselect("取り込みファイルを選択", sandbox_files)
- Drag & Drop で複数ファイル対応
- 選択したファイルを前述の
emb_app.add()
に渡します
5-5. チャット機能
最後に、質問→回答のUI部分。
st.subheader("チャット")
if not st.session_state.openai_api_key:
st.warning("OpenAI API キーが設定されていません。")
return
os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key
emb_app = st.session_state.emb_app
query = st.text_input("質問を入力", key='query_input')
if st.button("送信"):
with st.spinner("回答を生成中…"):
answer = emb_app.query(query, citations=False, max_tokens=4000)
st.markdown("### 回答")
st.markdown(answer)
-
st.spinner
で処理中インジケータを表示 - Markdownでリッチに回答をレンダリング
5. デプロイ方法のヒント
-
Streamlit Cloud なら
.streamlit/config.toml
に環境変数を設定してワンクリック公開 -
Docker 化
FROM python:3.10-slim WORKDIR /app COPY . . RUN pip install -r requirements.txt CMD ["streamlit", "run", "app.py", "--server.port", "8501", "--server.address", "0.0.0.0"]
6. おわりに
以上、コードを詳細に解説した privateGPT Streamlit アプリのNote記事例でした。
自社資料や論文PDFをプライベートなChatGPTで手軽に検索・対話する環境構築に、ぜひお役立てください。
ご質問・フィードバックはコメント欄までお気軽にどうぞ!
7. PrivateGPT コード 全文解説 (エンジニア向け)
1. 全体概要
このコードは、Streamlitフレームワークを使用して構築されたWebアプリケーションです。主な目的は、ユーザーがアップロードしたローカルファイル(PDF, TXT, DOCX)またはGoogle Drive上のファイルの内容に基づいて、大規模言語モデル(LLM、ここではOpenAIのモデルを想定)と対話できるようにすることです(Retrieval-Augmented Generation - RAG)。
- UI: Streamlitにより、設定入力、ファイルアップロード、データソース選択、チャットインターフェースを提供します。
-
コア機能:
embedchain
ライブラリを利用して、ドキュメントの読み込み、チャンキング、ベクトル化、保存(内部でChromaDBなどを使用)、そして質問に対する関連情報の検索とLLMへのコンテキスト提供を行います。 -
データソース: ローカルの
sandbox
ディレクトリ、またはGoogle Driveを選択できます。 - 認証: Google Drive利用時にはOAuth 2.0認証が必要です。
-
設定: OpenAI APIキーとGoogle OAuth認証情報はUI経由で設定・保存(ローカル環境では
.env
ファイル、Streamlit Cloudではセッション変数)できます。 - 環境対応: 特にStreamlit Cloudでの実行を考慮し、SQLite3のバージョン問題を自動修正する仕組みが含まれています。
2. 依存ライブラリの役割
-
streamlit
: Webアプリケーションフレームワーク。UI要素の作成、状態管理 (st.session_state
)、インタラクティブな操作を実現。 -
python-dotenv
:.env
ファイルから環境変数を読み込む。APIキーなどの機密情報をコードから分離。 -
embedchain
: RAGパイプラインを簡単に構築するための高レベルライブラリ。データソースの追加、インデックス作成、クエリ処理を抽象化。 -
google-auth-oauthlib
&google-api-python-client
: Google API(ここではDrive API v3)へのアクセスとOAuth 2.0認証処理に使用。 -
pysqlite3-binary
:embedchain
が内部で使用するChromaDB
が要求するSQLite 3.35.0以上を提供するためのバイナリパッケージ。標準のsqlite3
モジュールを置き換える目的で使用。 -
os
,shutil
,sys
,subprocess
,importlib
,json
,traceback
,logging
,tempfile
,asyncio
: Python標準ライブラリ。ファイル操作、システム操作、非同期処理、エラーハンドリングなどに使用。
3. コード詳細解説 (main.py
想定)
import os
import shutil # ファイル/ディレクトリ操作 (このコードでは直接使われていないように見えるが、将来的な拡張や依存関係で必要かも)
from dotenv import load_dotenv # .envファイル読み込み
import streamlit as st # Streamlitライブラリ
import asyncio # 非同期処理用 (drive_operations.py で主に利用、main.pyではラッパー経由)
# --- ChromaDB/SQLite3 バージョン問題対応 ---
# embedchain (特に内部のChromaDB) は SQLite 3.35.0 以上を要求する。
# Streamlit Cloudなどの環境では古いバージョンが使われていることがあるため、
# アプリケーション側で新しいバージョン (pysqlite3-binary) をインストールし、
# 実行時に標準のsqlite3モジュールを置き換える。
import sys
import subprocess # 外部プロセス(pip)実行用
import importlib # モジュール情報の取得など (ここでは使われていない)
def fix_sqlite():
"""SQLite3のバージョンをチェックし、必要であればpysqlite3-binaryをインストールして置き換える関数"""
try:
import sqlite3
# 現在のSQLite3バージョンを取得
if sqlite3.sqlite_version_info < (3, 35, 0):
st.warning(f"現在のSQLite3バージョンは{sqlite3.sqlite_version}です。ChromaDBには3.35.0以上が必要です。")
st.info("pysqlite3-binaryをインストールしています...")
# pipコマンドを使ってpysqlite3-binaryをインストール
# sys.executable: 現在実行中のPythonインタプリタのパス
# -m pip: pipモジュールを実行
# --quiet: インストール中の出力を抑制
# --disable-pip-version-check: pipのバージョンチェックを無効化
subprocess.check_call([
sys.executable, "-m", "pip", "install",
"pysqlite3-binary", "--quiet", "--disable-pip-version-check"
])
# pysqlite3をインポート (これでモジュールがメモリにロードされる)
__import__("pysqlite3")
# sys.modules はロード済みのモジュールを保持する辞書
# 標準の 'sqlite3' のエントリを、ロードした 'pysqlite3' モジュールで上書きする
# これにより、以降 'import sqlite3' すると、実際にはpysqlite3がロードされるようになる (Monkey Patching)
sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
# 置き換え後のバージョンを確認して表示
import sqlite3
st.success(f"SQLite3バージョンが{sqlite3.sqlite_version}に更新されました。")
except Exception as e:
st.error(f"SQLite3の更新中にエラーが発生しました: {e}")
st.info("代替の方法でアプリを実行します。一部の機能が制限される可能性があります。")
# Streamlit Cloud環境変数 'IS_STREAMLIT_CLOUD' が 'true' の場合に実行
if os.environ.get("IS_STREAMLIT_CLOUD", "false").lower() == "true":
fix_sqlite()
# Docker環境でないローカル環境でも試みる (エラーは無視)
elif not os.path.exists("/.dockerenv"):
try:
fix_sqlite()
except:
pass # ローカルでの失敗は致命的ではない可能性を考慮
# --- embedchain のインポートとエラーハンドリング ---
try:
# embedchain のコアクラスをインポート
from embedchain import App
except ImportError as e:
# embedchainがインストールされていない、または他の依存関係の問題でインポートできない場合
st.error(f"embedchainライブラリのインポートに失敗しました: {e}")
st.info("アプリの一部機能が利用できません。")
# アプリケーションが完全に停止しないように、最低限のインターフェースを持つダミークラスを定義
# これにより、App()やadd(), query()を呼び出すコードでAttributeErrorが発生するのを防ぐ
class App:
def __init__(self, *args, **kwargs):
pass
def add(self, *args, **kwargs):
st.error("embedchainライブラリが利用できないため、ドキュメントを追加できません。")
return False
def query(self, *args, **kwargs):
st.error("embedchainライブラリが利用できないため、クエリを実行できません。")
return "エラー: embedchainライブラリが利用できません。システム管理者に連絡してください。"
# --- Google Drive API関連 ---
from google_auth_oauthlib.flow import Flow, InstalledAppFlow # OAuth 2.0 フロー管理
from googleapiclient.discovery import build # Google APIクライアント構築
# utilities.py や drive_operations.py から関数をインポート (想定)
from utilities import refresh_file # sandbox内のファイルリスト更新用 (実装は別途必要)
from drive_operations import upload_file_to_drive, fetch_files_from_drive # Drive操作用
import json # JSON操作 (認証情報など)
import traceback # エラー時のスタックトレース表示用
# --- 環境変数読み込み ---
load_dotenv() # .env ファイルがあれば環境変数をロード
# --- Google Drive API スコープ定義 ---
# アプリケーションが必要とするGoogle Driveへのアクセス権限を指定
SCOPES = [
'https://www.googleapis.com/auth/drive.file', # アプリが作成したファイルの読み書き権限
'https://www.googleapis.com/auth/drive.readonly' # Drive内のすべてのファイルの読み取り専用権限
]
# --- Google OAuth 認証情報のシリアライズ/デシリアライズ ---
# google.oauth2.credentials.Credentials オブジェクトは直接JSONシリアライズできないため、
# 必要な属性を辞書に変換して st.session_state に保存し、必要時に復元するためのヘルパー関数。
def creds_to_dict(creds):
"""Credentialsオブジェクトを辞書に変換"""
return {
'token': creds.token,
'refresh_token': creds.refresh_token,
'token_uri': creds.token_uri,
'client_id': creds.client_id,
'client_secret': creds.client_secret,
'scopes': creds.scopes
}
from google.oauth2.credentials import Credentials # Credentialsクラス
def dict_to_creds(d):
"""辞書からCredentialsオブジェクトを復元"""
return Credentials(**d) # 辞書のキーを引数名としてCredentialsを初期化
# --- Streamlit セッション状態の初期化 ---
def initialize_session_state():
"""Streamlitのセッション状態に必要な変数がなければ初期化する"""
# st.session_state は、ユーザーセッション間で変数を保持するための辞書ライクなオブジェクト
# アプリが再実行されても値が維持される (ブラウザタブを閉じるまで)
if 'openai_api_key' not in st.session_state:
# 環境変数 'OPENAI_API_KEY' があればそれを、なければ空文字列を初期値とする
st.session_state.openai_api_key = os.getenv("OPENAI_API_KEY", "")
if 'google_client_id' not in st.session_state:
st.session_state.google_client_id = os.getenv("CLIENT_ID", "")
if 'google_client_secret' not in st.session_state:
st.session_state.google_client_secret = os.getenv("CLIENT_SECRET", "")
if 'google_redirect_uri' not in st.session_state:
# デフォルトはローカル開発用の 'http://localhost:8501'
st.session_state.google_redirect_uri = os.getenv("REDIRECT_URI", "http://localhost:8501")
if 'show_settings' not in st.session_state:
# 設定UIの表示/非表示フラグ
st.session_state.show_settings = False
if 'auth_flow' not in st.session_state:
# Google OAuthの認証フローオブジェクトを保持 (認証プロセス中のみ)
st.session_state.auth_flow = None
# embedchain App インスタンスもセッション状態で保持
if 'emb_app' not in st.session_state:
try:
# OpenAI APIキーが設定されていれば初期化を試みる
if st.session_state.openai_api_key:
os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key
st.session_state.emb_app = App() # embedchainアプリのインスタンスを作成
except Exception as e:
# 初期化失敗時はNoneを入れておくか、エラー処理
st.session_state.emb_app = None
st.warning(f"Embedchain Appの初期化に失敗しました: {e}")
# --- 設定の保存処理 ---
def save_settings_to_env():
"""現在のセッション状態の設定値を .env ファイルと環境変数に保存・反映する"""
# セッション状態から設定値を取得して .env ファイルの内容を生成
env_content = f"""OPENAI_API_KEY={st.session_state.openai_api_key}
CLIENT_ID={st.session_state.google_client_id}
CLIENT_SECRET={st.session_state.google_client_secret}
REDIRECT_URI={st.session_state.google_redirect_uri}"""
try:
# ローカル環境であれば .env ファイルに書き込む
# Streamlit Cloudなど書き込み権限がない環境では失敗する可能性がある
with open('.env', 'w') as f:
f.write(env_content)
except Exception as e:
# Streamlit Cloudなどでは書き込めないので警告を表示
st.warning("環境設定ファイルの保存に失敗しました。Streamlit Cloudでは設定は一時的にのみ保持されます。")
print(f"Error saving .env file: {e}")
# 現在の実行プロセスにおける環境変数にも直接設定
# これにより、再起動なしに新しい設定が一部のライブラリで利用可能になる
os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key
os.environ["CLIENT_ID"] = st.session_state.google_client_id
os.environ["CLIENT_SECRET"] = st.session_state.google_client_secret
os.environ["REDIRECT_URI"] = st.session_state.google_redirect_uri
# load_dotenv(override=True) で環境変数を強制的に再読み込み
# これにより、os.getenv() などを使うコードで新しい値が反映される
load_dotenv(override=True)
# --------------------------------------------------------------------------
# !!! 注意: 以下の sys.modules ループは高度なテクニックであり、リスクも伴う !!!
# --------------------------------------------------------------------------
# 既にインポート済みのモジュールが内部で os.environ をコピーしている場合、
# 上記の os.environ 更新だけでは反映されない可能性がある。
# そのため、ロード済みの全モジュールを走査し、os.environ 属性があれば更新を試みる。
# これは、ライブラリの実装に依存するため、必ずしも期待通りに動くとは限らない。
# また、意図しない副作用を生む可能性もあるため、慎重に使用する必要がある。
import sys
for module in list(sys.modules.values()): # list()でコピーを取るのは、ループ中に辞書が変更される可能性への配慮
# モジュールが存在し、かつ 'os' 属性を持ち、その 'os' が 'environ' 属性を持つかチェック
if module and hasattr(module, 'os') and hasattr(module.os, 'environ'):
try:
# モジュール内の os.environ を更新
module.os.environ.update({
"OPENAI_API_KEY": st.session_state.openai_api_key,
"CLIENT_ID": st.session_state.google_client_id,
"CLIENT_SECRET": st.session_state.google_client_secret,
"REDIRECT_URI": st.session_state.google_redirect_uri
})
except Exception as e:
# 辞書でないなど、更新できない場合のエラー処理
print(f"Error updating environment variables in module {getattr(module, '__name__', 'unknown')}: {e}")
# --------------------------------------------------------------------------
st.success("設定が保存され、環境変数に反映されました。")
# --- 設定UIの表示 ---
def settings_section():
"""サイドバーに設定セクションを表示する"""
st.sidebar.title("設定")
# 設定項目の表示/非表示を切り替えるボタン
if st.sidebar.button("設定を表示/非表示"):
st.session_state.show_settings = not st.session_state.show_settings
# st.session_state.show_settings が True の場合に設定項目を表示
if st.session_state.show_settings:
# st.sidebar.expander: 折りたたみ可能なセクションを作成
with st.sidebar.expander("API キー設定", expanded=True):
# APIキーやシークレットは type="password" で入力内容を隠す
st.session_state.openai_api_key = st.text_input(
"OpenAI API キー",
value=st.session_state.openai_api_key,
type="password"
)
st.session_state.google_client_id = st.text_input(
"Google Client ID",
value=st.session_state.google_client_id,
type="password"
)
st.session_state.google_client_secret = st.text_input(
"Google Client Secret",
value=st.session_state.google_client_secret,
type="password"
)
st.session_state.google_redirect_uri = st.text_input(
"Google Redirect URI",
value=st.session_state.google_redirect_uri
)
# 設定保存ボタン
if st.button("設定を保存"):
save_settings_to_env() # 設定保存関数を呼び出し
# Google認証情報が変更された可能性があるので、
# 保存されている認証情報 ('creds') と認証フロー ('auth_flow') を削除
if 'creds' in st.session_state:
del st.session_state['creds']
if 'auth_flow' in st.session_state:
del st.session_state['auth_flow']
# 設定を反映させるためにアプリを再実行(ページリロードに近い効果)
st.experimental_rerun()
# --- メインアプリケーションロジック ---
def main():
"""アプリケーションのメイン関数"""
# ページのタイトルとレイアウトを設定
st.set_page_config(page_title="Private ChatGPT", layout="wide")
st.title("Private ChatGPT on Local Data or Google Drive")
# セッション状態の初期化(起動時に必ず実行)
initialize_session_state()
# サイドバーに設定セクションを表示
settings_section()
# サイドバーにラジオボタンでモード選択UIを表示
mode = st.sidebar.radio("データソース", ("ローカルデータ", "Google Drive"))
# 選択されたモードに応じて対応する関数を実行
if mode == "Google Drive":
drive_mode()
else:
local_mode()
# --- Google Drive モードの処理 ---
def drive_mode():
"""Google Driveをデータソースとする場合の処理"""
# st.session_state に 'creds' (認証情報) が存在しない場合は、認証プロセスを実行
if 'creds' not in st.session_state:
st.info("まずは Google Drive 認証を行ってください。")
# Google認証情報が設定されているか確認
if not st.session_state.google_client_id or not st.session_state.google_client_secret:
st.warning("Google OAuth 認証情報を設定してください。")
return # 設定がない場合は処理中断
# 認証方法を選択 (Webブラウザ経由 or 認証コード手動入力)
auth_method = st.radio(
"認証方法を選択してください",
["Webブラウザ認証", "認証コード入力"]
)
# 「認証開始」ボタンが押されたら
if st.button("認証開始"):
try:
# Google OAuth クライアント設定を構築
# 'installed' キーは主にローカルアプリ用だが、'web' キーでも代用可能な場合がある
# リダイレクトURIは後でフローオブジェクト作成時に指定する
client_config = {
'installed': {
'client_id': st.session_state.google_client_id,
'client_secret': st.session_state.google_client_secret,
'redirect_uris': ['urn:ietf:wg:oauth:2.0:oob', 'http://localhost'], # ダミー/フォールバック
'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
'token_uri': 'https://oauth2.googleapis.com/token'
}
}
if auth_method == "Webブラウザ認証":
# Webサーバーアプリケーション用のフローを使用
redirect_uri = st.session_state.google_redirect_uri # 設定されたリダイレクトURIを使用
flow = Flow.from_client_config(
# client_config は 'web' キーを持つ辞書を期待する
client_config={
'web': {
'client_id': st.session_state.google_client_id,
'client_secret': st.session_state.google_client_secret,
'redirect_uris': [redirect_uri], # 実際に使うリダイレクトURI
'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
'token_uri': 'https://oauth2.googleapis.com/token'
}
},
scopes=SCOPES, # 要求する権限スコープ
redirect_uri=redirect_uri # コールバックを受け取るURI
)
# 認証URLを生成
# access_type='offline': リフレッシュトークンを要求 (長期間アクセスに必要)
# prompt='consent': 毎回ユーザーに同意を求める (リフレッシュトークン取得に推奨)
# include_granted_scopes='true': 既に許可済みのスコープも含めて同意画面に表示
auth_url, _ = flow.authorization_url(
access_type='offline',
prompt='consent',
include_granted_scopes='true'
)
# 生成したフローオブジェクトをセッション状態で保持 (コールバック/コード入力処理で使用)
st.session_state.auth_flow = flow
# 認証用リンクを表示
st.write(f"[認証用リンクを開く]({auth_url})")
st.info("上記リンクをクリックして認証を行ってください。認証後、指定されたリダイレクトURIにリダイレクトされます。")
else: # 認証コード入力方式
# Installed Application (またはOOB - Out Of Band) フローに近い方式
# redirect_uri を指定しないか、'urn:ietf:wg:oauth:2.0:oob' を使うのが一般的だが、
# ここではWebフローを流用し、ユーザーが手動でコードをコピー&ペーストする想定
flow = Flow.from_client_config(
client_config={ # installed キーでも動くかもしれないが、webで統一
'web': {
'client_id': st.session_state.google_client_id,
'client_secret': st.session_state.google_client_secret,
# OOB と 設定されたリダイレクトURIの両方を許可リストに入れておくのが無難
'redirect_uris': ['urn:ietf:wg:oauth:2.0:oob', st.session_state.google_redirect_uri],
'auth_uri': 'https://accounts.google.com/o/oauth2/auth',
'token_uri': 'https://oauth2.googleapis.com/token'
}
},
scopes=SCOPES
# redirect_uri は authorization_url生成時に指定しない (OOB扱い or デフォルト)
)
auth_url, _ = flow.authorization_url(
access_type='offline',
prompt='consent',
include_granted_scopes='true'
# redirect_uriを指定しない場合、デフォルトでOOB(認証コード表示ページ)になることを期待
)
st.session_state.auth_flow = flow
st.write(f"[認証用リンクを開く]({auth_url})")
st.info("上記リンクをクリックして認証を行い、表示される認証コードをコピーしてください。")
except Exception as e:
st.error(f"認証プロセスでエラーが発生しました: {str(e)}")
st.write("詳細なエラー情報:")
st.code(traceback.format_exc()) # エラーの詳細なスタックトレースを表示
# 認証フローオブジェクトがセッション状態に存在する場合 (認証URL生成後)
if 'auth_flow' in st.session_state and st.session_state.auth_flow:
# 認証コード入力欄を表示
code = st.text_input("認証コードをここに貼り付け", key='drive_code')
if code: # コードが入力されたら
try:
# 入力された認証コードを使ってトークンを取得
# このメソッド内でGoogleのトークンエンドポイントにリクエストが送られる
st.session_state.auth_flow.fetch_token(code=code)
# 取得成功後、flowオブジェクトの credentials 属性に認証情報が格納される
creds = st.session_state.auth_flow.credentials
# 認証情報を辞書化してセッション状態に保存
st.session_state.creds = creds_to_dict(creds)
st.success("認証に成功しました!")
# 認証プロセス中に使った flow オブジェクトは不要なので削除
del st.session_state.auth_flow
# 認証が完了したので、画面を再描画して認証済み状態のUIを表示
st.experimental_rerun()
except Exception as e:
st.error(f"認証コードの処理でエラーが発生しました: {str(e)}")
st.write("詳細なエラー情報:")
st.code(traceback.format_exc())
else: # --- 認証済みの場合 ---
try:
# セッション状態から認証情報を辞書として取得し、Credentialsオブジェクトに復元
creds = dict_to_creds(st.session_state.creds)
# リフレッシュトークンを使ってアクセストークンを更新する必要があるかチェックし、必要なら更新
# google-authライブラリが自動で処理してくれることが多いが、明示的に行う場合は以下
# if creds.expired and creds.refresh_token:
# from google.auth.transport.requests import Request
# creds.refresh(Request())
# st.session_state.creds = creds_to_dict(creds) # 更新された認証情報を保存し直す
# 認証情報を使ってGoogle Drive API v3 のサービスオブジェクトを構築
service = build('drive', 'v3', credentials=creds)
st.subheader("Drive ファイル一覧")
# Driveからファイルリストを取得 (drive_operations.py内の関数)
files = fetch_files_from_drive(service)
if files:
# 取得したファイル名とMIMEタイプを表示
for f in files:
st.write(f"{f['name']} ({f['mimeType']})")
else:
st.info("アクセス可能なファイルがありません。")
# DriveへのファイルアップロードUI
uploaded = st.file_uploader("Drive にアップロード", type=['pdf','txt','docx'])
if uploaded:
# --- 非同期関数を同期的に呼び出すラッパー ---
# Streamlitは基本的に同期的に動作するため、asyncioで書かれた関数を
# 直接呼び出すには工夫が必要。ここでは新しいイベントループを作成して実行する。
def sync_upload_to_drive(service, file):
loop = asyncio.new_event_loop() # 新しいイベントループを作成
asyncio.set_event_loop(loop) # 現在のスレッドのイベントループとして設定
try:
# loop.run_until_complete() で非同期関数を実行し、完了するまで待機
return loop.run_until_complete(upload_file_to_drive(service, file))
finally:
loop.close() # イベントループを閉じる
# 同期ラッパー経由でアップロード関数を実行
file_id = sync_upload_to_drive(service, uploaded)
st.success(f"ファイルがアップロードされました (ID: {file_id})")
# TODO: アップロード後、ファイルリストを再読み込みして表示を更新するのが親切
# 認証済みなので、チャットセクションを表示 (Driveモードで)
chat_section(creds=creds, mode='drive')
except Exception as e:
# API呼び出し中にエラーが発生した場合 (例: トークン失効、権限不足など)
st.error(f"Google Drive接続エラー: {str(e)}")
st.write("認証情報が無効になっているか、期限切れの可能性があります。再認証を行ってください。")
# 認証情報をクリアして、再認証を促す
if 'creds' in st.session_state:
del st.session_state['creds']
if st.button("再認証"):
st.experimental_rerun()
# --- ローカルデータモードの処理 ---
def local_mode():
"""ローカルファイルをデータソースとする場合の処理"""
st.subheader("ローカルデータモード")
# ファイルをアップロード・保存する 'sandbox' ディレクトリを作成 (存在すれば何もしない)
try:
os.makedirs('sandbox', exist_ok=True)
except Exception as e:
# Streamlit Cloudなど書き込み権限がない場合のエラー処理
st.warning("sandboxディレクトリの作成に失敗しました。Streamlit Cloudではファイルアップロード機能が制限される場合があります。")
print(f"Error creating sandbox directory: {e}")
# ローカルファイルアップロードUI (sandboxディレクトリ向け)
# type: 受け入れるファイル拡張子を指定
uploaded_local = st.file_uploader("ローカルにアップロード (sandbox)", type=['pdf','txt','docx'])
if uploaded_local:
try:
# 保存パスを構築 (sandbox/ファイル名)
save_path = os.path.join('sandbox', uploaded_local.name)
# アップロードされたファイルをバイナリ書き込みモード ('wb') で保存
with open(save_path, 'wb') as f:
# uploaded_local は Streamlit の UploadedFile オブジェクト
# getbuffer() でファイル内容をメモリ上のバイトバッファとして取得
f.write(uploaded_local.getbuffer())
st.success(f"{uploaded_local.name} を sandbox に保存しました。")
# ファイルリストを更新するためにセッション状態のカウンターを変更 (少しトリッキーな方法)
# これにより Streamlit が差分を検知し、UIが再描画されやすくなる
if 'last_refresh' not in st.session_state:
st.session_state.last_refresh = 0
st.session_state.last_refresh += 1
# st.experimental_rerun() を呼ぶ方が確実かもしれない
except Exception as e:
st.error(f"ファイルの保存中にエラーが発生しました: {str(e)}")
st.info("Streamlit Cloudでは一時的なファイル保存のみが可能です。")
# sandbox ディレクトリ内のファイルリストを取得 (utilities.refresh_file() を想定)
# (例: def refresh_file(): return os.listdir('sandbox'))
try:
sandbox_files = refresh_file()
# ファイルが1つ以上存在する場合のみ選択UIを表示
if sandbox_files and len(sandbox_files) > 0:
# 複数ファイルを選択できるUI (st.multiselect)
selected = st.multiselect("対話に使用するファイルを選択", sandbox_files)
if selected: # ファイルが選択されたら
# チャットセクションを表示 (ローカルモード、選択されたファイルリストを渡す)
chat_section(selected_files=selected, mode='local')
else:
st.info("sandboxディレクトリにファイルをアップロードしてください。")
except Exception as e:
st.error(f"ファイル一覧の取得中にエラーが発生しました: {str(e)}")
st.info("Streamlit Cloudでは、セッションごとに新しいファイルをアップロードする必要があります。")
# --- チャット処理セクション ---
def chat_section(creds=None, selected_files=None, mode='local'):
"""データソースに基づいてチャット応答を行うUIセクション"""
st.subheader("チャット")
# OpenAI APIキーが設定されているか確認
if not st.session_state.openai_api_key:
st.warning("OpenAI API キーを設定してください。")
return
# 現在のセッションの環境変数にOpenAI APIキーを設定 (embedchainが内部で参照するため)
os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key
# embedchain アプリケーションインスタンスを取得/初期化
try:
# セッション状態に保存されたインスタンスを使う
if 'emb_app' not in st.session_state or st.session_state.emb_app is None:
# もし未初期化なら、ここで再度初期化を試みる
st.session_state.emb_app = App()
emb_app = st.session_state.emb_app
if emb_app is None: # それでも初期化失敗している場合
st.error("Embedchain Appの初期化に失敗しています。設定を確認してください。")
return
except Exception as e:
st.error(f"Embedchain Appの取得/初期化中にエラーが発生しました: {str(e)}")
st.info("OpenAI API キーが正しく設定されているか確認してください。")
return
# --- データソースをembedchainに追加 ---
# この部分は、モードやファイル選択が変わるたびに再実行される可能性がある。
# embedchain側で重複追加をある程度ハンドリングしてくれることを期待するか、
# 追加済みファイルリストを st.session_state で管理するなどの工夫が必要になる場合がある。
try:
if mode == 'local' and selected_files:
# ローカルモード: 選択されたファイルをループして追加
for f in selected_files:
try:
file_path = os.path.join('sandbox', f)
if not os.path.exists(file_path):
st.warning(f"ファイル {f} が見つかりません。")
continue
# emb_app.add() でファイルパスとデータタイプを指定して追加
# data_type は embedchain がファイル形式を認識するために重要
# (例: 'pdf_file', 'web_page', 'youtube_video', 'docx' など)
# ここでは 'pdf_file' を仮定しているが、拡張子を見て動的に変更するのが望ましい
# 例: data_type = 'docx' if f.endswith('.docx') else 'pdf_file' if f.endswith('.pdf') else 'text'
emb_app.add(source=file_path, data_type='pdf_file') # 要ファイルタイプ判定改善
st.info(f"'{f}' を処理対象に追加しました。") # ユーザーへのフィードバック
except Exception as e:
# ファイルごとのエラー処理
st.warning(f"ファイル '{f}' の処理中にエラーが発生しました: {str(e)}")
if mode == 'drive' and creds:
# Google Driveモード: 認証情報を使ってファイルリストを取得し、追加
try:
service = build('drive', 'v3', credentials=creds)
# Driveからファイルリストを取得
files = fetch_files_from_drive(service) # 必要ならMIMEタイプでフィルタリング
if files:
for f in files:
try:
# Google Drive のファイルはファイルIDを source として指定
# data_type='google_drive' または 'google_drive_file' などを指定 (embedchainのバージョンによる)
emb_app.add(source=f['id'], data_type='google_drive_file') # 要確認
st.info(f"Driveファイル '{f['name']}' を処理対象に追加しました。")
except Exception as e:
st.warning(f"Driveファイル '{f['name']}' (ID: {f['id']}) の処理中にエラーが発生しました: {str(e)}")
else:
st.info("処理対象のGoogle Driveファイルがありません。")
except Exception as e:
st.error(f"Google Driveファイルの取得中にエラーが発生しました: {str(e)}")
st.code(traceback.format_exc())
except Exception as e:
# データソース追加全体の予期せぬエラー
st.error(f"ファイル処理中に予期せぬエラーが発生しました: {str(e)}")
st.code(traceback.format_exc())
# --- 質問応答 UI ---
# 質問入力用のテキストボックス
# key='query' を指定することで、再実行後も入力内容を保持できる場合がある
query = st.text_input("質問を入力", key='query')
# 送信ボタンが押され、かつ質問が入力されている場合
if st.button("送信") and query:
try:
# 処理中であることを示すスピナーを表示
with st.spinner("回答を生成中..."):
# embedchain に質問を投げる
# query() メソッドが RAG のコア処理を実行
# 1. 質問文をベクトル化
# 2. ベクトルDB内で類似度の高いドキュメントチャンクを検索
# 3. 検索結果と質問文をプロンプトに組み込み、LLM (OpenAI) に送信
# 4. LLMからの回答を受け取る
# citations=False: 回答の根拠となったソースを表示しない
# max_tokens=4000: OpenAI APIに渡す最大トークン数を指定 (embedchainのデフォルト制限を回避)
answer = emb_app.query(query, citations=False, max_tokens=4000)
st.markdown("### 回答")
# 回答をマークダウン形式で表示 (改行などが反映される)
st.markdown(answer)
except Exception as e:
# クエリ処理中のエラー (APIキー無効、クレジット不足、ネットワークエラーなど)
st.error(f"クエリ処理中にエラーが発生しました: {str(e)}")
st.write("詳細なエラー情報:")
st.code(traceback.format_exc())
st.info("OpenAI API キーが正しく設定されているか、また十分なクレジットがあるか確認してください。")
# --- スクリプト実行のエントリポイント ---
if __name__ == "__main__":
# Pythonスクリプトが直接実行された場合に main() 関数を呼び出す
main()
4. コード詳細解説 (drive_operations.py
想定)
from functools import lru_cache # Least Recently Used キャッシュデコレータ
from googleapiclient.discovery import build # APIクライアント構築 (main.pyでも使用)
from googleapiclient.http import MediaFileUpload # ファイルアップロード用メディアオブジェクト
# utilities.py から定数インポート (想定)
# (例: utilities.py に MAX_DEPTH = 5 のような定義がある)
from utilities import MAX_DEPTH
import logging # ログ出力用
import tempfile # 一時ファイル作成用
import asyncio # 非同期処理用
import os # ファイルパス操作、ファイル削除用
# --- ロギング設定 ---
logging.basicConfig(level=logging.INFO) # INFOレベル以上のログを出力
# --- Google Drive ファイルリスト取得 ---
def fetch_files_from_drive(service, folder_id='root', file_types=None):
"""
指定されたフォルダID内のファイルリストをGoogle Drive APIから取得する。
Args:
service: Google Drive APIサービスオブジェクト
folder_id: 検索対象のフォルダID (デフォルトは 'root')
file_types: フィルタリングするMIMEタイプのリスト (例: ['application/pdf', 'text/plain'])
Returns:
ファイルのメタデータ (id, name, mimeTypeなど) のリスト
"""
# Google Drive API の検索クエリを作成
# '{folder_id}' in parents: 指定フォルダ直下のアイテム
# trashed=false: ゴミ箱に入っていないもの
query = f"'{folder_id}' in parents and trashed=false"
# MIMEタイプによるフィルタが指定されている場合、クエリに追加
if file_types:
# 各MIMEタイプに対するクエリ部分を作成
type_queries = [f"mimeType='{mime_type}'" for mime_type in file_types]
# 'or' で結合 (例: "mimeType='application/pdf' or mimeType='text/plain'")
type_query = " or ".join(type_queries)
query += f" and ({type_query})" # 元のクエリに AND で結合
results = [] # 結果を格納するリスト
page_token = None # ページネーショントークン (初回はNone)
# APIが nextPageToken を返す限りループして全ページを取得
while True:
# files().list() メソッドでファイルリストをリクエスト
response = service.files().list(
q=query, # 検索クエリ
spaces='drive', # 検索対象スペース (通常 'drive')
# 取得するフィールドを指定 (必要なものだけに絞ると効率的)
fields='nextPageToken, files(id, name, mimeType, size, createdTime)',
pageToken=page_token # 前回のレスポンスから取得したトークン (初回はNone)
).execute() # APIリクエストを実行
# レスポンスからファイルリスト部分を取得して results に追加
results.extend(response.get('files', []))
# 次ページのトークンを取得 (なければNone)
page_token = response.get('nextPageToken', None)
# nextPageToken がなければ全ページ取得完了なのでループを抜ける
if not page_token:
break
return results
# --- フォルダ内容取得 (キャッシュ付き) ---
@lru_cache(maxsize=None) # 関数の結果をキャッシュ (引数が同じなら再計算せずキャッシュを返す)
def fetch_folder_contents(service, folder_id):
"""指定フォルダID直下のアイテム(ファイルとサブフォルダ)のID, MIMEタイプ, サイズを取得 (キャッシュ有効)"""
items = []
query = f"'{folder_id}' in parents and trashed=false"
page_token = None
while True:
# fields を絞り、サイズ情報も取得
response = service.files().list(
q=query,
corpora="user", # 自分のドライブ内のファイルが対象
fields="nextPageToken, files(id, mimeType, size)", # サイズも取得
pageToken=page_token
).execute()
items.extend(response.get("files", []))
page_token = response.get('nextPageToken', None)
if page_token is None:
break
return items
# --- フォルダ内容の再帰的取得 (非同期) ---
async def get_folder_contents(service, folder_id, current_depth=0, websocket=None):
"""
指定フォルダ以下の全ファイル/フォルダを再帰的に探索し、合計サイズ、ファイル数、サブフォルダ数を計算。
非同期処理で実装され、オプションでWebSocketに進捗を通知できる。
Args:
service: Google Drive APIサービスオブジェクト
folder_id: 起点となるフォルダID
current_depth: 現在の再帰深度
websocket: 進捗通知用のWebSocketオブジェクト (オプション)
Returns:
(total_size, total_files, total_subfolders) のタプル
"""
total_size = 0
total_files = 0
total_subfolders = 0
# 設定された最大深度を超えたら再帰を停止
if current_depth > MAX_DEPTH:
logging.warning(f"Max depth {MAX_DEPTH} reached in folder {folder_id}.")
return total_size, total_files, total_subfolders
# キャッシュ付き関数でフォルダ直下のアイテムを取得
items = fetch_folder_contents(service, folder_id)
# 進捗ログ/通知
text = f"Fetched {len(items)} items from folder {folder_id} at depth {current_depth}."
logging.info(text)
if websocket: # WebSocketが指定されていれば
await websocket.send_text(text) # テキストメッセージを送信 (非同期)
await asyncio.sleep(0.1) # 短い待機 (UI更新などのため)
# 取得したアイテムをループ処理
for item in items:
# MIMEタイプがフォルダでない場合 (ファイルの場合)
if item["mimeType"] != "application/vnd.google-apps.folder":
total_files += 1
# size属性が存在すれば加算 (Google Docsなどはサイズがない場合がある)
total_size += int(item.get("size", 0))
else: # MIMEタイプがフォルダの場合
# 再帰的にサブフォルダの内容を取得 (非同期呼び出し)
subfolder_size, subfolder_files, subfolder_subfolders = await get_folder_contents(
service, item["id"], current_depth + 1, websocket
)
# サブフォルダの結果を現在の合計に加算
total_files += subfolder_files
total_size += subfolder_size
# サブフォルダ自体 + その下のサブフォルダ数
total_subfolders += 1 + subfolder_subfolders
return total_size, total_files, total_subfolders
# --- Google Driveへのファイルアップロード (非同期) ---
async def upload_file_to_drive(service, file, websocket=None):
"""
StreamlitのUploadedFileオブジェクトをGoogle Driveにアップロードする非同期関数。
Args:
service: Google Drive APIサービスオブジェクト
file: StreamlitのUploadedFileオブジェクト
websocket: 進捗通知用のWebSocketオブジェクト (オプション)
Returns:
アップロードされたファイルのID
"""
# StreamlitのUploadedFileは直接ファイルパスとして扱えないため、
# 一時ファイルに内容を書き出す必要がある。
# delete=False: withブロックを抜けてもファイルが自動削除されないようにする (後で手動削除)
temp_file = tempfile.NamedTemporaryFile(delete=False)
try:
with temp_file as buffer:
# UploadedFileオブジェクトの file 属性 (BytesIOライク) から読み込み、一時ファイルに書き込む
buffer.write(file.file.read()) # ここは file.read() の方が適切かも (UploadedFileの仕様による)
# file.getbuffer() も使える場合がある
# アップロードするファイルのメタデータ (ファイル名など) を定義
file_metadata = {"name": file.filename}
# MediaFileUploadオブジェクトを作成
# 第1引数: アップロードするファイルのパス (一時ファイルのパス)
# mimetype: ファイルのMIMEタイプ (UploadedFileオブジェクトから取得)
# resumable=True: 大きなファイルの場合、再開可能なアップロードを行う (デフォルトFalseかも)
media = MediaFileUpload(temp_file.name, mimetype=file.content_type, resumable=True)
# service.files().create() でアップロードを実行
# body: ファイルメタデータ
# media_body: ファイル内容 (MediaFileUploadオブジェクト)
# fields="id": レスポンスとしてファイルIDのみを受け取るよう指定 (効率化)
uploaded_file_info = service.files().create(
body=file_metadata,
media_body=media,
fields="id"
).execute() # 同期的に実行される (内部でHTTPリクエスト)
# アップロード完了通知
file_id = uploaded_file_info.get('id')
logging.info(f"File uploaded successfully. File ID: {file_id}")
if websocket:
await websocket.send_text(f"Uploaded file with ID: {file_id}.")
return file_id
finally:
# 一時ファイルを確実に削除する
os.remove(temp_file.name)
5. 補足と注意点
-
utilities.py
: このファイルは提供されていませんが、refresh_file()
(おそらくos.listdir('sandbox')
のような実装) とMAX_DEPTH
(再帰深度の上限を示す定数) が定義されていると想定されます。 -
エラーハンドリング: 各所で
try...except
が使われていますが、さらに細かいエラーハンドリング(特定の例外クラスをキャッチするなど)を追加することで、より堅牢なアプリケーションになります。 -
状態管理: Streamlitの
st.session_state
は便利ですが、複雑な状態を持つアプリでは管理が煩雑になることがあります。状態遷移や依存関係を明確に設計することが重要です。特にembedchain
のデータ追加タイミングと重複管理は注意が必要です。 -
非同期処理:
drive_operations.py
ではasyncio
が使われていますが、main.py
からは同期的なラッパー経由で呼び出されています。Streamlit自体は非同期処理に完全には対応していないため、このような工夫が必要になります。 -
embedchainのデータタイプ:
emb_app.add()
のdata_type
は非常に重要です。ファイルの拡張子などから適切なタイプを動的に判定するロジックを追加すると、より多くのファイル形式に対応できます。 -
セキュリティ: APIキーやクライアントシークレットの管理には最大限の注意を払ってください。
.env
ファイルもリポジトリに含めないように.gitignore
で管理することが基本です。Streamlit Cloudを利用する場合は、StreamlitのSecrets管理機能の利用を検討してください。 -
依存関係のバージョン: 使用しているライブラリ(特に
embedchain
やgoogle-api-python-client
)のバージョンによって、APIの仕様や関数の使い方が変わる可能性があります。requirements.txt
などでバージョンを固定することが推奨されます。
この解説が、コードの理解とさらなる開発の助けになれば幸いです。