今回もDatabricksで検証しています。文章中のUnity Catalogに関する機能はDatabricks固有です。
導入
前回記事では、FastAPIを使ってCTranslate2で変換したLLMのAPIサーバを作りました。
Databricksでは、機械学習モデルをサービングする(マネージドAPIサーバとして公開する)機能があり、本番運用ではこちらを使うことが望ましいです。
ただし、Databricksのサービング機能を使うにはMLflow Registryに登録されたモデルである必要があります。
今回は、CTranslate2で変換したモデルをMLflowでロギングします。
また、最近MLflowのバージョン2.6.0がリリースされました。正直アップデートが早くて2.5.0のキャッチアップもきちんとできてないのですが、せっかくなので2.6.0の新機能も試したいと思います。
2023/8時点で、Databricksのモデルサービング機能は日本リージョンで使用できません。
(Legacyのサービングサービスは使える)
日本リージョンでのサービス提供開始が待ち遠しいです。
MLflowとは
こちらからどうぞ(超盛沢山)。
バージョン2.6.0のリリースノートはこちら。
準備
今回はノートブックだけではなく、ファイル(.py)も作ります。
まずはノートブックを作って、必要なモジュールをインストールします。
MLflowは2.6.0の新機能を使うので、それ以上のバージョンをインストールしてください。
%pip install -U -qq "mlflow>=2.6.0" ctranslate2 langchain transformers accelerate sentencepiece
dbutils.library.restartPython()
次にmlflowをインポートします。
今回はUnity Catalog配下にモデルを登録しますので、あわせてmlflow.set_registry_uri("databricks-uc")
を指定してください。
import mlflow
mlflow.set_registry_uri("databricks-uc")
ここからは一旦ノートブックから離れて、必要なファイルを作成します。
CTranslate2の推論処理用クラスの作成
↓の記事で以前作成したCTranslate2をlangchainで動かすためのクラスを流用します。
ctranslate2llm.py
という名前でノートブックと同じ場所にファイルを作成し、中身をコピーします。
コード(ctranslate2llm.py) 長いので折り畳み
from ctranslate2 import Generator, GenerationStepResult
from transformers import PreTrainedTokenizer, PreTrainedTokenizerFast
from typing import (
Any,
List,
Union,
Mapping,
Optional,
Iterable,
)
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.llms.base import LLM
class CTranslate2StreamLLM(LLM):
generator: Generator
tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]
max_length: int = 128
repetition_penalty: float = 1.1
temperature: float = 0.1
topk: int = 1
@property
def _llm_type(self) -> str:
return "CTranslate2"
def _generate_tokens(
self,
prompt: str,
) -> Iterable[GenerationStepResult]:
# 推論の実行
tokens = self.tokenizer.convert_ids_to_tokens(
self.tokenizer.encode(prompt, add_special_tokens=False)
)
step_results = self.generator.generate_tokens(
tokens,
max_length=self.max_length,
sampling_topk=self.topk,
sampling_temperature=self.temperature,
repetition_penalty=self.repetition_penalty,
return_log_prob=True,
)
return step_results
def _decode_with_buffer(
self, step_result: GenerationStepResult, token_buffer: list
) -> Union[str, None]:
token_buffer.append(step_result.token_id)
word = self.tokenizer.decode(token_buffer)
# 全て変換不能文字の場合、終了
if all(c == "�" for c in word):
return None
# step_resultのtokenが▁から始まる場合、スペースを付与する
if step_result.token.startswith("▁"):
word = " " + word
# 正常な文字が生成できた場合、バッファをクリア
token_buffer.clear()
return word
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
) -> str:
if stop is not None:
raise ValueError("stop kwargs are not permitted.")
step_results = self._generate_tokens(prompt)
output_ids = []
token_buffer = []
for step_result in step_results:
output_ids.append(step_result.token_id)
if run_manager:
if word := self._decode_with_buffer(step_result, token_buffer):
run_manager.on_llm_new_token(
word,
verbose=self.verbose,
logprobs=step_result.log_prob if step_result.log_prob else None,
)
if output_ids:
text = self.tokenizer.decode(output_ids)
return text
return ""
async def _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
) -> str:
if stop is not None:
raise ValueError("stop kwargs are not permitted.")
step_results = self._generate_tokens(prompt)
output_ids = []
token_buffer = []
for step_result in step_results:
output_ids.append(step_result.token_id)
if run_manager:
if word := self._decode_with_buffer(step_result, token_buffer):
await run_manager.on_llm_new_token(
word,
verbose=self.verbose,
logprobs=step_result.log_prob if step_result.log_prob else None,
)
if output_ids:
text = self.tokenizer.decode(output_ids)
return text
return ""
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {
"generator": self.generator,
"tokenizer": self.tokenizer,
"max_length": self.max_length,
"repetition_penalty": self.repetition_penalty,
"temperature": self.temperature,
"topk": self.topk,
}
mlflow.pyfunc用のカスタムモデル作成
MLflowにロギングするための対応を行います。
CTranslate2用のflavorを作る手もありますが、大変なのでpyfuncのカスタムモデルで対応することにします。
ctranslate2_loader_module.py
という名前でノートブックと同じ場所にファイルを作成し、カスタムモデルと_load_pyfunc
を実装します。
from typing import (
Any,
Dict,
Optional,
)
import mlflow
import pandas as pd
import torch
import ctranslate2
import transformers
from ctranslate2llm import CTranslate2StreamLLM
from langchain.llms.base import LLM
from langchain import PromptTemplate
from langchain.chains import LLMChain
class CTranslate2Model(mlflow.pyfunc.PythonModel):
def __init__(self, generator, tokenizer):
self.generator = generator
self.tokenizer = tokenizer
def _get_answer(
self, inputs_pdf: pd.DataFrame, template: str, temperature: float = 0.5
) -> pd.DataFrame:
instructions = inputs_pdf[["instruction"]].to_dict(orient="records")
prompt = PromptTemplate(input_variables=["instruction"], template=template)
llm = CTranslate2StreamLLM(
generator=self.generator,
tokenizer=self.tokenizer,
temperature=temperature,
)
chain = LLMChain(llm=llm, prompt=prompt)
results = chain.apply(instructions)
return pd.DataFrame(
{
"instruction": [q["instruction"] for q in instructions],
"answer": [r["text"] for r in results],
}
)
def predict(
self,
inputs: pd.DataFrame,
params: Optional[Dict[str, Any]] = None,
) -> pd.DataFrame:
return self._get_answer(
inputs,
template=params["template"],
temperature=params["temperature"],
)
def _load_pyfunc(model_path_local):
# ジェネレーターとトークナイザーの準備
device = "cuda" if torch.cuda.is_available() else "cpu"
generator = ctranslate2.Generator(model_path_local, device=device)
tokenizer = transformers.AutoTokenizer.from_pretrained(
model_path_local, use_fast=False
)
return CTranslate2Model(
generator=generator,
tokenizer=tokenizer,
)
mlflow.pyfuncのカスタムモデルクラスは、mlflow.pyfunc.PythonModelを継承し、predict
を実装すればOKです。
内容としては、langchainのLLMChainを作って推論を実施、結果をpandasのDataFrameにラップして返しているだけです。
また、モデルをロードするための_load_pyfuncも実装必要です。前回までの記事同様、generator/tokenizerのインスタンスを作成して。カスタムモデルクラスへ渡しています。
ここで、MLflow 2.6.0からの新機能であるpredict関数へのパラメータ渡しも実装しています。
以前のバージョンではpredict関数は引数を入力データ1種しか取れなかったのですが、オプションとしてパラメータも指定できるようになりました。
今回はこのparamsを使ってtemplate(プロンプトのテンプレート文字列)とtemperatureのパラメータを渡せるように実装しています。
従来はインプットデータの中にこれらのパラメータを埋め込むことで対応していたのですが、明確に別でパラメータを記述できるのは見通しがよくなって良いですね。
MLflowでロギングする
ノートブックに戻って、MLflowロギング用のコードを作成します。
Sample InputとSignatureを作成
ロギング時に保管するSample InputデータとSignatureを作成します。
今回のモデルに与える入力データはinstructionという列を持つpandasのDataFrameにしました。
出力は、入力+answerという列を持つPandas DataFrameにしています。
従来ならこの二つからsignatureを作っていたのですが、今回は合わせてparameterを追加します。
infer_signature
の第3引数にデフォルトのパラメータを渡すことで、
predict
関数のパラメータが利用可能になります。
ここで指定しないとpredict関数に与えるパラメータが無視される仕様のようなのでパラメータを使用する場合は必ず指定しましょう。
# sample inputとsignatureを作成
import pandas as pd
from mlflow.models.signature import infer_signature
sample_input = pd.DataFrame(
{
"instruction": [
"LLMとは何ですか?",
]
}
)
sample_output = pd.DataFrame(
{
"instruction": [
"LLMとは何ですか?",
],
"answer": [
"LLMとは大規模言語モデルのことです。"
]
}
)
default_params = {"template": "ユーザー: {instruction}<NL>システム: ", "temperature": 0.5}
# mlflow保存用のsignature作成
signature = infer_signature(sample_input, sample_output, default_params)
カスタムクラスのファイルパスリストを作る
ロギングの際に渡すカスタムクラスのファイルパスリストを作ります。
今回は二つのpythonファイルがあるので、この2種のフルパスリストを作ります。
import os
notebook_path = (
dbutils.notebook.entry_point.getDbutils()
.notebook()
.getContext()
.notebookPath()
.get()
)
repo_root = os.path.dirname(notebook_path)
code_paths = ["ctranslate2_loader_module.py", "ctranslate2llm.py"]
code_paths = [f"/Workspace{repo_root}/{code_path}" for code_path in code_paths]
ロギングする
mlflow.pyfunc.log_model
を呼び出すことでモデルをロギングします。
引数data_pathにはCTranslate2で変換したモデルファイルが置いてあるパスを指定してください。
今回はrinna/japanese-gpt-neox-3.6b-instruction-ppo
をCTranslate2変換モデルを使っています。
引数extra_pip_requirementsは必要なモジュールを指定してください。(mlflowは不要だったかも)
また、引数registered_model_nameを指定することで、Unity Catalogのモデル登録も同時にしています。
今回はtraining.llm
というカタログ・スキーマがある前提として、そこにct2modelという名前で登録します。
# モデルを永続化
with mlflow.start_run() as run:
_ = mlflow.pyfunc.log_model(
artifact_path="model",
loader_module="ctranslate2_loader_module", # loader module名
extra_pip_requirements=[
"mlflow>=2.6.0",
"langchain>=0.0.268",
"ctranslate2==3.18.0",
"sentencepiece==0.1.99",
"transformers==4.31.0",
"accelerate==0.21.0",
], # 依存ライブラリ
signature=signature,
code_path=code_paths, # loader moduleのパス
await_registration_for=1200, # モデルサイズが大きいので長めの待ち時間にします
data_path=model_path, # loader moduleで読み込むモデルデータのパス
input_example=sample_input,
registered_model_name="training.llm.ct2model", # 登録モデル名 in Unity Catalog
)
ノートブックを実行することで、モデルがロギングされます。
データエクスプローラ等でモデルが登録されていることを確認してください。
テスト
モデルをロードして、推論させてみます。
model_name = "training.llm.ct2model"
model_uri = f"models:/{model_name}/1"
# mlflowからモデルを取得
model = mlflow.pyfunc.load_model(model_uri)
# 質問入力の構築
queries = pd.DataFrame({'instruction':[
"大規模言語モデルとは?",
]})
result = model.predict(
queries, params={"temperature": 0.4}
)
display(result)
instruction | answer |
---|---|
大規模言語モデルとは? | 大規模言語モデルとは、大規模なコーパスに適用される言語処理技術です。大規模言語モデルは、大量のデータを分析して有用な情報や知識を抽出するために使用されます。大規模言語モデルの重要な利点には、学習が容易であること、大規模なデータセットで作業できること、多数の言語研究を支援することがあります。 |
問題なく推論できることが確認できました。
まとめ
CTranslate2変換後モデルをMLflowでロギングしてみました。
これでDatabricks上で再利用な形で管理できます。
例えば、Chatbotで使用するモデルとしていろんなモデルを登録し、状況によって切り替えて使うということもMLflowを使えば容易にできると思います。
また、MLflow 2.6.0から提供されたpyfuncにおける推論の追加パラメータは、個人的に待ち望んていた機能なので嬉しいです。
試してないのですが、paramsでlangchainのコールバックオブジェクトを渡せば、ストリーミング出力もできるような気がします。 → 試したのですが、paramsで渡せるのはstrなどプリミティブなものだけでした。何らかの工夫が要りますね。。。
あとは日本リージョンにServerless Model Servingが来るだけ!
さて、CTranslate2を軸に短期集中で記事を書いてみました。
夏季休暇も終わるので、一旦このシリーズ(?)は終わりです。
あまり推敲せず書いたため、分かりづらいところや間違いも多々あると思いますが、寛大な心で読んでもらえれば幸いです。