1.はじめに
タイトル通りですが、AIボイスチャットを作成しました。
なお、当記事の内容について、2023/3/2にChatGPT APIとWhisper APIが発表されたのでそちらを利用した方が良いかもしれません。
(開発してから記事化するのが遅過ぎました…)
ですがせっかくなので備忘録として残しておきます。
2.各種インストール
2-1.ソフトのインストール
-
COEIROINK
無料で使えるAI音声合成ソフトです。
返答の音声に使用するためインストールします。 -
FFmpeg
動画や音声を扱うためのソフトです。
whisper
やpydub
で必要なためインストールします。
2-2.ライブラリインストール
pip install transformers sentencepiece torch requests pyaudio pydub
pip install git+https://github.com/openai/whisper.git
必要なライブラリをインストールします。
3.コード解説
大体コード内に書いてあるコメントの通りです。
3-1.アプリケーション(COEIROINK)の起動
クリックしてコードを表示
import subprocess
from subprocess import PIPE
import sys
import requests
def launch(application_path, url):
# アプリケーションを起動
process = subprocess.Popen(application_path, stdout=subprocess.PIPE)
# カウンタ初期値設定
cnt = 1
# カウンタ上限設定
cnt_limit = 10
print("アプリケーションにリクエストを送信し、起動確認をします。")
# アプリケーションが起動するまでリクエストを送信し続けて起動確認
while True:
try:
print("アプリケーションに" + str(cnt) + "回目のリクエストを送信します。")
print("アプリケーションの起動確認中………")
# POSTで指定URLにリクエストを送信して起動確認
requests.post(url)
print("起動しました")
break
except:
cnt += 1
if cnt > cnt_limit:
print("リクエストの送信回数が" + str(cnt_limit) + "回を超えました。処理の実行を終了します。")
# プロセスを終了
process.kill()
sys.exit()
else:
continue
return process
指定されたアプリケーション(COEIROINK)を起動するやつです。
指定されたURLにリクエストを送信し続けて、アプリケーションが起動したかどうかを確認します。
リクエスト送信回数は今回10
で設定しています。
任意の数を設定してください。
今回は上記の方法でアプリケーションの起動と確認を行っていますが、より良い方法があればコメントやTwitterで教えてください。
3-2.音声の入出力
クリックしてコードを表示
import pyaudio
import wave
from pydub import AudioSegment
from pydub.playback import play
# 録音データの保存先パスを指定
input_path = ".\input.wav"
# wav形式で録音
def recsound(record_time):
# 録音時のパラメータを指定
CHUNK = 2**10
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
# インスタンス生成
p = pyaudio.PyAudio()
# パラメータを設定
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK
)
# 録音開始のメッセージを指定
print(str(record_time) + "秒間録音します。何か話しかけてください。")
# 録音開始
frames = []
for i in range(0, int(RATE / CHUNK * record_time)):
data = stream.read(CHUNK)
frames.append(data)
# 録音終了のメッセージを表示
print("録音が完了しました。")
# 録音終了
stream.stop_stream()
stream.close()
p.terminate()
# 録音した音声をwav形式で保存
wf = wave.open(input_path, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
# 録音データのパスを返す
return input_path
# wavの音声を再生(path指定ではない)
def playsound(output_wav):
# 再生時のパラメータを指定
WIDTH = 2
CHANNELS = 1
RATE = 44100
# パラメータを設定
sound = AudioSegment(output_wav,
sample_width=WIDTH,
channels=CHANNELS,
frame_rate=RATE
)
# 再生
play(sound)
音声の入力(録音)と返答の音声の出力(再生)を行うやつです。
録音データの保存先パスは任意のものを指定してください。
3-3.音声認識
クリックしてコードを表示
import whisper
# モデルを指定してロード
whisper_model = whisper.load_model("small")
# 認識する言語を日本語に指定
language = 'ja'
# 音声データをテキストに変換
def convert_voice_to_text(audio):
# 音声認識を行い結果を取得
result = whisper_model.transcribe(
audio,
verbose=True,
language=language,
fp16=False
)
result_text = result['text']
# 認識結果のテキストを返す
return result_text
音声認識するやつです。
入力(録音)した音声データをテキストに変換します。
モデルは"small"
を指定しています。
認識する言語は日本語に指定しています。
3-4.返答の文章生成
クリックしてコードを表示
# Transformerライブラリを介してモデル呼び出し
from transformers import T5Tokenizer, AutoModelForCausalLM
# 正規表現
import re
# テンソルとモデルのGPU/CPUを指定・切り替え
import torch
# モデル名指定
model_name = 'rinna/japanese-gpt-1b'
# tokenizerとmodelの生成
# tokenizerで文字列をトークンにエンコード/デコード
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
# GPUが使用可能な環境かどうかを判定
if torch.cuda.is_available():
# モデルのGPU/CPUを切り替え(デフォルトはCPU。サポートされているGPUはNVIDIA製のみ)
# GPUを使うことで文章生成時間を短縮(GPUは演算能力・並列処理能力に優れており、CPUよりも適しているため(ディープラーニングで行う行列演算と三次元グラフィックス処理時の行列演算が同じ))
# GPU(CPU)上にモデルのコピーを作成して上書き
model = model.to("cuda")
# 受け取ったメッセージに対して文章を生成する処理
# input_messageにチャットボットに返答させる文章を格納
def generate_text(input_message):
# 受け取ったメッセージをtokenizerでエンコード
# 発言者(ユーザーとAI)を区別することで対話文と認識させる
input_ids = tokenizer.encode(
"私: " + input_message + "\nAI: ",
# スペシャルトークンの自動追加を無効(自分で追加するため)
add_special_tokens=False,
# PyTorchのテンソル型で返す(Transformerモデルの入力でテンソル形式に変換する必要があるため)
# 機械学習におけるテンソルは多次元配列とほぼ同義(=行列。GPUとテンソルの計算の相性が良い)
return_tensors="pt"
)
# テンソル計算時の勾配情報を持たないことでメモリ消費を抑える
# 勾配はU字のグラフをイメージ。最も予測誤差の少なくなるような結果(損失関数を最小にする重み(=最適な重み))を求める
# その際の最大値から最小値へと数値が移動する(0に近づく)勾配を差す
# 値が推移する→勾配(=変数)の値が変化(=更新)→変化していく値の記録を保持しない(出力される結果(=最小値)のみ)ことでメモリ消費を抑える(変数の前後で値を比較?(調査不足))
# 【ざっくり】より良い結果を求めていく際の途中経過の計算の値(イメージとしてはU字グラフの点の値(=変数))を保持しない(=結果のみ)(少し語弊あり。詳しくは勾配降下法や誤差逆伝播法で調べてみてください)
with torch.no_grad():
# generate関数で文章生成
output_sequences = model.generate(
# テンソル型のエンコードしたメッセージ
# テンソルのGPU/CPUをモデルと同じデバイス(GPU/CPU)に切り替え(PyTorchではモデル学習時にモデルとデータを同じデバイスに置く必要がある)
# デバイス(GPU/CPU)上にエンコードしたメッセージのコピーを作成
input_ids.to(model.device),
# 文章生成文字数上限
max_length=50,
# 文章生成文字数下限
min_length=10,
# 値が1に近づくほどクリエイティブな文章、0に近づくと論理的で正確な返答になる…らしい
temperature=0.9,
# 確率の高い上位k個の候補の単語からランダムに選択
top_k=50,
# 確率の高い上位候補の単語の確率の合計がpを超えるような最小個数の候補を動的に選択(1>p>=0)
top_p=0.95,
# 1に近づけると同じ文章の繰り返しを減少する効果
repetition_penalty=1.0,
# サンプリングを有効化
do_sample=True,
# 一度の実行で生成する文章の数を指定
num_return_sequences=1,
# モデルの入力に必要な情報となるスペシャルトークンを指定
# PAD(Padding)は使用されていない部分を埋めるためのトークン
pad_token_id=tokenizer.pad_token_id,
# BOS(Beggin Of Sentence)はシーケンス(文章)の始まりを表すトークン
bos_token_id=tokenizer.bos_token_id,
# EOS(End Of Sentence)はシーケンス(文章)の終わりを表すトークン
eos_token_id=tokenizer.eos_token_id,
# 生成が許可されていないトークンIDリストを設定(禁止用語等の設定で使用)
# UNK(Unknown)は未知の語彙を表すトークン
bad_word_ids=[[tokenizer.unk_token_id]]
)
# トークンの状態の生成された文章が二次元リストで格納されているので二次元リスト0番目のリストを取得
output_sequence = output_sequences.tolist()[0]
# 生成された文章をtokenizerでデコード
text = tokenizer.decode(
output_sequence,
# 余分に生成されたスペースを削除
clean_up_tokenization_spaces=True
)
# 最初に与えた入力文の長さを取得
input_ids_length = len(tokenizer.decode(
input_ids[0],
# 余分に生成されたスペースを削除
clean_up_tokenization_spaces=True
))
# デコードされた文章から最初に与えた入力文を除去
total_text = (text[input_ids_length:])
# 生成文にpatternにある文字列が含まれていた場合それ以降の文字列を除去。含まれていなかった場合はそのまま出力
pattern = r"AI:|私:|俺:|僕:|あなた:|<unk>|</s>|[UNK]"
if re.search(pattern, total_text) != None:
# 文章中のpatternにある文字列の位置を全て抽出
match_position = [match.span() for match in re.finditer(pattern, total_text)]
# 一番最初に出現したpatternにある文字列の位置を抽出
first_match_position = match_position[0][0]
# 一番最初に出現したpatternにある文字列以降の文字列を除去
edited_text = total_text[:first_match_position]
# 生成された文章を返す
return edited_text
else:
# 生成された文章を返す
return total_text
入力した内容に対して返答の文章を生成するやつです。
詳しくは下記記事を参照してください。
コードもほぼそのままです。
3-5.音声合成(COEIROINK)
クリックしてコードを表示
import json
import requests
# スピーカーIDを指定(0:つくよみちゃん)
speaker_id = 0
# 引数にセリフを指定
def speech_synthesis(text):
# 音声合成のクエリの作成
response = requests.post(
"http://localhost:50031/audio_query",
params={
'text': text,
'speaker': speaker_id,
'core_version': '0.0.0'
})
query = response.json()
# 音声合成のwavの生成
response = requests.post(
'http://localhost:50031/synthesis',
params={
'speaker': speaker_id,
'core_version': "0.0.0",
'enable_interrogative_upspeak': 'true'
},
data=json.dumps(query))
return response.content
COEIROINKを使用して音声合成するやつです。
セリフに生成した返答の文章を指定し、COEIROINKに対してリクエストを送信して音声合成のwav
を生成します。
音声は今回つくよみちゃんを使用しています。
3-6.全て実行
クリックしてコードを表示
import LaunchApplication
import SoundRecAndPlay as sound
import ConvertVoiceToText
import GenerateText
import SpeechSynthesis
def main():
try:
print("処理を開始します。終了する際はターミナルに「Ctrl+C」を入力してください。")
# アプリケーションの起動
process = LaunchApplication.launch(application, url)
while True:
# 録音データのパスと録音時間を指定して録音
input_sound = sound.recsound(record_time)
# input_soundに保存された録音データをConvertVoiceToTextでテキストに変換
input_text = ConvertVoiceToText.convert_voice_to_text(input_sound)
# 録音データ削除
os.remove(input_sound)
# input_textをGenerateTextに投げて生成された文章を受け取る
result_text = GenerateText.generate_text(input_text)
print("AI:" + result_text)
# result_textをSpeechSynthesisに与えて音声合成のwavを生成
response = SpeechSynthesis.speech_synthesis(result_text)
# responseを再生
sound.playsound(response)
except KeyboardInterrupt:
# プロセスを終了
process.kill()
print("処理を終了しました。")
# COEIROINKのパスとローカルAPIのURLを指定
application = r"COEIROINKのパスを記載"
url = "http://localhost:50031/"
# 録音時間を指定
record_time = 5
if __name__ == '__main__':
main()
これまでの処理をこちらで全て実行します。
COEIROINKのパスは自分のものを指定してください。
録音時間は任意の数を指定してください。
今回は5
秒で指定しています。
あとは上記コピペでそのまま使用できます。
4.実行結果
実行するとこんな感じです。
※動画の音量がかなり小さいです。
5.おわりに
メモリ消費が激しい点やレスポンスにラグがある点がネックなので、改善出来るとより良さそうです。
(前述したChatGPT APIやWhisper APIに変更することで解決する部分もあるかもしれません)
改善点や指摘等ありましたらコメントまたはTwitterでいただけると幸いです。
6.参考記事