こちらの続きです。
上のQ&A botの背後はOpenAI APIなので精度は申し分ないですが、(当たり前ですが)APIコールの都度お金がかかります。そこで、オープンソースLLMに置き換えられないかと思った次第です。
ソースコードはこちらです。
利用させていただいたLLMはこちらinu-ai/dolly-japanese-gpt-1b
です。
環境設定
OpenAI API関連の設定は削除して、以下のLLMとテンプレートを使用しています。
# エンべディング変換を担うLLM
config['hf_embedding_model'] = 'sonoisa/sentence-bert-base-ja-mean-tokens-v2'
# 要約処理を担うLLM
config['hf_chat_model'] = "inu-ai/dolly-japanese-gpt-1b"
# プロンプトテンプレート
config['prompt_template'] = r"<s>\nあなたはDatabricksによって開発された有能なアシスタントであり、指定された文脈に基づいて質問に回答することを得意としており、文脈は文書です。文脈が回答を決定するのに十分な情報を提供しない場合には、わかりませんと言ってください。文脈が質問に適していない場合には、わかりませんと言ってください。文脈から良い回答が見つからない場合には、わかりませんと言ってください。問い合わせが完全な質問になっていない場合には、わからないと言ってください。あなたは同じ言葉を繰り返しません。以下は、文脈を示す文書と、文脈のある質問の組み合わせです。文書を要約することで質問を適切に満たす回答をしなさい。\n[SEP]\n文書:\n{context}\n[SEP]\n質問:\n{question}\n[SEP]\n回答:\n"
config['temperature'] = 0.15
ベクトルDBの構築
ここはエンべディング変換のLLM以外は前回と変わりません。
モデルの構築
ここがガラッと変わりました。前回はOpen AI APIコールをLangChainに組み込んで、さらにカスタムPython関数でラッピングし、mlflow.pyfunc
フレーバーでロギングして終了でした。
今回は、Transformerモデルをダウンロードして、LangChain、カスタムPython関数でラッピングという流れになります。この場合、MLflowの使い方が大きく変わります。
とっても参考になったのはこちらの記事。同じDatabricksのエンジニアによるものでした。ありがとうございます。
Python関数で全ての処理をカバーできるのであれば、前回のアプローチでOKなのですが、Transformerモデルを使う際にはモデルのダウンロードが発生します。このため、ノートブックでモデルを使っている場合には問題ないのですが、モデルサービングのケースでコンテナ化する際にはモデル自体もパッケージングしなくてはなりません。
最初はこれに気づかず、Python関数をロギングすればいいかーとやっていたら、サービングの際には「モデルが見つかりません」となります。当たり前と言えば当たり前。
それでは、どうしたらいいのか。その答えが上の記事です。MLflowにモデルを登録する際に、モデル本体をロードするモジュールも一緒に記録するのです。このモジュールが ローダーモジュール(loader module) です。この記事を読むまで存在を知りませんでした。お恥ずかしい。こちらにちゃんと記載されてます。
ローダーモジュール
カスタムPython関数フレーバーのモデルを記録する際に、カスタム乗りジックを組み込めるのは大きなメリットですが、ローダーモジュールを組み合わせることで、トランスフォーマーモデルのロード処理も一緒に組み込めるようになります。
以下にシンプルな実装を示します。
class HogeModel(mlflow.pyfunc.PythonModel):
# カスタムモデルの実装
class MLflowQABot(mlflow.pyfunc.PythonModel):
def __init__(self, llm, retriever):
self.qabot = QABot(llm, retriever)
def predict(self, inputs):
questions = list(inputs['question'])
# 回答の返却
return [self.qabot.get_answer(q) for q in questions]
def _load_pyfunc(data_path):
device = 0 if torch.cuda.is_available() else -1
tokenizer = AutoTokenizer.from_pretrained(data_path, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(data_path)
pipe = pipeline(
"text-generation", model=model, tokenizer=tokenizer, max_new_tokens=200, device=device
)
llm = HuggingFacePipeline(pipeline=pipe)
return HogeModel(llm)
このローダーモジュールのエントリーポイントは_load_pyfunc(data_path)
です。MLflowで記録されたモデルがロードされる際に、このメソッドが最初にコールされます。引数のdata_path
の値は、MLflowで記録する際に指定する引数data_path
で指定する、モデルファイルの保存パスです。
モデルコンポーネントの保存
この仕組みを活用するために、モデルファイルやベクトルDBを保存しておきます。
from langchain.llms import HuggingFacePipeline
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
# モデルとトークナイザをパイプラインにまとめる
tokenizer = AutoTokenizer.from_pretrained(config['hf_chat_model'], use_fast=False)
model = AutoModelForCausalLM.from_pretrained(config['hf_chat_model'])
pipe = pipeline(
"text-generation", model=model, tokenizer=tokenizer, max_new_tokens=200
)
llm = HuggingFacePipeline(pipeline=pipe)
# モデルアーティファクトを一時的に保存するDatabricksがアクセス可能なクラウドストレージのパス
base_data_path = "/dbfs/tmp/takaaki.yayoi@databricks.com/qabot_open/model_artifact"
pipe.save_pretrained(base_data_path)
# Vector DBを保存
vector_store.save_local(f"{base_data_path}/vector_db")
この仕組みを活用することで、LLMのロード処理も含むカスタムモデルをハンドリングできるようになります。
モデルローダーの実装
以下にここで使用しているローダーモジュールの実装を示します。以前使用したロジックを含めています。デバッグしやすいようにもっとパラメーター化しようとは思います。
import re
import time
import mlflow
import torch
import pandas as pd
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.llms import HuggingFacePipeline
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores.faiss import FAISS
from langchain.schema import BaseRetriever
from langchain.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain.prompts.base import BasePromptTemplate
from langchain.prompts import PromptTemplate
from langchain.base_language import BaseLanguageModel
from langchain import LLMChain
class QABot():
template = r"<s>\nあなたはDatabricksによって開発された有能なアシスタントであり、指定された文脈に基づいて質問に回答することを得意としており、文脈は文書です。文脈が回答を決定するのに十分な情報を提供しない場合には、わかりませんと言ってください。文脈が質問に適していない場合には、わかりませんと言ってください。文脈から良い回答が見つからない場合には、わかりませんと言ってください。問い合わせが完全な質問になっていない場合には、わからないと言ってください。あなたは同じ言葉を繰り返しません。以下は、文脈を示す文書と、文脈のある質問の組み合わせです。文書を要約することで質問を適切に満たす回答をしなさい。\n[SEP]\n文書:\n{context}\n[SEP]\n質問:\n{question}\n[SEP]\n回答:\n"
def __init__(self, llm, retriever):
self.llm = llm
self.retriever = retriever
self.prompt = PromptTemplate(template=self.template, input_variables=["context", "question"])
self.qa_chain = LLMChain(llm = self.llm, prompt=self.prompt)
self.abbreviations = { # 置換したい既知の略語
"DBR": "Databricks Runtime",
"ML": "Machine Learning",
"UC": "Unity Catalog",
"DLT": "Delta Live Table",
"DBFS": "Databricks File Store",
"HMS": "Hive Metastore",
"UDF": "User Defined Function"
}
def _is_good_answer(self, answer):
''' 回答が妥当かをチェック '''
result = True # デフォルトのレスポンス
badanswer_phrases = [ # モデルが回答を生成しなかったことを示すフレーズ
"わかりません", "コンテキストがありません", "知りません", "答えが明確でありません", "すみません",
"答えがありません", "説明がありません", "リマインダー", "コンテキストが提供されていません", "有用な回答がありません",
"指定されたコンテキスト", "有用でありません", "適切ではありません", "質問がありません", "明確でありません",
"十分な情報がありません", "適切な情報がありません", "直接関係しているものが無いようです"
]
if answer is None: # 回答がNoneの場合は不正な回答
results = False
else: # badanswer phraseを含んでいる場合は不正な回答
for phrase in badanswer_phrases:
if phrase in answer.lower():
result = False
break
return result
def _get_answer(self, context, question, timeout_sec=60):
'''' タイムアウトハンドリングありのLLMからの回答取得 '''
# デフォルトの結果
result = None
# 終了時間の定義
end_time = time.time() + timeout_sec
# タイムアウトに対するトライ
while time.time() < end_time:
# レスポンス取得の試行
try:
result = self.qa_chain.generate([{'context': context, 'question': question}])
break # レスポンスが成功したらループをストップ
# その他のエラーでも例外を発生
except Exception as e:
print(f'LLM QA Chain encountered unexpected error: {e}')
raise e
return result
def get_answer(self, question):
''' 指定された質問の回答を取得 '''
# デフォルトの結果
result = {'answer':None, 'source':None, 'output_metadata':None}
# 質問から一般的な略語を削除
for abbreviation, full_text in self.abbreviations.items():
pattern = re.compile(fr'\b({abbreviation}|{abbreviation.lower()})\b', re.IGNORECASE)
question = pattern.sub(f"{abbreviation} ({full_text})", question)
# 適切なドキュメントの取得
docs = self.retriever.get_relevant_documents(question)
# それぞれのドキュメントごとに ...
for doc in docs:
# ドキュメントのキー要素を取得
text = doc.page_content
source = doc.metadata['source']
# LLMから回答を取得
output = self._get_answer(text, question)
# 結果からアウトプットを取得
generation = output.generations[0][0]
answer = generation.text
output_metadata = output.llm_output
# no_answer ではない場合には結果を構成
if self._is_good_answer(answer):
result['answer'] = answer
result['source'] = source
result['output_metadata'] = output_metadata
break # 良い回答であればループをストップ
return result
class MLflowQABot(mlflow.pyfunc.PythonModel):
def __init__(self, llm, retriever):
self.qabot = QABot(llm, retriever)
def predict(self, inputs):
questions = list(inputs['question'])
# 回答の返却
return [self.qabot.get_answer(q) for q in questions]
def _load_pyfunc(data_path):
device = 0 if torch.cuda.is_available() else -1
tokenizer = AutoTokenizer.from_pretrained(data_path, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(data_path)
pipe = pipeline(
"text-generation", model=model, tokenizer=tokenizer, max_new_tokens=200, device=device
)
llm = HuggingFacePipeline(pipeline=pipe)
# エンべディングにアクセスするためにベクトルストアをオープン
embeddings = HuggingFaceEmbeddings(model_name='sonoisa/sentence-bert-base-ja-mean-tokens-v2')
vector_store = FAISS.load_local(f"{data_path}/vector_db", embeddings)
n_documents = 5 # 取得するドキュメントの数
retriever = vector_store.as_retriever(search_kwargs={'k': n_documents}) # 取得メカニズムの設定
return MLflowQABot(llm, retriever)
モデルのロギング
そして、モデルのロギングの際にこのローダーモジュールを指定します。
# 上のQABotクラスをloader moduleに含めます
loader_code_path = "/Workspace/Users/takaaki.yayoi@databricks.com/20230603_diy-llm-qa-bot-jpn-open/pyfunc_modules/transformer_loader_module.py"
# mlflowにモデルを永続化
with mlflow.start_run() as run:
_ = (
mlflow.pyfunc.log_model(artifact_path="model", # モデルを保存する相対パス
loader_module="transformer_loader_module", # loader module名
extra_pip_requirements=['langchain==0.0.166', 'tiktoken==0.4.0', 'faiss-cpu==1.7.4', 'transformers==4.28.0', 'sentence_transformers==2.2.2', 'fugashi==1.2.1', 'ipadic==1.0.0'], # 依存ライブラリ
code_path=[loader_code_path], # loader moduleのパス
await_registration_for = 1200, # モデルサイズが大きいので長めの待ち時間にします
data_path=base_data_path, # loader moduleで読み込むモデルデータのパス
registered_model_name=config['registered_model_name'] # 登録モデル名
)
)
モデルのテスト
ここまでの処理で、カスタムLLMがMLflowモデルレジストリに登録されているので、従来の方法でモデルの操作、参照が可能になります。
# mlflowに接続
client = mlflow.MlflowClient()
# 最新モデルバージョンの特定
latest_version = client.get_latest_versions(config['registered_model_name'], stages=['None'])[0].version
# モデルをプロダクションに移行
client.transition_model_version_stage(
name=config['registered_model_name'],
version=latest_version,
stage='Production',
archive_existing_versions=True
)
# mlflowからモデルを取得
model = mlflow.pyfunc.load_model(f"models:/{config['registered_model_name']}/Production"
# 質問入力の構築
queries = pd.DataFrame({'question':[
"Delta Sharingとは何?",
"MLflowのメリットは?",
"Unity Catalogのセットアップ方法"
]})
# レスポンスの取得
model.predict(queries)
オープンLLMでもそれなりの結果が得られています!可能性が広がる...。
Setting `pad_token_id` to `eos_token_id`:3 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:3 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:3 for open-end generation.
Out[25]: [{'answer': 'Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sharingは、Delta Lakeテーブルのインクリメンタルなクローンを作成するために使用されるSQLクエリです。Delta Sh',
'source': 'https://qiita.com/taka_yayoi/items/bc3edb691c6f1fc7c79a',
'output_metadata': None},
{'answer': 'MLflowは、データ分析と機械学習のワークフローを簡素化し、データサイエンティストと開発者の生産性を向上させる強力なツールです。MLflowは、データサイエンティストと開発者が、データから洞察を引き出し、ビジネス上の意思決定を行うのに役立つ強力なツールです。\\n',
'source': '',
'output_metadata': None},
{'answer': 'Unity Catalogのセットアップ方法には、次のものがあります。\\n\\n1. Unity Catalogのセットアップ:Unity Catalogのセットアップには、次の手順が含まれます。\\n\\n1. Unity Catalogのインストール:Unity Catalogのインストールには、次の手順が含まれます。\\n\\n1. Unity Catalogのインストール:Unity Catalogのインストールには、次の手順が含まれます。\\n\\n1. Unity Catalogのインストール:Unity Catalogのインストールには、次の手順が含まれます。\\n\\n1. Unity Catalogのインストール:Unity Catalogのインストールには、次の手順が含まれます。\\n\\n1. Unity Catalogのインストール:Unity Catalogのインストールには、次の手順が含まれます。\\n\\n1. Unity Cat',
'source': 'https://qiita.com/taka_yayoi/items/b34b89d8e158935c3095',
'output_metadata': None}]
モデルのデプロイ
本当はここまでやりたかったのですが、現時点ではDatabricksのモデルサービングエンドポイントに大容量のLLMをデプロイしようとするとエラーになるので、今回はここまでに留めておきます。近いうち状況は改善されることでしょう(多分)。
もちろん、バッチ処理であればこの仕組みは問題なく動作します。