4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LINE Bot(ローカル)とDify(ローカル)とLLMサーバ(ローカル)の連携

Posted at

全体像

LINE Botは以下の2つの主要なコンポーネントで構成される。

  1. Flaskで構築されたウェブサーバ

    • LINEサーバからのコールバックを受信する。
    • ユーザーメッセージをDify API形式に変換し、LLMサーバにリクエストを送信する。
    • LLMサーバの応答をLINE Botの形式で返却する。
    • Flaskサーバのコードは後で追記予定。
  2. Difyワークフロー

    • ユーザーのメッセージに対して適切なLLMを呼び出し、応答を生成する。
    • 必要に応じて複数のステップや条件分岐を含むワークフローを作成する。

Flaskサーバの役割と動作

Flaskサーバの役割は以下の通りである。

  1. LINEサーバからコールバックを受け取る
  2. コールバック内容からユーザーのメッセージを抽出
  3. 抽出したメッセージをDify APIに送信するために加工
  4. Dify APIのレスポンスをLINE Bot形式に変換してLINEサーバに返却
  5. タイムアウトした場合はLLMサーバ(今回はopen webui)を再起動

※以下のコードの場合、Difyのワークフローは「会話用」「ネット検索用」「画像へのリアクション生成」の3つがあるため、それぞれに応じてどのワークフローにユーザーのメッセージを投じるかを分岐している

import os
import json
import subprocess
import requests
from flask import Flask, request, abort, send_from_directory
from linebot.v3.messaging import Configuration, ApiClient, MessagingApi, ReplyMessageRequest, TextMessage
from linebot.v3 import WebhookHandler
from linebot.v3.webhooks import MessageEvent, TextMessageContent, ImageMessageContent
from linebot.v3.exceptions import InvalidSignatureError

# Flaskアプリケーションの初期化
app = Flask(__name__)

# 設定ファイルの読み込み
CONFIG_FILE = "config.json"
if not os.path.exists(CONFIG_FILE):
    raise FileNotFoundError(f"設定ファイルが見つかりません: {CONFIG_FILE}")
with open(CONFIG_FILE, "r") as file:
    config = json.load(file)

# LINE BotとAPIの設定
configuration = Configuration(access_token=config["LINE_CHANNEL_ACCESS_TOKEN"])
handler = WebhookHandler(config["LINE_CHANNEL_SECRET"])

# 定数の設定
IMAGE_SAVE_DIRECTORY = config["IMAGE_SAVE_DIRECTORY"]
os.makedirs(IMAGE_SAVE_DIRECTORY, exist_ok=True)
DIFY_API_KEYS = {
    "text": config["DIFY_API_KEY_TEXT"],
    "image": config["DIFY_API_KEY_IMAGE"],
    "search": config["DIFY_API_KEY_SEARCH"],
}
DIFY_BASE_URL = config["DIFY_BASE_URL"]
DIFY_PROMPT = config["DIFY_PROMPT"]
MAX_CONTEXT_LENGTH = config.get("MAX_CONTEXT_LENGTH", 20)

# チャット履歴の初期化
chat_history = {}

# 各種ユーティリティ関数
def get_chat_key(event):
    """チャットごとに一意のキーを生成"""
    if hasattr(event.source, 'group_id'):
        return f"group-{event.source.group_id}"
    elif hasattr(event.source, 'user_id'):
        return f"user-{event.source.user_id}"
    return "unknown"

def update_chat_history(chat_key, message):
    """チャット履歴を更新"""
    if chat_key not in chat_history:
        chat_history[chat_key] = []
    if len(chat_history[chat_key]) >= MAX_CONTEXT_LENGTH:
        chat_history[chat_key].pop(0)
    chat_history[chat_key].append(message)

def restart_open_webui(reply_token):
    """open-webuiコンテナを再起動"""
    try:
        subprocess.run(["docker", "restart", "open-webui"], check=True)
        app.logger.info("open-webuiコンテナが正常に再起動されました。")
        send_line_reply(reply_token, "ごめん寝てた。何?")
    except subprocess.CalledProcessError as e:
        app.logger.error(f"open-webuiの再起動に失敗しました: {e}")

def get_group_member_display_name(group_id, user_id):
    """グループメンバーのプロフィール情報から表示名を取得"""
    with ApiClient(configuration) as api_client:
        messaging_api = MessagingApi(api_client)
        try:
            profile = messaging_api.get_group_member_profile(group_id, user_id)
            return profile.display_name
        except Exception as e:
            app.logger.error(f"プロフィール情報の取得に失敗しました: {e}")
            return None

def send_line_reply(reply_token, text):
    """LINEメッセージを返信"""
    with ApiClient(configuration) as api_client:
        line_bot_api = MessagingApi(api_client)
        line_bot_api.reply_message(
            ReplyMessageRequest(
                reply_token=reply_token,
                messages=[TextMessage(text=text)]
            )
        )

def prepare_messages_for_dify(chat_key):
    """Dify APIに渡すメッセージを構築"""
    history = chat_history[chat_key]
    if len(history) == 1:
        return [{"role": "user", "content": history[0]}]
    conversation_history = "\n".join(history[:-1])
    targeted_message = history[-1]
    return [
        {"role": "user", "content": "###以下は直近の会話の履歴\n" + conversation_history},
        {"role": "user", "content": "###以下はあなたへのメッセージ\n" + targeted_message}
    ]

# ルートとハンドラー
@app.route('/images/<filename>', methods=['GET'])
def serve_image(filename):
    """画像を提供"""
    file_path = os.path.join(IMAGE_SAVE_DIRECTORY, filename)
    if os.path.exists(file_path):
        return send_from_directory(IMAGE_SAVE_DIRECTORY, filename)
    abort(404)

@app.route("/callback", methods=['POST'])
def callback():
    """LINEプラットフォームからのコールバックを処理"""
    signature = request.headers.get('X-Line-Signature')
    body = request.get_data(as_text=True)
    app.logger.info(f"受信リクエストボディ: {body}")
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        app.logger.error("不正なシグネチャです。")
        abort(400)
    return 'OK'

@handler.add(MessageEvent, message=TextMessageContent)
def handle_text_message(event):
    """テキストメッセージを処理"""
    user_message = event.message.text
    is_targeted = user_message.startswith('@人工無能')
    user_message = user_message.replace('@人工無能', '').strip() if is_targeted else user_message
    chat_key = get_chat_key(event)
    update_chat_history(chat_key, user_message)

    if is_targeted and "質問。" in user_message:
        user_query = user_message.replace('質問。', '').strip()
        app.logger.info(f"ウェブ検索クエリを検出: {user_query}")
        dify_headers = {
            "Authorization": f"Bearer {DIFY_API_KEYS['search']}",
            "Content-Type": "application/json"
        }
        dify_data = {"query": user_query, "inputs": {}, "response_mode": "blocking", "user": "unique_user_id"}
        try:
            dify_response = requests.post(DIFY_BASE_URL, json=dify_data, headers=dify_headers, timeout=180)
            if dify_response.status_code == 200:
                response_data = dify_response.json()
                reply_text = response_data.get('answer', '応答なし')
                update_chat_history(chat_key, f"人工無能「{reply_text}」")
                send_line_reply(event.reply_token, reply_text)
            else:
                send_line_reply(event.reply_token, "エラーが発生しました。再度試してください。")
        except requests.exceptions.Timeout:
            restart_open_webui(event.reply_token)
        except requests.exceptions.RequestException as e:
            send_line_reply(event.reply_token, "APIリクエスト中にエラーが発生しました。")
        return

    if is_targeted:
        messages = prepare_messages_for_dify(chat_key)
        dify_headers = {
            "Authorization": f"Bearer {DIFY_API_KEYS['text']}",
            "Content-Type": "application/json"
        }
        dify_data = {"query": "\n".join([msg['content'] for msg in messages]), "inputs": {}, "response_mode": "blocking", "user": "unique_user_id"}
        try:
            response = requests.post(DIFY_BASE_URL, json=dify_data, headers=dify_headers, timeout=180)
            if response.status_code == 200:
                reply_text = response.json().get('answer', '応答なし')
                update_chat_history(chat_key, f"人工無能「{reply_text}」")
                send_line_reply(event.reply_token, reply_text)
            else:
                send_line_reply(event.reply_token, "エラーが発生しました。再度試してください。")
        except requests.exceptions.Timeout:
            restart_open_webui(event.reply_token)
        except requests.exceptions.RequestException as e:
            send_line_reply(event.reply_token, "APIリクエスト中にエラーが発生しました。")

@handler.add(MessageEvent, message=ImageMessageContent)
def handle_image_message(event):
    """画像メッセージを処理"""
    headers = {'Authorization': f'Bearer {config["LINE_CHANNEL_ACCESS_TOKEN"]}'}
    content_url = f"https://api-data.line.me/v2/bot/message/{event.message.id}/content"
    response = requests.get(content_url, headers=headers, stream=True)
    if response.status_code == 200:
        file_path = os.path.join(IMAGE_SAVE_DIRECTORY, f"{event.message.id}.jpg")
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        image_url = f"http://{config['HOST']}:{config['PORT']}/images/{event.message.id}.jpg"
        dify_headers = {
            "Authorization": f"Bearer {DIFY_API_KEYS['image']}",
            "Content-Type": "application/json"
        }
        dify_data = {
            "query": DIFY_PROMPT, "inputs": {}, "files": [{"type": "image", "transfer_method": "remote_url", "url": image_url}],
            "response_mode": "blocking", "user": "unique_user_id"
        }
        try:
            dify_response = requests.post(DIFY_BASE_URL, json=dify_data, headers=dify_headers, timeout=180)
            if dify_response.status_code == 200:
                reply_text = dify_response.json().get('answer', '応答なし')
                send_line_reply(event.reply_token, reply_text)
            else:
                send_line_reply(event.reply_token, "画像の解析中にエラーが発生しました。")
        except requests.exceptions.Timeout:
            restart_open_webui(event.reply_token)
        except requests.exceptions.RequestException as e:
            send_line_reply(event.reply_token, "APIリクエスト中にエラーが発生しました。")
        if os.path.exists(file_path):
            os.remove(file_path)
    else:
        send_line_reply(event.reply_token, "画像の取得に失敗しました。")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=config["PORT"], debug=True)

今回作成したDifyのワークフロー3種

1.単純なチャット応答用のワークフロー
 あえてDifyで作る必要は無いほど単純なものだが、ワークフロー作成の練習として。
 結果的にはシステムプロンプトの試行錯誤がやりやすい、というメリットがあった。
image.png

2.ユーザーの質問に応じて検索キーワードを生成し、Google検索を行い、結果を要約するワークフロー

  • 適切なキーワードを生成させるシステムプロンプトが難しい…(ローカルLLM故にChatGPTやらGeminiほど頭良くない)
  • イテレーションの練習として。検索結果(処理速度を考慮して3件まで)で要約を繰り返し、最後にそれらをまとめて回答を生成する
    image.png

3.ユーザーがチャットに投稿した画像に対して様々なリアクションを行うワークフロー
 マルチモーダルのLLMモデル(llama3.2-vision)の利用テスト。
 画像が写真かどうか・写っているのが男性か女性かそれ以外か・と行った判定を行わせ、それに応じて嘘のカロリーを告げたり、カスの嘘をついたり、と処理が分岐する。
image.png

Dify APIとワークフロー

LineからLLMまでの通信全体の構造

全体としては以下のような通信の流れになる。
この途中にDifyのワークフローが含まれる。
(実際には戻り方向の通信も発生するわけだが、そこは省略)

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?