OpenAI DevDay(11月7日)
openai社より、革命的な発表がされました。
今回はその中でも2点
①Assistant APIの発表
②Whisperの新モデル「large-v3」の発表
今回はこれらを用いて簡単にしりとり名人との対決システムを作成します。
各機能
①Assistant API
アプリに簡単にAIアシスタントを構築できる機能
一時期はLangchainで構築できてしまうことと、API料金が発生するので、
発表の中の機能としては正直舐めてたところもありましたが、
組み込みやすさとスレッド機能の便利さによって私の中で撤回されました。
機能4点
・Persistent Threads
→会話履歴の保持が複雑なコーディング無で可能になった
・Data Retrieval
→アップロードした外部ファイルの内容から回答を生成してくれる
・Code Interpreter
→必要に応じてコードの自動生成をしてくれる
・Function Calling
→プロンプトに応じて関数を呼びだし、定型処理や外部APIから結果を返してくれる
②Whisper
音声認識モデル
Assistantの作成方法
コーディングによって作成することも可能みたいですが、今回はお手軽に
OpenAIが提供してくれているplaygroudを使って作成。
Instructionsにはどのような挙動をしてほしいかを定義することができます。
今回は単語で回答を返してきてくれますが、
他には「日本語で必ず回答して」や「ん がついたら負けです」「同じ回答が行われたら指摘して」等
ような条件が考えられます。
Code InterpreterがOFFで、Data RetrievalがONになっていますが、
何もファイルはアップロードしてないのでOFFでもいいかもしれないですね。
WEB検索機能があるといいですが、それはFunction Callingでできるのかな?
システムイメージ
〇概要
マイクに向かって単語を発声する
→音声がテキスト変換される
→テキストの単語に応じてAIアシスタントが回答をする
以下繰り返し
〇詳細
pythonのtkinterライブラリを使ってUIを作成
①pyaudioライブラリを使用
録音開始ボタンを押して、収音をスタート
発声→一定期間以上の無音の状態が認められたら無音終了
録音ファイルを作成
②音声認識モデル Whisperを使用
①で作成した音声ファイルからテキストを生成
③assistant api LLMモデル
②で生成されたテキストをassistant apiでの設定とスレッドに応じて
LLMモデルが回答を生成
コード全文
import tkinter as tk
import pyaudio
import wave
import threading
import whisper
import numpy as np
import os
import time
from openai import OpenAI
os.environ["OPENAI_API_KEY"] = "open api key"
print(os.environ["path"])
client = OpenAI()
my_assistant_id = "assistant id"
class AudioRecorderApp:
def __init__(self, root):
self.root = root
self.root.title("音声録音&テキスト変換アプリ")
self.root.geometry("1000x800") # ウィンドウのサイズを設定
threshold = 0.3 # 閾値
self.start_button = tk.Button(root, text="録音開始", command=self.start_recording, bg="green", fg="white", font=("Arial", 16))
self.start_button.pack(pady=20)
self.stop_button = tk.Button(root, text="録音停止", command=self.stop_recording, bg="red", fg="white", font=("Arial", 16), state="disabled")
self.stop_button.pack(pady=20)
self.chat_history_text = tk.Text(root, wrap=tk.WORD, height=600, width=400, font=("Arial", 12))
self.chat_history_text.pack(padx=20, pady=10)
self.stream = None
self.frames = []
self.recording = False
def start_recording(self):
print("録音開始")
self.audio = pyaudio.PyAudio()
self.recording = True
self.frames = []
self.stream = self.audio.open(format=pyaudio.paInt16,
channels=1,
rate=44100,
input=True,
frames_per_buffer=1024)
self.start_button.config(state="disabled") # 録音開始ボタンを無効化
self.stop_button.config(state="normal") # 録音停止ボタンを有効化
# 録音を開始する前にrecord_audioメソッドを呼び出す
self.record_audio()
def stop_recording(self):
if self.recording:
print("録音停止")
self.recording = False
self.stream.stop_stream()
self.stream.close()
self.audio.terminate()
self.save_audio()
self.convert_audio_to_text()
self.start_button.config(state="normal") # 録音開始ボタンを有効化
self.stop_button.config(state="disabled") # 録音停止ボタンを無効化
def record_audio(self):
chunk = 1024 # チャンクサイズ
threshold = 0.01 # 閾値
print("録音中...")
while True:
self.stream = self.audio.open(format=pyaudio.paInt16,
channels=1,
rate=44100,
input=True,
frames_per_buffer=1024)
data = self.stream.read(chunk)
x = np.frombuffer(data, dtype="int16") / 32768.0
if x.max() > threshold:
end = time.time() # 計測終了
print("録音中......")
finish = False
frames = []
# 録音処理
while True:
print("録音中...")
for i in range(0, int(44100 / chunk * 1)):
data = self.stream.read(chunk)
frames.append(data)
ndarray = np.frombuffer(data, dtype="int16") / 32768.0
print("閾値=" + str(ndarray.max()))
if ndarray.max() < 0.3:
zr = 0
while True:
data = self.stream.read(chunk)
ndarray = np.frombuffer(data, dtype="int16") / 32768.0
if zr >= 1:
finish = True
break
if ndarray.max() > threshold:
zr = 0
print(zr)
break
time.sleep(0.01)
zr += 0.01
print("zr:" + str(zr))
if finish:
break
break
print("録音終了")
self.frames = frames
def save_audio(self):
print(self.frames)
#if len(self.frames) > 0:
wf = wave.open("record.wav", 'wb')
wf.setnchannels(1)
wf.setsampwidth(self.audio.get_sample_size(pyaudio.paInt16))
wf.setframerate(44100)
wf.writeframes(b''.join(self.frames))
wf.close()
def convert_audio_to_text(self):
try:
model_size = "medium"
model = whisper.load_model(model_size)
result = model.transcribe("record.wav", language="ja")
myspeak = result["text"]
self.chat_history_text.insert(tk.END, f"お客様: {myspeak}\n")
self.chat_history_text.insert(tk.END, "\n")
# スレッドを作成し、メッセージを取得
messages, thread_id = self.create_and_get_thread_messages(client, myspeak)
message_content = messages[0].content[0].text
annotated_message = self.annotate_message_content(message_content, client)
print(annotated_message)
# 実行を作成し、その完了を待つ
run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=my_assistant_id, tools=[{"type": "retrieval"}])
if self.wait_for_run_completion(client, thread_id, run.id):
messages = client.beta.threads.messages.list(thread_id=thread_id)
print(messages.data[0].content[0].text.value)
responce_text = messages.data[0].content[0].text.value
else:
responce_text = "応答なし"
self.chat_history_text.insert(tk.END, f"受付担当: {responce_text}\n") # 固定の返答
self.chat_history_text.insert(tk.END, "\n")
except Exception as e:
self.chat_history_text.insert(tk.END, f"エラー: {str(e)}\n")
print(str(e))
self.chat_history_text.insert(tk.END, "Chatbot: 回答できません\n")
# スレッドを作成し、メッセージとスレッドIDを取得するための関数
def create_and_get_thread_messages(self,client, initial_message):
thread = client.beta.threads.create(messages=[{"role": "user", "content": initial_message}])
thread_id = thread.id # スレッドIDを取得
messages = client.beta.threads.messages.list(thread_id).data
return messages, thread_id # メッセージリストとスレッドIDを返す
# メッセージの内容に引用を加える関数
def annotate_message_content(self,message_content, client):
annotations = message_content.annotations
citations = []
for index, annotation in enumerate(annotations):
# テキストを脚注に置き換える
message_content.value = message_content.value.replace(annotation.text, f' [{index}]')
# 注釈属性に基づいて引用を収集
citation_text = self.get_citation_text(client, annotation, index)
if citation_text:
citations.append(citation_text)
return message_content.value + '\n' + '\n'.join(citations)
# 引用テキストを取得するためのヘルパー関数
def get_citation_text(self,client, annotation, index):
if file_citation := getattr(annotation, 'file_citation', None):
cited_file = client.files.retrieve(file_citation.file_id)
return f'[{index}] {file_citation.quote} from {cited_file.filename}'
elif file_path := getattr(annotation, 'file_path', None):
cited_file = client.files.retrieve(file_path.file_id)
return f'[{index}] Click <here> to download {cited_file.filename}'
return None
# 実行が完了するまで待つ関数
def wait_for_run_completion(self,client, thread_id, run_id):
total_attempts = 20 # 状態確認の最大試行回数
for _ in range(total_attempts):
run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
if run.status == 'completed':
print('検索が完了しました\n')
return True
elif run.status == 'failed':
print('検索に失敗しました')
print(run.last_error)
return False
time.sleep(0.5)
return False
if __name__ == "__main__":
root = tk.Tk()
app = AudioRecorderApp(root)
root.mainloop()
解説
基本のコードパターン
ライブラリのインストール
tkinterウィンドウの作成
クラスのインスタンスを作成してメインアプリケーションを実行
mainloop()によってユーザー入力に応答させるようになる
import tkinter as tk
import pyaudio
import wave
import threading
import whisper
import numpy as np
import os
import time
from openai import OpenAI
os.environ["OPENAI_API_KEY"] = "open api key"
print(os.environ["path"])
client = OpenAI()
my_assistant_id = "assistant id"
if __name__ == "__main__":
root = tk.Tk()
app = AudioRecorderApp(root)
root.mainloop()
app = AudioRecorderApp(root)を実行した際に
def __init()に書かれた処理が最初に行われる
ここに書かれているのは、
画面の設計やボタンが押された際に実行される関数を定義している
def __init__(self, root):
self.root = root
self.root.title("音声録音&テキスト変換アプリ")
self.root.geometry("1000x800") # ウィンドウのサイズを設定
threshold = 0.3 # 閾値
self.start_button = tk.Button(root, text="録音開始", command=self.start_recording, bg="green", fg="white", font=("Arial", 16))
self.start_button.pack(pady=20)
self.stop_button = tk.Button(root, text="録音停止", command=self.stop_recording, bg="red", fg="white", font=("Arial", 16), state="disabled")
self.stop_button.pack(pady=20)
self.chat_history_text = tk.Text(root, wrap=tk.WORD, height=600, width=400, font=("Arial", 12))
self.chat_history_text.pack(padx=20, pady=10)
self.stream = None
self.frames = []
self.recording = False
緑、赤のボタンそれぞれの処理を定義
def start_recording(self):
print("録音開始")
self.audio = pyaudio.PyAudio()
self.recording = True
self.frames = []
self.stream = self.audio.open(format=pyaudio.paInt16,
channels=1,
rate=44100,
input=True,
frames_per_buffer=1024)
self.start_button.config(state="disabled") # 録音開始ボタンを無効化
self.stop_button.config(state="normal") # 録音停止ボタンを有効化
# 録音を開始する前にrecord_audioメソッドを呼び出す
self.record_audio()
def stop_recording(self):
if self.recording:
print("録音停止")
self.recording = False
self.stream.stop_stream()
self.stream.close()
self.audio.terminate()
self.save_audio()
self.convert_audio_to_text()
self.start_button.config(state="normal") # 録音開始ボタンを有効化
self.stop_button.config(state="disabled") # 録音停止ボタンを無効化
この後の処理の流れとしては、簡単に
緑のボタン(start_recording)では音声ファイルの作成処理
赤のボタンでは音声ファイルからテキストデータへの変換、AIアシスタントを通じて
LLMによる回答の生成を行っている。
start_recording内の各関数の内容
・record_audio
※chatGPTにより作成
音声を吹き込んでいる間は録音モードを継続して音声データをframesに格納
一定時間の無音が検出されたら録音モードを終了してループを抜ける
def record_audio(self):
chunk = 1024 # チャンクサイズ
threshold = 0.01 # 閾値
print("録音中...")
while True:
self.stream = self.audio.open(format=pyaudio.paInt16,
channels=1,
rate=44100,
input=True,
frames_per_buffer=1024)
data = self.stream.read(chunk)
x = np.frombuffer(data, dtype="int16") / 32768.0
if x.max() > threshold:
end = time.time() # 計測終了
print("録音中......")
finish = False
frames = []
# 録音処理
while True:
print("録音中...")
for i in range(0, int(44100 / chunk * 1)):
data = self.stream.read(chunk)
frames.append(data)
ndarray = np.frombuffer(data, dtype="int16") / 32768.0
print("閾値=" + str(ndarray.max()))
if ndarray.max() < 0.3:
zr = 0
while True:
data = self.stream.read(chunk)
ndarray = np.frombuffer(data, dtype="int16") / 32768.0
if zr >= 1:
finish = True
break
if ndarray.max() > threshold:
zr = 0
print(zr)
break
time.sleep(0.01)
zr += 0.01
print("zr:" + str(zr))
if finish:
break
break
print("録音終了")
self.frames = frames
stop_recording内の各関数の内容
・save_audio()
上記framesに格納した音声データを使って音声ファイルを生成
・self.convert_audio_to_text()
save_audio()で作成された音声データを
Whisperを使ってテキスト化→テキストとスレッドをAIアシスタントに渡して
回答を生成
def convert_audio_to_text(self):
try:
model_size = "medium"
model = whisper.load_model(model_size)
result = model.transcribe("record.wav", language="ja")
myspeak = result["text"]
self.chat_history_text.insert(tk.END, f"お客様: {myspeak}\n")
self.chat_history_text.insert(tk.END, "\n")
# スレッドを作成し、メッセージを取得
messages, thread_id = self.create_and_get_thread_messages(client, myspeak)
message_content = messages[0].content[0].text
annotated_message = self.annotate_message_content(message_content, client)
print(annotated_message)
# 実行を作成し、その完了を待つ
run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=my_assistant_id, tools=[{"type": "retrieval"}])
if self.wait_for_run_completion(client, thread_id, run.id):
messages = client.beta.threads.messages.list(thread_id=thread_id)
print(messages.data[0].content[0].text.value)
responce_text = messages.data[0].content[0].text.value
else:
responce_text = "応答なし"
self.chat_history_text.insert(tk.END, f"受付担当: {responce_text}\n") # 固定の返答
self.chat_history_text.insert(tk.END, "\n")
except Exception as e:
self.chat_history_text.insert(tk.END, f"エラー: {str(e)}\n")
print(str(e))
self.chat_history_text.insert(tk.END, "Chatbot: 回答できません\n")
# スレッドを作成し、メッセージとスレッドIDを取得するための関数
def create_and_get_thread_messages(self,client, initial_message):
thread = client.beta.threads.create(messages=[{"role": "user", "content": initial_message}])
thread_id = thread.id # スレッドIDを取得
messages = client.beta.threads.messages.list(thread_id).data
return messages, thread_id # メッセージリストとスレッドIDを返す
# メッセージの内容に引用を加える関数
def annotate_message_content(self,message_content, client):
annotations = message_content.annotations
citations = []
for index, annotation in enumerate(annotations):
# テキストを脚注に置き換える
message_content.value = message_content.value.replace(annotation.text, f' [{index}]')
# 注釈属性に基づいて引用を収集
citation_text = self.get_citation_text(client, annotation, index)
if citation_text:
citations.append(citation_text)
return message_content.value + '\n' + '\n'.join(citations)
# 引用テキストを取得するためのヘルパー関数
def get_citation_text(self,client, annotation, index):
if file_citation := getattr(annotation, 'file_citation', None):
cited_file = client.files.retrieve(file_citation.file_id)
return f'[{index}] {file_citation.quote} from {cited_file.filename}'
elif file_path := getattr(annotation, 'file_path', None):
cited_file = client.files.retrieve(file_path.file_id)
return f'[{index}] Click <here> to download {cited_file.filename}'
return None
# 実行が完了するまで待つ関数
def wait_for_run_completion(self,client, thread_id, run_id):
total_attempts = 20 # 状態確認の最大試行回数
for _ in range(total_attempts):
run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
if run.status == 'completed':
print('検索が完了しました\n')
return True
elif run.status == 'failed':
print('検索に失敗しました')
print(run.last_error)
return False
time.sleep(0.5)
return False
今回はGPUの都合上、Whisperの「meadium」モデルを使用
上位モデルを使いたい場合は、「large-v2」「large-v3」と変更
※largeモデルを使うには VRAMは10GBは最低でも必要と思われます。
テキスト化後は、myspeakに格納され、画面の会話ボックスに挿入されます。
model_size = "medium"
model = whisper.load_model(model_size)
result = model.transcribe("record.wav", language="ja")
myspeak = result["text"]
self.chat_history_text.insert(tk.END, f"お客様: {myspeak}\n")
self.chat_history_text.insert(tk.END, "\n")
ここでスレッドの立ち上げと今回チャットを含めた会話履歴を呼び出す。
※このコードだと、しりとりで回答するたびに新規にスレッドIDを作成しているため
あまり、スレッド機能を活かせていない・・・
# スレッドを作成し、メッセージを取得
messages, thread_id = self.create_and_get_thread_messages(client, myspeak)
message_content = messages[0].content[0].text
# スレッドを作成し、メッセージとスレッドIDを取得するための関数
def create_and_get_thread_messages(self,client, initial_message):
thread = client.beta.threads.create(messages=[{"role": "user", "content": initial_message}])
thread_id = thread.id # スレッドIDを取得
messages = client.beta.threads.messages.list(thread_id).data
return messages, thread_id # メッセージリストとスレッドIDを返す
スレッドとアシスタントAIにあらかじめ指定された役割設定に基づき、
回答を生成する。
回答が生成中か完了しているかの判断を
wait_for_run_completion関数で行っている。
※割と無理くりのため、もう少しopenai側で最適化されるらしい
そして回答結果を会話ボックスに挿入する。
# 実行を作成し、その完了を待つ
run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=my_assistant_id, tools=[{"type": "retrieval"}])
if self.wait_for_run_completion(client, thread_id, run.id):
messages = client.beta.threads.messages.list(thread_id=thread_id)
print(messages.data[0].content[0].text.value)
responce_text = messages.data[0].content[0].text.value
else:
responce_text = "応答なし"
self.chat_history_text.insert(tk.END, f"受付担当: {responce_text}\n") # 固定の返答
self.chat_history_text.insert(tk.END, "\n")
# 実行が完了するまで待つ関数
def wait_for_run_completion(self,client, thread_id, run_id):
total_attempts = 20 # 状態確認の最大試行回数
for _ in range(total_attempts):
run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
if run.status == 'completed':
print('検索が完了しました\n')
return True
elif run.status == 'failed':
print('検索に失敗しました')
print(run.last_error)
return False
time.sleep(0.5)
return False
補足
今回は処理に使っていないが、
コード内に注釈に関する記述がある。
# メッセージの内容に引用を加える関数
def annotate_message_content(self,message_content, client):
annotations = message_content.annotations
citations = []
for index, annotation in enumerate(annotations):
# テキストを脚注に置き換える
message_content.value = message_content.value.replace(annotation.text, f' [{index}]')
# 注釈属性に基づいて引用を収集
citation_text = self.get_citation_text(client, annotation, index)
if citation_text:
citations.append(citation_text)
return message_content.value + '\n' + '\n'.join(citations)
# 引用テキストを取得するためのヘルパー関数
def get_citation_text(self,client, annotation, index):
if file_citation := getattr(annotation, 'file_citation', None):
cited_file = client.files.retrieve(file_citation.file_id)
return f'[{index}] {file_citation.quote} from {cited_file.filename}'
elif file_path := getattr(annotation, 'file_path', None):
cited_file = client.files.retrieve(file_path.file_id)
return f'[{index}] Click <here> to download {cited_file.filename}'
return None
これらの注釈に対応する際に用いる処理らしい。
・file_quote : ファイルの引用は、Retrievalによって作成される。回答を生成するために使用した特定のファイル内の特定の引用への参照が含まれる。
・file_path : ファイルパスの注釈は、Code Interpreterによって作成される。ツールによって生成されたファイルへの参照が含まれる。
結果
GIF
早送りしています。