3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Google Cloud Speech-to-Text の始め方

Posted at

Google Cloud Speech-to-Text を使うと、音声ファイルやリアルタイムストリーミング音声をテキストに変換することができます。この記事では、初めて使う方を想定して、設定から簡単なサンプルコード、そしてウェイクワード検出によるコマンド受付の例課金に関する注意事項コスト削減策までを紹介します。


1. Google Cloud Speech-to-Text とは?

Google Cloud Speech-to-Text は、Google Cloud が提供している音声認識サービスです。さまざまな言語や環境 (電話音声、動画音声、ストリーミングなど) に対応しており、強力な音声認識エンジンを活用できます。


2. 前提となる環境

  • Google Cloud Platform (GCP) アカウント
    無料枠があるので、まだお持ちでない場合は 無料トライアル でセットアップ可能です。
  • Google Cloud プロジェクト(プロジェクト ID が用意されている状態)
  • Cloud SDK (gcloud コマンド) が使える環境 (ローカルPC等)
    ローカル開発環境で Python や Node.js などから呼び出すケースを想定しています。

3. Speech-to-Text API の有効化

  1. GCPコンソールにアクセス
    Google Cloud Console にログインし、対象のプロジェクトを選択します。

  2. API とサービスライブラリ を開く
    「>_ APIとサービス」から「ライブラリ」を開きます。

  3. 「Speech-to-Text API」 を検索して選択し、有効化する

    • 「Google Cloud Speech-to-Text API」などの表記の場合もあります。
    • 「有効にする」ボタンを押して API を有効化します。
  4. サービスアカウントを作成(任意)

    • GCP 上のリソースにアクセスするための サービスアカウント を必要に応じて作成し、JSON形式の鍵ファイルをダウンロードしておきます。
    • 後述の認証設定で利用します。

4. 認証設定 (サービスアカウントキー)

ローカルやサーバー環境で Speech-to-Text を呼び出す場合は、GCP への認証が必要です。

  1. サービスアカウントキーの作成

    • サービスアカウントのページ で、「キーを管理」→「鍵を追加」→「新しい鍵を作成」と進み、JSON 形式を選択して作成します。
    • JSONファイルがダウンロードされます。
  2. 環境変数 GOOGLE_APPLICATION_CREDENTIALS の設定

    • ダウンロードした JSON ファイルのパスを環境変数に設定します。
      export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-file.json"
      
    • あるいは、プログラムコードの中で明示的にキーのパスを指定する方法もあります。
  3. Cloud Shell を使う場合

    • Cloud Shell 上ではデフォルトで認証済みなので、追加の設定が不要なことが多いです。
    • ただし API 呼び出しに必要な権限が付与されたサービスアカウントを使っているかどうかだけ確認してください。

5. サンプルコード (Python 編)

ここでは、Python クライアントライブラリを使って、ローカルにある音声ファイルをテキスト変換する例を紹介します。

5.1. Python クライアントライブラリのインストール

pip install google-cloud-speech

5.2. 簡単なサンプルコード

import os
from google.cloud import speech

def transcribe_audio(local_file_path: str) -> str:
    """
    ローカルの音声ファイルを読み込んでテキスト変換した結果を返す
    """
    # 認証情報のファイルパス
    # 既に環境変数が設定されている場合、以下の設定は不要
    # os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/your-key.json"

    client = speech.SpeechClient()

    # 音声ファイルをバイナリで読み込む
    with open(local_file_path, "rb") as audio_file:
        content = audio_file.read()

    # Speech-to-Text に渡すパラメータを構築
    audio = speech.RecognitionAudio(content=content)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,  # wav 等の場合
        sample_rate_hertz=16000,  # 音声ファイルのサンプリングレートに合わせる
        language_code="ja-JP",    # 日本語の場合
        enable_automatic_punctuation=True,  # 句読点を自動付与する
    )

    # 音声認識をリクエスト
    response = client.recognize(config=config, audio=audio)

    # 結果をまとめて返す
    transcript = ""
    for result in response.results:
        transcript += result.alternatives[0].transcript

    return transcript

if __name__ == "__main__":
    file_path = "sample.wav"  # テスト用の音声ファイルパスを指定
    result_text = transcribe_audio(file_path)
    print("Transcribed text:", result_text)
  • 音声ファイルが WAV (リニアPCM) の場合は LINEAR16 を指定し、サンプリングレートはファイルに合わせます。
  • MP3 などの場合は AudioEncoding を適切に変更する必要があります(公式ドキュメントを参照)。

6. リアルタイムストリーミングの例 (Python)

リアルタイムの音声入力を文字起こしする例です。実装はやや複雑ですが、マイク入力などに応じて都度クラウド側へデータを送る構成が可能です。

import os
import sys
import time
from google.cloud import speech
import pyaudio
from six.moves import queue

RATE = 16000
CHUNK = int(RATE / 10)  # 100ms フレーム

class MicrophoneStream(object):
    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk
        self._buff = queue.Queue()
        self._closed = True

    def __enter__(self):
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk,
            stream_callback=self._fill_buffer,
        )
        self._closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self._closed = True
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        while not self._closed:
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
            yield b"".join(data)

def listen_print_loop(responses):
    for response in responses:
        if not response.results:
            continue
        result = response.results[0]
        if not result.alternatives:
            continue
        transcript = result.alternatives[0].transcript
        print("Transcript: {}".format(transcript))

def main():
    # os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/your-key.json"
    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code="ja-JP",
        enable_automatic_punctuation=True,
    )
    streaming_config = speech.StreamingRecognitionConfig(
        config=config,
        interim_results=True,  # 中間結果を随時受け取る
    )

    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in audio_generator
        )
        responses = client.streaming_recognize(streaming_config, requests)
        listen_print_loop(responses)

if __name__ == "__main__":
    main()
  • pyaudio を使ってマイク入力を取得し、それをストリーミングで送信しています。
  • interim_results=True を有効にすると、最終結果が確定する前の段階の結果も逐次受け取れます。

7. 料金と注意点

  • 無料枠
    Google Cloud Speech-to-Text には毎月 60 分間の無料枠があります(2025年1月時点)。超過分は従量課金となります。
    最新の料金情報は 公式ドキュメント を参照してください。

  • 音声ファイルのフォーマット
    音質やサンプリングレートを適切に指定しないと、正しい認識結果を得られない場合があります。

  • API リクエスト上限やクォータ
    プロジェクトごとにクォータが設定されているので、テスト量が多い場合はクォータを確認・引き上げ申請が必要になることがあります。


8. まとめ

  1. GCPプロジェクトを用意する。
  2. Speech-to-Text API を有効化する。
  3. サービスアカウントキー (JSONファイル) をダウンロードし、認証を設定する。
  4. クライアントライブラリ (Python など) を使ってリクエストを送る。
  5. 必要に応じてリアルタイムストリーミングや言語・サンプリングレートの指定を行う。

ここまでの手順を踏めば、Google Cloud Speech-to-Text を使った音声認識を簡単に始めることができます。アプリケーションや業務システムへの組み込みはもちろん、音声議事録の作成や動画の字幕生成など、さまざまなユースケースに応用できるので、ぜひ試してみてください。


9. ウェイクワードでコマンド実行のサンプル

ここからは、ウェイクワード(ここでは「コンピュータ」)を検出した後にコマンドを受け付け、そのコマンドを表示するサンプルプログラムを紹介します。前述のストリーミング音声認識の例をベースにしており、Python でマイク入力を取り込みながらリアルタイムで Google Cloud Speech-to-Text へ送信します。

9.1. サンプルコード

import os
import sys
import time
from google.cloud import speech
import pyaudio
from six.moves import queue

# -----------------------------
# ウェイクワード & コマンド
# -----------------------------
WAKE_WORD = "コンピュータ"

SUPPORTED_COMMANDS = {
    "ログイン": "LOGIN",
    "発進": "ENGAGE",
    "被害報告": "REPORT",
    "ログアウト": "LOGOUT",
    "シャットダウン": "SHUTDOWN"
}

RATE = 16000
CHUNK = int(RATE / 10)  # 100ms フレーム

class MicrophoneStream(object):
    """マイク入力を取得してジェネレータで返すクラス"""
    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk
        self._buff = queue.Queue()
        self._closed = True

    def __enter__(self):
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk,
            stream_callback=self._fill_buffer,
        )
        self._closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self._closed = True
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        """マイクから受信した音声データをバッファに詰める"""
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        """バッファからデータを取り出し、音声のチャンクを yield する"""
        while not self._closed:
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
            yield b"".join(data)

def listen_print_loop_with_wake_word(responses):
    """
    Speech-to-Text からの認識結果をループで受け取り、
    ウェイクワード(コンピュータ) 検出後にコマンドが言われたか判定して出力する
    """
    awaiting_command = False

    for response in responses:
        if not response.results:
            continue

        result = response.results[0]
        if not result.alternatives:
            continue

        # 認識結果テキスト(中間 or 最終)を取得
        transcript = result.alternatives[0].transcript.strip()

        # interim_results=True の場合、途中の結果も返ってくる
        # finalized (確定結果) かどうかは result.is_final で判定
        if result.is_final:
            print(f"Recognized (final): {transcript}")

            # ウェイクワード未検出の場合
            if not awaiting_command:
                # ウェイクワード"コンピュータ"が含まれているか
                if WAKE_WORD in transcript:
                    awaiting_command = True
                    print("ウェイクワード検出!コマンドをお話しください...")
            else:
                # すでにウェイクワード検出済み → 次のコマンド待ち
                detected_command = None
                for key in SUPPORTED_COMMANDS:
                    if key in transcript:
                        detected_command = SUPPORTED_COMMANDS[key]
                        break

                if detected_command:
                    print(f"コマンド検出: {detected_command}")
                else:
                    print("有効なコマンドが見つかりませんでした...")

                # コマンド受け付けが完了したので、またウェイクワード待ちに戻る
                awaiting_command = False

def main():
    # もしサービスアカウントキーをローカルで指定する場合は環境変数を設定
    # os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/your-key.json"

    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code="ja-JP",
        enable_automatic_punctuation=True,
    )
    streaming_config = speech.StreamingRecognitionConfig(
        config=config,
        interim_results=True,  # 中間結果を随時受け取る(デモしやすくするため有効化)
    )

    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in audio_generator
        )
        # リアルタイムで認識結果を取得
        responses = client.streaming_recognize(streaming_config, requests)

        # ウェイクワード & コマンド検出
        listen_print_loop_with_wake_word(responses)

if __name__ == "__main__":
    main()

9.2. 実行手順

  1. ライブラリのインストール
    pip install google-cloud-speech pyaudio six
    
  2. サービスアカウントキー (JSON) を準備し、環境変数を設定
    export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-service-account-file.json"
    
    もしくは、コード中で os.environ["GOOGLE_APPLICATION_CREDENTIALS"] を設定します。
  3. 上記のスクリプトを実行
    python your_script.py
    
  4. マイクに向かって話す
    • まず「コンピュータ」と言ってウェイクワードを検出させる。
    • 検出されたら、「ログイン」「発進」「被害報告」など、サポートされているコマンドを話す。
    • コマンドが認識されると、対応する英語コマンドが表示されます。

10. 課金に関する注意事項

ストリーミング認識をずっとオンにしている間は、その分だけ課金されます。

  • Google Cloud Speech-to-Text は、音声が無音でも音声データ(無音含む)を送信している時間に対して発生します。
  • ウェイクワードを常にクラウド側で検出しようとすると、待機時間も含めて長時間ストリーミングが行われるため、課金がかさみやすくなります。

課金を抑える方法

  1. ローカル(端末側)でウェイクワード検出

    • 軽量のライブラリやモデルを用いて、ウェイクワードだけはローカルで検出。
    • ウェイクワードが検出されたタイミングで初めて Google Cloud Speech-to-Text のストリーミングを開始。
  2. Push to Talk (ボタン操作) を導入

    • 「マイクボタン」を押している間だけストリーミングを行い、話し終わったら終了する方式。
    • 必要な時間だけ課金対象となるため、無駄が少ない。
  3. 一定時間ごとのセッション切断

    • ストリーミングセッションを常時オンにせず、無音状態が続いたらセッションを切断し、再度検出し直すなどのロジックを導入。

11. 無音時に API を停止する方法 (コスト削減例)

長時間稼働させる場合、無音時に API 呼び出しを停止する方法がさらに有効です。無音検出 (VAD: Voice Activity Detection) を導入することで、音声がないときはクラウド側のストリーミング接続を切断し、コストを大幅に抑えられます。

11.1. 無音検出を導入する

  • オープンソースツールの利用
    たとえば Python であれば、pyaudiowebrtcvad を組み合わせて無音を判定できます。
  • 閾値設定
    騒音レベルや環境に応じて、適切な無音判定の閾値を設定します。
    (例:音量レベルが一定以下、または音声のエネルギーが低い場合)

簡単な例

import pyaudio
import webrtcvad

vad = webrtcvad.Vad()
vad.set_mode(3)  # 高感度モード

def is_speech(frame):
    # 16000Hz, 10ms のフレームに対して is_speech を呼び出す
    return vad.is_speech(frame, 16000)

while True:
    frame = get_audio_frame()  # マイクからフレームを取得
    if is_speech(frame):
        start_api_call()
    else:
        stop_api_call()

11.2. 無音検出と Speech-to-Text API の連携

  1. 音声ストリームを常時モニタリング
  2. 無音を検出したら Google Cloud Speech-to-Text API を切断
  3. 音声が戻ったタイミングで API を再接続

このように、無音時の API 使用を削減することで、トータルコストを大幅に軽減できます。

11.3. 注意点

  • 処理の軽量化
    無音検出が高負荷になるとシステム全体の性能に影響を与えるため、軽量な方法を採用するとよいでしょう。
  • 再接続の遅延
    音声が検出された直後に API を再接続する際、わずかなタイムラグが発生する可能性があります。リアルタイム性が厳密に要求される場合は、この遅延を最小化する工夫が必要です。

期待される効果

  • 無音時に切断することで、1 か月 24 時間稼働が必須の場合でも、音声がない時間を省略し、料金を数分の1 に削減可能
  • バッチ処理の利用や Google のエンハンス機能を併用することで、さらなるコスト削減が見込めます。

12. よくある質問(FAQ)

Q1. ローカルでウェイクワードを検出するにはどうすればいいですか?

A1. たとえば Snowboy (開発終了)Porcupine といったライブラリを利用して、比較的軽量にウェイクワードを検出できます。日本語対応のものもあるので、調査してみてください。

Q2. MP3 音声を扱う場合はどうすればいいですか?

A2. RecognitionConfig.AudioEncodingMP3 に変更してください。あるいは音質の劣化を最小限に抑えたいなら、事前に WAV (LINEAR16) に変換してから渡す方法もあります。

Q3. API のレスポンスが遅い場合は?

A3. ネットワーク環境の問題や、Speech-to-Text API 側の応答速度、ローカル環境の処理負荷などが考えられます。interim_results=True を有効にし、こまめに中間結果を受け取りながら使用するとスムーズに感じられることがあります。

Q4. 1 度のストリーミング認識でどれくらいの音声を扱えますか?

A4. 公式ドキュメント によれば、ストリーミング セッションには最大時間が設定されています。常時使用する場合は定期的な再接続を検討してください。

Q5. テキストの認識精度を上げるには?

A5. 高品質のマイクを使用したり、環境ノイズを減らしたり、適切なサンプリングレートを指定することに加え、カスタム辞書や Speech Adaptation (旧 Phrase Hints) を活用すると精度を高められます。


まとめ & おわりに

ここまで、Google Cloud Speech-to-Text の導入手順簡単な使用例、さらにウェイクワードを使ったコマンド受付無音検出を活用したコスト削減策などを紹介しました。

  1. GCP のプロジェクトを作成し、Speech-to-Text API を有効化する。
  2. 認証ファイル(JSON)を取得し、環境変数を設定する。
  3. クライアントライブラリ (Pythonなど) を使ってテストする。
  4. ストリーミング認識やウェイクワード検出など、ユースケースに合わせて実装を拡張する。
  5. 料金体系やクォータに注意しつつ、本番運用に備える。
  6. 無音検出を導入することで、長時間稼働でも費用を大幅に抑えられる可能性がある。

音声認識は多くのシーンで活用できる技術です。この記事が、みなさんの開発の第一歩としてお役に立てれば幸いです。不明点があれば、公式ドキュメントやコミュニティ、Qiita の関連投稿などを参照してみてください。


以上

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?