全体像
LINE Botは以下の2つの主要なコンポーネントで構成される。
-
Flaskで構築されたウェブサーバ
- LINEサーバからのコールバックを受信する。
- ユーザーメッセージをDify API形式に変換し、LLMサーバにリクエストを送信する。
- LLMサーバの応答をLINE Botの形式で返却する。
- Flaskサーバのコードは後で追記予定。
-
Difyワークフロー
- ユーザーのメッセージに対して適切なLLMを呼び出し、応答を生成する。
- 必要に応じて複数のステップや条件分岐を含むワークフローを作成する。
Flaskサーバの役割と動作
Flaskサーバの役割は以下の通りである。
- LINEサーバからコールバックを受け取る
- コールバック内容からユーザーのメッセージを抽出
- 抽出したメッセージをDify APIに送信するために加工
- Dify APIのレスポンスをLINE Bot形式に変換してLINEサーバに返却
- タイムアウトした場合はLLMサーバ(今回はopen webui)を再起動
※以下のコードの場合、Difyのワークフローは「会話用」「ネット検索用」「画像へのリアクション生成」の3つがあるため、それぞれに応じてどのワークフローにユーザーのメッセージを投じるかを分岐している
import os
import json
import subprocess
import requests
from flask import Flask, request, abort, send_from_directory
from linebot.v3.messaging import Configuration, ApiClient, MessagingApi, ReplyMessageRequest, TextMessage
from linebot.v3 import WebhookHandler
from linebot.v3.webhooks import MessageEvent, TextMessageContent, ImageMessageContent
from linebot.v3.exceptions import InvalidSignatureError
# Flaskアプリケーションの初期化
app = Flask(__name__)
# 設定ファイルの読み込み
CONFIG_FILE = "config.json"
if not os.path.exists(CONFIG_FILE):
raise FileNotFoundError(f"設定ファイルが見つかりません: {CONFIG_FILE}")
with open(CONFIG_FILE, "r") as file:
config = json.load(file)
# LINE BotとAPIの設定
configuration = Configuration(access_token=config["LINE_CHANNEL_ACCESS_TOKEN"])
handler = WebhookHandler(config["LINE_CHANNEL_SECRET"])
# 定数の設定
IMAGE_SAVE_DIRECTORY = config["IMAGE_SAVE_DIRECTORY"]
os.makedirs(IMAGE_SAVE_DIRECTORY, exist_ok=True)
DIFY_API_KEYS = {
"text": config["DIFY_API_KEY_TEXT"],
"image": config["DIFY_API_KEY_IMAGE"],
"search": config["DIFY_API_KEY_SEARCH"],
}
DIFY_BASE_URL = config["DIFY_BASE_URL"]
DIFY_PROMPT = config["DIFY_PROMPT"]
MAX_CONTEXT_LENGTH = config.get("MAX_CONTEXT_LENGTH", 20)
# チャット履歴の初期化
chat_history = {}
# 各種ユーティリティ関数
def get_chat_key(event):
"""チャットごとに一意のキーを生成"""
if hasattr(event.source, 'group_id'):
return f"group-{event.source.group_id}"
elif hasattr(event.source, 'user_id'):
return f"user-{event.source.user_id}"
return "unknown"
def update_chat_history(chat_key, message):
"""チャット履歴を更新"""
if chat_key not in chat_history:
chat_history[chat_key] = []
if len(chat_history[chat_key]) >= MAX_CONTEXT_LENGTH:
chat_history[chat_key].pop(0)
chat_history[chat_key].append(message)
def restart_open_webui(reply_token):
"""open-webuiコンテナを再起動"""
try:
subprocess.run(["docker", "restart", "open-webui"], check=True)
app.logger.info("open-webuiコンテナが正常に再起動されました。")
send_line_reply(reply_token, "ごめん寝てた。何?")
except subprocess.CalledProcessError as e:
app.logger.error(f"open-webuiの再起動に失敗しました: {e}")
def get_group_member_display_name(group_id, user_id):
"""グループメンバーのプロフィール情報から表示名を取得"""
with ApiClient(configuration) as api_client:
messaging_api = MessagingApi(api_client)
try:
profile = messaging_api.get_group_member_profile(group_id, user_id)
return profile.display_name
except Exception as e:
app.logger.error(f"プロフィール情報の取得に失敗しました: {e}")
return None
def send_line_reply(reply_token, text):
"""LINEメッセージを返信"""
with ApiClient(configuration) as api_client:
line_bot_api = MessagingApi(api_client)
line_bot_api.reply_message(
ReplyMessageRequest(
reply_token=reply_token,
messages=[TextMessage(text=text)]
)
)
def prepare_messages_for_dify(chat_key):
"""Dify APIに渡すメッセージを構築"""
history = chat_history[chat_key]
if len(history) == 1:
return [{"role": "user", "content": history[0]}]
conversation_history = "\n".join(history[:-1])
targeted_message = history[-1]
return [
{"role": "user", "content": "###以下は直近の会話の履歴\n" + conversation_history},
{"role": "user", "content": "###以下はあなたへのメッセージ\n" + targeted_message}
]
# ルートとハンドラー
@app.route('/images/<filename>', methods=['GET'])
def serve_image(filename):
"""画像を提供"""
file_path = os.path.join(IMAGE_SAVE_DIRECTORY, filename)
if os.path.exists(file_path):
return send_from_directory(IMAGE_SAVE_DIRECTORY, filename)
abort(404)
@app.route("/callback", methods=['POST'])
def callback():
"""LINEプラットフォームからのコールバックを処理"""
signature = request.headers.get('X-Line-Signature')
body = request.get_data(as_text=True)
app.logger.info(f"受信リクエストボディ: {body}")
try:
handler.handle(body, signature)
except InvalidSignatureError:
app.logger.error("不正なシグネチャです。")
abort(400)
return 'OK'
@handler.add(MessageEvent, message=TextMessageContent)
def handle_text_message(event):
"""テキストメッセージを処理"""
user_message = event.message.text
is_targeted = user_message.startswith('@人工無能')
user_message = user_message.replace('@人工無能', '').strip() if is_targeted else user_message
chat_key = get_chat_key(event)
update_chat_history(chat_key, user_message)
if is_targeted and "質問。" in user_message:
user_query = user_message.replace('質問。', '').strip()
app.logger.info(f"ウェブ検索クエリを検出: {user_query}")
dify_headers = {
"Authorization": f"Bearer {DIFY_API_KEYS['search']}",
"Content-Type": "application/json"
}
dify_data = {"query": user_query, "inputs": {}, "response_mode": "blocking", "user": "unique_user_id"}
try:
dify_response = requests.post(DIFY_BASE_URL, json=dify_data, headers=dify_headers, timeout=180)
if dify_response.status_code == 200:
response_data = dify_response.json()
reply_text = response_data.get('answer', '応答なし')
update_chat_history(chat_key, f"人工無能「{reply_text}」")
send_line_reply(event.reply_token, reply_text)
else:
send_line_reply(event.reply_token, "エラーが発生しました。再度試してください。")
except requests.exceptions.Timeout:
restart_open_webui(event.reply_token)
except requests.exceptions.RequestException as e:
send_line_reply(event.reply_token, "APIリクエスト中にエラーが発生しました。")
return
if is_targeted:
messages = prepare_messages_for_dify(chat_key)
dify_headers = {
"Authorization": f"Bearer {DIFY_API_KEYS['text']}",
"Content-Type": "application/json"
}
dify_data = {"query": "\n".join([msg['content'] for msg in messages]), "inputs": {}, "response_mode": "blocking", "user": "unique_user_id"}
try:
response = requests.post(DIFY_BASE_URL, json=dify_data, headers=dify_headers, timeout=180)
if response.status_code == 200:
reply_text = response.json().get('answer', '応答なし')
update_chat_history(chat_key, f"人工無能「{reply_text}」")
send_line_reply(event.reply_token, reply_text)
else:
send_line_reply(event.reply_token, "エラーが発生しました。再度試してください。")
except requests.exceptions.Timeout:
restart_open_webui(event.reply_token)
except requests.exceptions.RequestException as e:
send_line_reply(event.reply_token, "APIリクエスト中にエラーが発生しました。")
@handler.add(MessageEvent, message=ImageMessageContent)
def handle_image_message(event):
"""画像メッセージを処理"""
headers = {'Authorization': f'Bearer {config["LINE_CHANNEL_ACCESS_TOKEN"]}'}
content_url = f"https://api-data.line.me/v2/bot/message/{event.message.id}/content"
response = requests.get(content_url, headers=headers, stream=True)
if response.status_code == 200:
file_path = os.path.join(IMAGE_SAVE_DIRECTORY, f"{event.message.id}.jpg")
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
image_url = f"http://{config['HOST']}:{config['PORT']}/images/{event.message.id}.jpg"
dify_headers = {
"Authorization": f"Bearer {DIFY_API_KEYS['image']}",
"Content-Type": "application/json"
}
dify_data = {
"query": DIFY_PROMPT, "inputs": {}, "files": [{"type": "image", "transfer_method": "remote_url", "url": image_url}],
"response_mode": "blocking", "user": "unique_user_id"
}
try:
dify_response = requests.post(DIFY_BASE_URL, json=dify_data, headers=dify_headers, timeout=180)
if dify_response.status_code == 200:
reply_text = dify_response.json().get('answer', '応答なし')
send_line_reply(event.reply_token, reply_text)
else:
send_line_reply(event.reply_token, "画像の解析中にエラーが発生しました。")
except requests.exceptions.Timeout:
restart_open_webui(event.reply_token)
except requests.exceptions.RequestException as e:
send_line_reply(event.reply_token, "APIリクエスト中にエラーが発生しました。")
if os.path.exists(file_path):
os.remove(file_path)
else:
send_line_reply(event.reply_token, "画像の取得に失敗しました。")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=config["PORT"], debug=True)
今回作成したDifyのワークフロー3種
1.単純なチャット応答用のワークフロー
あえてDifyで作る必要は無いほど単純なものだが、ワークフロー作成の練習として。
結果的にはシステムプロンプトの試行錯誤がやりやすい、というメリットがあった。
2.ユーザーの質問に応じて検索キーワードを生成し、Google検索を行い、結果を要約するワークフロー
- 適切なキーワードを生成させるシステムプロンプトが難しい…(ローカルLLM故にChatGPTやらGeminiほど頭良くない)
- イテレーションの練習として。検索結果(処理速度を考慮して3件まで)で要約を繰り返し、最後にそれらをまとめて回答を生成する
3.ユーザーがチャットに投稿した画像に対して様々なリアクションを行うワークフロー
マルチモーダルのLLMモデル(llama3.2-vision)の利用テスト。
画像が写真かどうか・写っているのが男性か女性かそれ以外か・と行った判定を行わせ、それに応じて嘘のカロリーを告げたり、カスの嘘をついたり、と処理が分岐する。
Dify APIとワークフロー
LineからLLMまでの通信全体の構造
全体としては以下のような通信の流れになる。
この途中にDifyのワークフローが含まれる。
(実際には戻り方向の通信も発生するわけだが、そこは省略)