LoginSignup
1
0
音声認識APIを使ってみよう!

TRPGのボイスセッションのログを感情分析して、リプレイのハイライトを作成する

Last updated at Posted at 2024-05-17

GM:じゃあ、熱さに耐えかね、樽の陰に隠れていたゴブリンが現れる。その数は、えーと。1d6……6体だ。
PL1:最大値だしちゃった。
PL2:うわあ。
PL3:ぎゅうぎゅうじゃないですか(笑)
GM:PL3が先行ですね。
PL3:じゃあ攻撃します。あれっ、外しちゃった。ひどいですねこれは(笑)

みなさんはTRPGやマダミスをされるでしょうか?
ボイセ派でしょうか。テキセ派でしょうか?

みんなでワイワイ集まって、話し合ったり、サイコロを転がして事件を解決したり冒険するような遊びという感じでしょうか。

ボイセ/テキセというのは、ボイスセッション/テキストセッションの略です。
通話して遊ぶかチャットして遊ぶかというものです。
半テキ(半々)というのもあります。

ボイセ・テキセのどちらにもそれぞれの良さがあります。

私は、長らくテキセを中心に遊んでいたのですが、ここ数年はボイセ環境にも身を置くようになりました。

リプレイが欲しい……っ!

どんなセッションも、必ずドラマがあります。

シナリオには全く関係がない道端で小石を拾い、悪乗りして名前を付けて、げらげら笑ったかと思ったら、それが最後の起死回生のアイテムになる。そういうわけわかんないことが起きるわけです。

この思い出を誰かに共有したい。そんなとき、冒険を記したリプレイを記すことができます。

一部の遊び仲間は、OBS Studioを使用して録画環境を整え、動画ログを残しています。

テキストセッションであると比較的ログが残りやすく、ココフォリアからだったり、Discordからだったり、いろいろと取得する仕組みもあるのですが、動画だと、肉声ということもあってかね、なかなか卓外メンバーのところまではあがらないんですよね!

なんとかログ化のハードルを下げることができないだろうか……。

そう思ってさいきんは文字起こしの自動化を試して文字起こしにいそしんでいました。

もちろん、台本があってしゃべっているでもなし、このまま音声ログを起こしただけではきれいなログにはなりませんが、1から手で起こすよりは便利です。

文字起こしのほかにリプレイに役に立つ機能はないか?

AmiVoiceのAPIには、話者ダイアライゼーション感情分析があるようです。

話者識別を使えば、誰が発言しているのかラベリングすることができそうです。
(これは結構便利そうですね!)

感情分析を使えば、盛り上がったところ(感情が乗っているところ)を見つけたり、あとはそうですね……例えば、(笑)の付与ができるかもしれません。

一昔前のリプレイっぽくなるかもしれません(笑)

というわけで、目標は話者識別と、感情が強くノっていたところのハイライトです。

テスト用ツール

Pythonでツールを作成しました。

image.png

image.png

image.png

サンプルプログラムを参考にしつつ作りました。
Pythonが作りやすいな……。

1タブ目で解析をしてもらって、2タブ目で結果のJsonを加工して、3タブ目は感情パラメーターで使用する閾値を指定します。

話者ダイアライゼーション

発言者を見分けて、ラベリングをしてもらう機能です。
会議なんかの議事録に便利です。

APIを呼ぶときに話者の数を指定してあげるだけです。

通常の解析よりも時間がかかります。
また、話者ダイアライゼーションを使用すると3時間までとなるので、ファイルは3時間ごとの分割にします。

image.png

問題なくできていそうですね。

speaker0など、個人とは結び付かないラベリングになっているようです。
文字列置換でPL名にできますが、012と順番に出てくるわけでもないらしい点に注意。
ためしに手元のマダミスログで変換してみたところ、識別の精度はまあまあ良かったです。

感情分析

感情分析を使用するのはさほど難しくないものの(sentimentAnalysisをTrueに設定するだけです)、構造が結構めんどくさいので、結果を使うのは少し手間がかかります。

starttimeとendtime(どこからどこまでで検出したか)が入っているので、それに該当する発話を探す必要があります。

マニュアルのパラメーター名が日本語なのですが、中身はちゃんと英語でした。

発話のほかにこのようなパラメーターが得られます。

            {
                "starttime": 345940,
                "endtime": 346640,
                "sentiments": {
                    "energy": 1,
                    "content": 0,
                    "upset": 0,
                    "aggression": 0,
                    "stress": 13,
                    "uncertainty": 26,
                    "excitement": 15,
                    "concentration": 0,
                    "emo_cog": 28,
                    "hesitation": 23,
                    "brain_power": 30,
                    "embarrassment": 0,
                    "intensive_thinking": 38,
                    "imagination_activity": 3,
                    "extreme_emotion": 5,
                    "passionate": 0,
                    "atmosphere": 0,
                    "anticipation": 0,
                    "dissatisfaction": 0,
                    "confidence": 4
                }
            },

これを使用して、マークアップしてみましょう(たんに【】をつけているだけです)。

判定は2秒ごと、starttimeendtimeを見て、時間が一致するところを自分で照合して見つける必要があります。

image.png

たとえば『エネルギー』を53以上、にしてみて、JSONをマークアップします。

image.png

わお、いいですね!
閾値以上のところがマークできました。

見てみると、値が内包していることがあるようです。たとえばABCという会話があったときに、BにもA-Cにも入っていることがあるようです。また、単語ごとというわけでもなくブレスごとなので、たとえば「はいそうすね」といったような結果になることもあるようです。

マークアップを記号にしておいて、「マークアップのあった文(読点まで)」というようにして、含む文全体をとらえて、整理すれば良さそうです。

APIの料金は感情分析では通常の文字起こしに加算されてかかります。
また、ほかのモデルのようにバージョンがないので別モデルを使って節約するか~ということが難しいです。

料金は60分まで無料で、0.04円/1秒なので通常のAPIと変わりません。
(追加でかかる点は注意です。)
うっかりはみだしても、破産はしないくらいですね。

特定の人だけやたら感情が多く出てハイライトされたりするので、閾値でなにかやるなら人ごとに設定出来たらよさそうです。

その他 単語登録もしてみた

せっかくなので、TRPGに特徴的な「2d6」などの言葉を登録してみました。
専門用語(固有名詞)がたくさん出てきます。

image.png

手を動かしてみて思ったこと

文字起こしの結果はJSONなどで保存する

解析にはお金がかかってくるので、結果をJSONに全部入れて保存して、べつすあとからいいように加工しましょう。

モデルを選べるようにしておこう

利用の上限は各モデルごとに決まっています。
どれもそれぞれ60分無料枠があるようなので、節約をしたければテスト期間中はモデルを切り替えられるようにして、複数のモデルを使用すると安心してテストできます。

使える機能を調べておこう

WebSocket インタフェースを使用しようとして、感情分析は非同期 HTTP インタフェース限定の機能であることに気が付きました。
(※マニュアルにちゃんと書いてあります)。
こんな感じで、やりたいことによって使える機能が違ってくるので、適切な仕組みのやつを選びましょう。

サンプルプログラムではC#のアプリが見栄えよくてかなり使いやすそうだったのですが、個人的にはPythonが書きやすかったです。

実際のコード

あまり美しいコードとは言えませんが、このようなコードでした。
1枚ペラですがGithubにもあげました。


import tkinter as tk
from tkinter import filedialog
from tkinter import ttk
import os
import time
import json
import urllib
import logging
import requests
from pydub import AudioSegment
from threading import Thread
import re

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")

endpoint = 'https://acp-api-async.amivoice.com/v1/recognitions'
app_key = 'YOUR-API-KEY'

models = {
    "会話_汎用": "-a-general",
    "会話_医療": "-a-medgeneral",
    "会話_製薬": "-a-bizmrreport",
    "会話_金融": "-a-bizfinance",
    "会話_保険": "-a-bizinsurance",
    "音声入力_汎用": "-a-general-input",
    "音声入力_医療": "-a-medgeneral-input",
    "音声入力_製薬": "-a-bizmrreport-input",
    "音声入力_金融": "-a-bizfinance-input",
    "音声入力_保険": "-a-bizinsurance-input",
    "音声入力_電子カルテ": "-a-medkarte-input",
    "英語_汎用": "-a-general-en",
    "中国語_汎用": "-a-general-zh",
    "韓国語_汎用": "-a-general-ko"
}

sentiment_parameters = {
    "energy": (0, 100, "エネルギー"),
    "stress": (0, 100, "ストレス"),
    "emo_cog": (1, 500, "感情バランス論理"),
    "concentration": (0, 100, "濃縮"),
    "anticipation": (0, 100, "期待"),
    "excitement": (0, 30, "興奮した"),
    "hesitation": (0, 30, "躊躇"),
    "uncertainty": (0, 30, "不確実"),
    "intensive_thinking": (0, 100, "考える"),
    "imagination_activity": (0, 30, "想像"),
    "embarrassment": (0, 30, "困惑した"),
    "passionate": (0, 30, "情熱"),
    "brain_power": (0, 100, "脳活動"),
    "confidence": (0, 30, "自信"),
    "aggression": (0, 30, "攻撃性憤り"),
    "atmosphere": (-100, 100, "雰囲気会話傾向"),
    "upset": (0, 30, "動揺"),
    "content": (0, 30, "喜び"),
    "dissatisfaction": (0, 30, "不満"),
    "extreme_emotion": (0, 30, "極端な起伏"),
}

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Speech Recognition with Diarization and Sentiment Analysis")

        self.notebook = ttk.Notebook(self)
        self.notebook.grid(row=0, column=0, padx=10, pady=10)

        self.tab1 = tk.Frame(self.notebook)
        self.tab2 = tk.Frame(self.notebook)
        self.tab3 = tk.Frame(self.notebook)

        self.notebook.add(self.tab1, text="音声変換")
        self.notebook.add(self.tab2, text="中身の処理")
        self.notebook.add(self.tab3, text="感情パラメーター設定")

        # Tab 1: 音声変換
        self.file_label = tk.Label(self.tab1, text="File Name:")
        self.file_label.grid(row=0, column=0, padx=10, pady=10)
        self.file_entry = tk.Entry(self.tab1, width=50)
        self.file_entry.grid(row=0, column=1, padx=10, pady=10)
        self.file_button = tk.Button(self.tab1, text="Browse", command=self.browse_file)
        self.file_button.grid(row=0, column=2, padx=10, pady=10)

        self.speaker_label = tk.Label(self.tab1, text="Number of Speakers:")
        self.speaker_label.grid(row=1, column=0, padx=10, pady=10)
        self.speaker_spinbox = tk.Spinbox(self.tab1, from_=1, to=20)
        self.speaker_spinbox.grid(row=1, column=1, padx=10, pady=10)

        self.model_label = tk.Label(self.tab1, text="Model:")
        self.model_label.grid(row=3, column=0, padx=10, pady=10)
        self.model_var = tk.StringVar(self.tab1)
        self.model_var.set(list(models.keys())[0])
        self.model_menu = ttk.Combobox(self.tab1, textvariable=self.model_var, values=list(models.keys()))
        self.model_menu.grid(row=3, column=1, padx=10, pady=10)

        self.progress_frame = tk.Frame(self.tab1)
        self.progress_frame.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.progress_bars = []
        self.progress_labels = []

        self.start_button = tk.Button(self.tab1, text="Start", command=self.start_recognition)
        self.start_button.grid(row=5, column=0, columnspan=3, padx=10, pady=10)

        self.results_label = tk.Label(self.tab1, text="Results:")
        self.results_label.grid(row=6, column=0, padx=10, pady=10)
        self.results_text = tk.Text(self.tab1, width=60, height=20)
        self.results_text.grid(row=7, column=0, columnspan=3, padx=10, pady=10)

        self.time_label = tk.Label(self.tab1, text="Time Taken:")
        self.time_label.grid(row=8, column=0, padx=10, pady=10)
        self.time_text = tk.Label(self.tab1, text="")
        self.time_text.grid(row=8, column=1, padx=10, pady=10)

        # Tab 2: 中身の処理
        self.process_button = tk.Button(self.tab2, text="Process Saved Results", command=self.process_saved_results)
        self.process_button.grid(row=0, column=0, padx=10, pady=10)
        self.processed_results_label = tk.Label(self.tab2, text="Processed Results:")
        self.processed_results_label.grid(row=1, column=0, padx=10, pady=10)
        self.processed_results_text = tk.Text(self.tab2, width=60, height=20)
        self.processed_results_text.grid(row=2, column=0, columnspan=3, padx=10, pady=10)

        # Tab 3: 感情パラメーター設定
        self.canvas = tk.Canvas(self.tab3)
        self.scrollbar = ttk.Scrollbar(self.tab3, orient="vertical", command=self.canvas.yview)
        self.scrollable_frame = ttk.Frame(self.canvas)

        self.scrollable_frame.bind(
            "<Configure>",
            lambda e: self.canvas.configure(
                scrollregion=self.canvas.bbox("all")
            )
        )

        self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)

        self.canvas.grid(row=0, column=0, sticky="nsew")
        self.scrollbar.grid(row=0, column=1, sticky="ns")

        self.tab3.grid_rowconfigure(0, weight=1)
        self.tab3.grid_columnconfigure(0, weight=1)

        self.sentiment_vars = {}
        self.markup_vars = {}

        for i, (param, (min_val, max_val, label)) in enumerate(sentiment_parameters.items()):
            self.sentiment_vars[param] = tk.IntVar(value=min_val)
            self.markup_vars[param] = tk.BooleanVar(value=False)

            tk.Label(self.scrollable_frame, text=label).grid(row=i, column=0, padx=5, pady=5)
            tk.Scale(self.scrollable_frame, from_=min_val, to=max_val, orient="horizontal",
                     variable=self.sentiment_vars[param]).grid(row=i, column=1, padx=5, pady=5)
            tk.Checkbutton(self.scrollable_frame, text="Markup", variable=self.markup_vars[param]).grid(row=i, column=2, padx=5, pady=5)

    def browse_file(self):
        file_path = filedialog.askopenfilename(filetypes=[("Audio Files", "*.wav")])
        if file_path:
            self.file_entry.delete(0, tk.END)
            self.file_entry.insert(0, file_path)

    def start_recognition(self):
        filename = self.file_entry.get()
        num_speakers = int(self.speaker_spinbox.get())
        self.selected_model = models[self.model_var.get()]
        self.start_time = time.time()  # 開始時間を記録

        for widget in self.progress_frame.winfo_children():
            widget.destroy()
        self.progress_bars.clear()
        self.progress_labels.clear()

        audio = AudioSegment.from_wav(filename)
        chunk_length_ms = 3 * 60 * 60 * 1000  # 3 hours
        chunks = [audio[i:i + chunk_length_ms] for i in range(0, len(audio), chunk_length_ms)]

        self.num_chunks = len(chunks)
        self.completed_chunks = 0

        for i in range(self.num_chunks):
            label = tk.Label(self.progress_frame, text=f"Chunk {i + 1}/{self.num_chunks}")
            label.grid(row=i, column=0, padx=10, pady=5)
            progress = ttk.Progressbar(self.progress_frame, orient="horizontal", length=300, mode="determinate")
            progress.grid(row=i, column=1, padx=10, pady=5)
            self.progress_labels.append(label)
            self.progress_bars.append(progress)

        self.results_text.delete(1.0, tk.END)
        self.results_text.insert(tk.END, "Starting recognition...\n")

        for i, chunk in enumerate(chunks):
            chunk_filename = f"chunk_{i}.wav"
            chunk.export(chunk_filename, format="wav")
            Thread(target=self.process_chunk, args=(chunk_filename, num_speakers, i)).start()

    def process_chunk(self, chunk_filename, num_speakers, index):
        domain = {
            'grammarFileNames': self.selected_model,
            'loggingOptOut': 'True',
            'contentId': chunk_filename,
            'speakerDiarization': 'True',
            'diarizationMinSpeaker': str(num_speakers),
            'diarizationMaxSpeaker': str(num_speakers),
            'sentimentAnalysis': 'True',
        }
        params = {
            'u': app_key,
            'd': ' '.join([f'{key}={urllib.parse.quote(value)}' for key, value in domain.items()]),
        }
        logger.info(params)

        try:
            with open(chunk_filename, 'rb') as f:
                request_response = requests.post(
                    url=endpoint,
                    data={key: value for key, value in params.items()},
                    files={'a': (chunk_filename, f.read(), 'application/octet-stream')}
                )

            if request_response.status_code != 200:
                logger.error(f'Failed to request - {request_response.content}')
                return

            request = request_response.json()

            if 'sessionid' not in request:
                logger.error(f'Failed to create job - {request["message"]} ({request["code"]})')
                return

            logger.info(request)

            while True:
                result_response = requests.get(
                    url=f'{endpoint}/{request["sessionid"]}',
                    headers={'Authorization': f'Bearer {app_key}'}
                )
                if result_response.status_code == 200:
                    result = result_response.json()
                    if 'status' in result and (result['status'] == 'completed' or result['status'] == 'error'):
                        self.results_text.insert(tk.END, f"Chunk {index + 1} completed.\n")
                        self.update_progress(index, 100)
                        self.save_chunk_result(index, result)
                        self.completed_chunks += 1
                        if self.completed_chunks == self.num_chunks:
                            self.results_text.insert(tk.END, "All chunks processed. Combining results...\n")
                            self.combine_results()
                            self.update_time_label()  # Move the update time label here
                        break
                    else:
                        logger.info(result)
                        self.update_progress(index, 10)
                        time.sleep(10)
                else:
                    logger.error(f'Failed. Response is {result_response.content}')
                    break
        except Exception as e:
            logger.error(f'An error occurred: {e}')
        finally:
            os.remove(chunk_filename)

    def update_progress(self, index, increment):
        self.progress_bars[index]["value"] += increment
        self.update_idletasks()

    def save_chunk_result(self, index, result):
        with open(f"chunk_{index}_result.json", 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=4)

    def update_time_label(self):
        elapsed_time = time.time() - self.start_time
        self.time_text.config(text=f"{elapsed_time:.2f} seconds")

    def combine_results(self):
        combined_result = {"speakers": {}, "sentiments": {}}
        for i in range(self.num_chunks):
            chunk_result_filename = f"chunk_{i}_result.json"
            with open(chunk_result_filename, 'r', encoding='utf-8') as f:
                chunk_result = json.load(f)
                if "segments" in chunk_result:
                    for segment in chunk_result["segments"]:
                        if "results" in segment:
                            for result in segment["results"]:
                                for token in result["tokens"]:
                                    speaker = token.get("label", "Unknown Speaker")
                                    text = token.get("written", "")
                                    if speaker not in combined_result["speakers"]:
                                        combined_result["speakers"][speaker] = []
                                    combined_result["speakers"][speaker].append({
                                        "text": text,
                                        "starttime": token.get("starttime"),
                                        "endtime": token.get("endtime")
                                    })
                if "sentiment_analysis" in chunk_result:
                    for sentiment_segment in chunk_result["sentiment_analysis"]["segments"]:
                        starttime = sentiment_segment.get("starttime")
                        endtime = sentiment_segment.get("endtime")
                        sentiments = sentiment_segment  # Store the whole sentiment segment
                        del sentiments["starttime"]
                        del sentiments["endtime"]
                        if speaker not in combined_result["sentiments"]:
                            combined_result["sentiments"][speaker] = []
                        combined_result["sentiments"][speaker].append({
                            "starttime": starttime,
                            "endtime": endtime,
                            "sentiments": sentiments
                        })

            os.remove(chunk_result_filename)

        with open("combined_result.json", 'w', encoding='utf-8') as f:
            json.dump(combined_result, f, ensure_ascii=False, indent=4)
        
        self.process_saved_results()

    def process_saved_results(self):
        with open("combined_result.json", 'r', encoding='utf-8') as f:
            combined_result = json.load(f)
        
        final_text = ""
        for speaker, texts in combined_result["speakers"].items():
            combined_text = ''.join([text["text"] for text in texts])
            combined_text = re.sub(r' (?=[^\x00-\x7F])', '', combined_text)
            
            # Markup the excitement parts
            if speaker in combined_result["sentiments"]:
                for sentiment in combined_result["sentiments"][speaker]:
                    for param, threshold in self.sentiment_vars.items():
                        if self.markup_vars[param].get():
                            if param in sentiment["sentiments"] and sentiment["sentiments"][param] > threshold.get():
                                combined_text = self.markup_excitement(combined_text, texts, sentiment["starttime"], sentiment["endtime"], param)
            
            final_text += f"{speaker}: {combined_text}\n"
        
        self.processed_results_text.delete(1.0, tk.END)
        self.processed_results_text.insert(tk.END, final_text)

    def markup_excitement(self, combined_text, texts, start, end, param):
        for text in texts:
            if text["starttime"] >= start and text["endtime"] <= end:
                combined_text = combined_text.replace(text["text"], f"{param.upper()}: {text['text']}")
        return combined_text

if __name__ == "__main__":
    app = App()
    app.mainloop()


1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0