はじめに
長いタイトルですが、最近流行りのOpenAI APIを使ってChatwork上で会話できるボットを、AWS Lambda単体で作ってみたところ、同じ手間でElixirだと少し良いものができたので、そのお話です。
作ってみる
材料
- OpenAIのAPI key
- Chatworkのアカウント(質問者のアカウントと、回答者(ChatGPT)のアカウント)
- 回答者(ChatGPT)のアカウントのAPIトークン
- Chatworkの[サービス連携]→[APIトークン]より取得できます。
- 回答者(ChatGPT)のアカウントのID
- Chatworkの[環境設定]→[Chatworkについて]より取得できます。
- ChatGPT用のグループチャットのルームID
- Chatworkでチャットを開いた時に表示されるURLの#!rid以降の部分を指します。たとえば、
https://www.chatwork.com/#!rid123
のルームIDは123
です。
- Chatworkでチャットを開いた時に表示されるURLの#!rid以降の部分を指します。たとえば、
AWS Lambdaの実装
まずはmix new chatgpt_bot
でプロジェクトを作成し、mix.exs
にAWS Lambda用のライブラリを追加し、アプリケーションとして起動するようにします。
defmodule ChatgptBot.MixProject do
use Mix.Project
def project do
[
app: :chatgpt_bot,
version: "0.1.0",
elixir: ">= 1.10.4",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
mod: {FaasBase.Aws.Application, []},
extra_applications: [:logger]
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:faas_base, "~> 1.1.0"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
end
$ mix deps.get
次にchatgpt_bot.ex
に下記のようにハンドラを実装します。
defmodule ChatgptBot do
use FaasBase, service: :aws
alias FaasBase.Logger
alias FaasBase.Aws.Request
alias FaasBase.Aws.Response
@openai_base_url "https://api.openai.com/v1"
@chatwork_base_url "https://api.chatwork.com/v2"
@impl FaasBase
def init(context) do
# call back one time
Messages.start_link()
{:ok, context}
end
@impl FaasBase
def handle(
%Request{body: body, headers: %{"x-chatworkwebhooksignature" => webhook_signature}},
_event,
_context
) do
openai_api_key = System.get_env("OPENAI_API_KEY")
chatwork_bot_account_id = System.get_env("CHATWORK_BOT_ACCOUNT_ID") |> String.to_integer()
chatwork_bot_api_token = System.get_env("CHATWORK_BOT_API_TOKEN")
chatwork_webhook_token = System.get_env("CHATWORK_WEBHOOK_TOKEN")
with {:ok, {message, account_id, room_id}} <- parse_body(body),
:ok <- verify_webhook_signature(webhook_signature, chatwork_webhook_token, body),
:ok <- verify_account_id(account_id, chatwork_bot_account_id),
{:ok, response_message} <- chat_completion(message, openai_api_key),
{:ok, _response_body} <-
notify_to_chatwork(response_message, room_id, chatwork_bot_api_token) do
Logger.info(:ok)
else
:post_by_bot_account ->
Logger.info(:skip)
error ->
Logger.error(error)
end
{:ok, Response.to_response("", %{}, 200)}
end
defp parse_body(body) do
case body |> Jason.decode() do
{:ok,
%{
"webhook_event" => %{
"from_account_id" => from_account_id,
"to_account_id" => to_account_id,
"room_id" => room_id,
"body" => message
}
}} ->
{:ok, {message |> String.replace("[To:#{to_account_id}]", ""), from_account_id, room_id}}
{:ok,
%{
"webhook_event" => %{
"account_id" => account_id,
"room_id" => room_id,
"body" => message
}
}} ->
{:ok, {message, account_id, room_id}}
_ ->
:message_not_found
end
end
defp verify_webhook_signature(webhook_signature, chatwork_webhook_token, body) do
secret = chatwork_webhook_token |> Base.decode64!()
digest = :crypto.mac(:hmac, :sha256, secret, body)
if webhook_signature == digest |> Base.encode64(), do: :ok, else: :invalid_webhook_signature
end
defp verify_account_id(account_id, bot_account_id) do
if account_id != bot_account_id, do: :ok, else: :post_by_bot_account
end
defp chat_completion(message, token) do
chat_completion_url = "#{@openai_base_url}/chat/completions"
headers = %{
"Content-Type" => "application/json",
"Authorization" => "Bearer #{token}"
}
request_message = %{"role" => "user", "content" => message}
messages = Messages.get()
body = %{
"model" => "gpt-3.5-turbo",
"messages" => messages ++ [request_message]
}
case HTTPoison.post(chat_completion_url, body |> Jason.encode!(), headers,
timeout: 30_000,
recv_timeout: 30_000
) do
{:error, error} ->
{:error, error.reason}
{:ok, response} ->
%{"choices" => [%{"message" => %{"content" => content} = response_message}]} =
response.body |> Jason.decode!()
Messages.add(request_message)
Messages.add(response_message)
{:ok, content |> String.trim()}
end
end
defp notify_to_chatwork(message, room_id, token) do
endpoint_uri = "#{@chatwork_base_url}/rooms/#{room_id}/messages"
headers = %{
"X-ChatWorkToken" => token
}
params = [body: message, self_unread: 1]
case HTTPoison.post(endpoint_uri, "", headers, params: params) do
{:error, error} ->
{:error, error.reason}
{:ok, response} ->
{:ok, response.body}
end
end
end
やっていることは単純で、
- Webhookイベントから、メッセージと発信者のアカウントID、ルームIDを抜き出す
- ChatworkからのWebhookイベントの署名検証を行う
- 発信者のアカウントIDがChatGPT用のアカウントIDの場合は処理を抜ける
- Openai APIのchat completionエンドポイントにメッセージをPOSTする
- 応答を受け取ったらChatworkに通知する
のような流れで会話を行います。
ポイントは、Messages
というモジュールで、質問者がChatworkで投稿した質問や受け取った回答をMessages
に保持させて、Openai API側にはMessages
から取り出した過去の会話履歴も含めてPOSTしているというところです。これにより、受けた回答に対して、更に質問して回答を得るといったことが可能になります。
「投稿した質問や受け取った回答を保持」ということで、永続化の仕組みを想像しますが、今回Messages
の中では、DynamoDB等による永続化ではなく、Agentを使ってメモリ上に指定件数分の質問と回答を保持させています。
defmodule Messages do
use Agent
def start_link(max_message \\ 10) do
Agent.start_link(fn -> {[], max_message} end, name: __MODULE__)
end
def get do
Agent.get(__MODULE__, & &1) |> elem(0)
end
def add(message) do
Agent.update(__MODULE__, fn {messages, max_message} -> {Enum.take(messages ++ [message], -max_message), max_message} end)
end
end
AWS Lambdaの仕組み上、AWS Lambdaを実行するためのコンテナインスタンスが起動すると、一定期間はそのインスタンスが処理を行うため、実行数を1に制限しておくと、そのインスタンスが残っている限りにおいては、メモリ上に前回の状態を残して後続で利用することが可能になります。
推奨されない使い方(というか止めてねっていうレベルの使い方かも)なので、本来は外部に永続化してあげるのが筋なのですが、他のサービスを使わずにAWS Lambdaだけで簡易に会話履歴のPOSTが実現できるので、個人で利用する分にはありかなと思ってます。(同時にリクエストがすると片方がthrottleエラーで値を返せませんが、これも個人利用ということで。。)
ElixirではAgentで簡単に状態を保持する仕組みを作れるので、「同じ手間でElixirだと少し良いもの」というのはこの部分になります。(仕組み的にJavaとかでも同じ感じでできそうだけど、nodejsとかはどうなんだろ?)
Lambdaのデプロイ
下記のREADMEを参考にAWS LambdaのzipファイルかDockerイメージを作成してデプロイしてください。
zipファイルの場合は、下記を実行して、AWSの管理コンソールよりカスタムランタイムのLambdaを作成します。
$ mkdir -p _build
$ docker run -d -it --rm --name elx erintheblack/elixir-lambda-builder:al2_1.10.4
$ docker cp mix.exs elx:/tmp
$ docker cp lib elx:/tmp
$ docker exec elx /bin/bash -c "mix deps.get; MIX_ENV=prod mix aws.release"
$ docker cp elx:/tmp/_aws ./_build
$ docker stop elx
その際、ハンドラーはモジュール名であるChatgptBot
としてください。
その後、Webhookイベントの登録で使用する関数URLを取得しておきます。
Webhookイベントの登録
次に、Chatwork側でWebhookイベントを登録します。
Chatworkの[サービス連携]→[Webhook]より以下の内容で登録します。
Webhook名 ・・・ 任意
Webhook URL ・・・ 関数URL
イベント ・・・ アカウントイベント or ルームイベント
メッセージ作成 ・・・ チェック
ルームID ・・・ ルームイベントの場合は、該当グループチャットのルームID
登録後に発行されたWebhookトークンを控えておきます。
Lanbdaのその他の設定
Lambdaの環境変数には以下の変数を設定します。
OPENAI_API_KEY ・・・ OpenAIのAPI key
CHATWORK_BOT_API_TOKEN ・・・ 回答者(ChatGPT)のアカウントのAPIトークン
CHATWORK_BOT_ACCOUNT_ID ・・・ 回答者(ChatGPT)のアカウントのID
CHATWORK_WEBHOOK_TOKEN ・・・ 登録時に発行されたWebhookトークン
CHATWORK_BOT_ACCOUNT_ID
には必ず、回答者(ChatGPT)のアカウントIDを指定するようにしてください。(間違えると、ChatGPTの回答をまたChatGPTに質問として送ることになり、無限ループしてしまいますのでご注意ください。)
更に、同時実行数を1に、タイムアウトを30秒にしておきます。
以上で、該当グループチャットで質問を投稿すると、ChatGPTが回答を投稿してくれるようになります。
さいごに
関数URLが作れるようになったことで、API Gatewayも要らず、Agentを使ってメモリ上で会話履歴を管理することで、DBのような永続化のサービスも要らずで、AWS Lambdaだけでシンプルにそれなりの機能が実現できました。
しばらく放置すると別のインスタンスが立ち上がって過去の会話を忘れてしまいますが、そもそも膨大な過去データを保持してOpenai APIを叩くと請求額が怖いことになりますし、適度に忘れちゃうボットの方が、それはそれで愛嬌があるのかなと。