はじめに
PAY.JPを使った記事を書いてみようと思い、LLMを使って商品の購入体験を改善するということに挑戦してみました。この記事を書くためだけに、数週間前にLangChain,LangGraphに入門したので「この書き方は良くない!」とかあるかもしれませんが、雰囲気でも伝われば幸いです。
今回のゴール
PAY.JPを利用して書籍をオンライン販売しているショップが、AIとのやり取りで購入体験ができる機能をユーザーに提供しようとしているという設定です。具体的には以下の機能をチャットボットで提供します。
- ユーザーがチャットボットと対話をして、商品を探索することができる
- ユーザーが購入したい商品を指定すると購入手続きに進められる
- ユーザーが購入完了までをチャット内で完結できる
ユーザーは事前に会員登録が済んでいる想定で、
注文時の支払い方法の指定や発送先の指定などには対応していません。
ただし、上記で挙げた対応していない箇所についても機能拡張すれば対応は可能だと思います。「チャットでやるのか?」という疑問は置いておいて。
今回使っているライブラリ
name | 概要 |
---|---|
LangChain | LLMのアプリケーション呼び出しを簡単に |
LangGraph | ワークフロー制御が簡単に |
LangSmith | ロギング用途 |
Chainlit | チャットボット形式のUIを提供 |
Chroma | ベクトルデータベース |
PAY.JP | 決済 |
Chainlitですが、投稿時点の最新versionである1.3.2では、elementという機能に脆弱性が確認されているので、利用する際には注意してください。
https://github.com/Chainlit/chainlit/releases/tag/1.3.2
デモはこちら
コンソール画面から売上を確認すると、無事に決済が通っていることが確認できました
成果物としては以上となりますが、
どういう処理が裏側で動いているのかを解説していきます。
データセット準備
商品データ
商品としての書籍はLLMで100件ほど自動生成します。このcsvをChromaDBに登録していきます
id,title,category,price,stock,description
1,現代数学の基礎入門,数学新書,1980,45,大学1年生向けの数学基礎教科書。集合論から始まり、線形代数、微積分まで丁寧に解説。例題も充実しており、独学でも理解しやすい構成。
2,歴史で読み解く日本の古代,歴史新書,1580,32,縄文時代から平安時代までの日本の歴史を最新の研究成果を交えて解説。考古学的発見と文献史料を組み合わせた新しい古代史観を提示。
3,量子力学への第一歩,物理新書,2200,28,高校物理の知識があれば読める量子力学入門書。シュレーディンガー方程式までの道のりを、豊富な図解とともにわかりやすく説明。
ChromaDB
ChromaDBではcollectionという単位でレコードを保存できます。
商品タイトル、カテゴリ、商品詳細をembeddingしたものと、商品のメタデータをセットで保存することができます。
class ChromaInitializer:
"""ChromaDB initialization handler"""
...省略...
def _prepare_item_data(self, item: Item) -> tuple[str, dict, str]:
"""Prepare item data for ChromaDB"""
text_to_embed = f"{item.title} {item.category} {item.description}"
metadata = {
"title": item.title,
"price": item.price,
"category": item.category,
"description": item.description,
}
return text_to_embed, metadata, item.id
def _load_items_from_csv(self) -> [tuple[str, dict, str]]:
"""Load and process items from CSV file"""
items_data = []
with open(self.config.data_path, "r") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
item = Item(**row)
items_data.append(self._prepare_item_data(item))
return items_data
def initialize_collection(self) -> None:
"""Initialize ChromaDB collection with data"""
collection = self._get_or_create_collection()
if collection.count() > 0:
print("Collection already contains data. Skipping initialization.")
return
try:
items_data = self._load_items_from_csv()
documents, metadatas, ids = zip(*items_data)
collection.add(documents=list(documents), metadatas=list(metadatas), ids=list(ids))
print(f"Successfully initialized collection with {len(documents)} items")
except Exception as e:
print(f"Error during initialization: {str(e)}")
raise
ホスティングされているChromaDBに対しては、以下のようにクエリを発行することができます。
ChromaDBは内部でembeddingを行なう機能を有しており、stringをそのまま渡しても問題ないため便利でした。
@tool
def search_items(query: str, config: RunnableConfig) -> list[Item]:
"""
ユーザーが求めている商品の検索を行う
Args:
query: 検索クエリ
config: RunnableConfig
Returns:
list[Item]: 商品情報のリスト
"""
chroma_config = ChromaDBConfig.from_env()
client = chromadb.HttpClient(host=chroma_config.host, port=chroma_config.port)
collection = client.get_collection(name=chroma_config.collection_name)
try:
results = collection.query(
query_texts=[query],
n_results=3,
)
except Exception as e:
logger.error(f"Error: {e}")
raise
...省略...
chainlit
chainlitはチャットボットのインターフェイスを提供し、入出力を細かく制御することができます。
デコレーターを設定すると状況に応じて各関数がフックされるようになります。
@cl.on_chat_start
は、セッション開始時一度だけ起動する処理
@cl.on_message
は、ユーザーからのメッセージ入力が行われた時に起動する処理
@cl.on_chat_start
async def start():
WELCOME_MESSAGE = """
こんにちは!
当店は幅広いジャンルの書籍を取り扱っております。どのようなものをお探しでしょうか?
"""
cl.user_session.set("state", State(messages=[]))
cl.user_session.set("workflow", create_order_graph())
cl.user_session.set("thread_id", str(uuid4()))
await cl.Message(content=WELCOME_MESSAGE).send()
@cl.on_message
async def on_message(message: cl.Message):
state = cl.user_session.get("state")
workflow = cl.user_session.get("workflow")
config = RunnableConfig({"configurable": {"thread_id": cl.user_session.get("thread_id")}})
# メッセージを状態に追加
state["messages"].append(HumanMessage(content=message.content))
try:
# ワークフロー実行
result = await workflow.ainvoke(state, config)
...省略...
LangGraph
ワークフローを制御するために利用しています。
START
からEND
までをNode(処理内容)とedge(遷移先)を設定することでルーティングすることができます。
def create_order_graph() -> CompiledStateGraph:
"""注文フローのグラフを構築"""
workflow = StateGraph(State)
# ノードの追加
workflow.add_node("get_customer_info", _customer_info)
workflow.add_node("assistant", create_assistant())
workflow.add_node("search_items", ToolNode([search_items]))
workflow.add_node("purchase_items", ToolNode([purchase_items]))
# Define logic
workflow.add_edge(START, "get_customer_info")
workflow.add_edge("get_customer_info", "assistant")
workflow.add_conditional_edges("assistant", _route_tools, ["search_items", "purchase_items", END])
workflow.add_edge("search_items", "assistant")
workflow.add_edge("purchase_items", "assistant")
memory = MemorySaver()
graph = workflow.compile(
checkpointer=memory,
interrupt_before=["purchase_items"],
)
return graph
以下の通りで設計されています
name | 概要 |
---|---|
get_cutomer_info | セッション開始時に対象ユーザーの情報を取得する |
assistant | ユーザーとのやり取りを通じて、最適なtoolを使い分ける |
search_items | ユーザーとのやり取りを通じて、購入したい商品を探索する |
purchase_items | 購入の意向を確認した場合、購入対象の商品詳細をユーザーに最終確認してもらった後に購入処理を実行する |
また、Nodeにtoolとしての関数を仕込むことによって、エージェントが関数の引数・コメント・処理内容を読み取って、状況に応じて自動で関数を呼び出してくれるようになります。
今回は、pay.jpを通じて支払いを行う処理をtoolとして定義しています。
@tool
def purchase_items(item: Item, customer: Customer, config: RunnableConfig) -> None:
"""
商品の購入処理を行う
Args:
item: 購入商品
customer: 購入者情報
config: RunnableConfig
"""
payjp.api_key = os.getenv("PAYJP_API_KEY")
try:
charge = payjp.Charge.create(
amount=item.price,
customer=customer.id,
currency="jpy",
metadata={"item_id": item.id, "via": "chatbot"},
)
except Exception as e:
logger.error(f"Error: {e}")
raise
LangSmith
LangSmithはそれぞれのセッションでどういう処理が行われていたのかをトレーシングするためのツールです。エラーが発生した場合の入力・出力の確認ができます。また、機能としてユーザーからのフィードバックを受け付けていた場合にそのフィードバック内容の紐付けも可能なので非常に便利だなと思いました。
おわりに
もし、手元で動かしてみたい方は
PAY.JPのAPIとOpenAIのAPIキーを準備してみてください
ちなみに今回の開発に関わる挙動確認でかかったOpenAIへのコストは0.1ドルだけでした。かなり安いですね。
追記
streamlitとpydantic-aiで実装した方が良いと思いました