google
googleapi
python3
音声認識

[Python3] Google Cloud Speech gRPC APIでストリーミング音声認識の実行

はじめに

Speech APIの使い方(特にgRPCとストリーミング)の情報が少なく、公開されているコードも動かなくなっているものが多かったので、現時点での実装方法を紹介します。

参考文献

実装

マイク入力、およびスピーカー出力を用いた認識を行うコードをgithubに上げておきました。
https://github.com/musyoku/grpc-speech-recognition

動作環境

  • Ubuntu 16.04
  • macOS High Sierra(マイク入力のみ)
  • Python 3

準備

Speech APIの準備

まずGoogle Cloud PlatformでSpeech APIを有効にし、プロジェクトを作成しておきます。

https://console.cloud.google.com/apis/library/speech.googleapis.com/?q=speech

必要な認証情報は「サービスアカウントキー」です。
ブラウザ上でjsonファイルを作成するか、もしくはgcloudを入れてターミナルで作成する必要があります。
ここではgloudを用いた方法を説明します。
Google Cloud SDKをダウンロードし、展開したディレクトリに移動します。
ターミナルで以下のコマンドを実行しインストールします。

./install.sh

ただし、自動的に/usr/local/bin/などにインストールされるわけではなく、現在の場所にbinディレクトリが作成され、その中にバイナリが生成されます。
そのためPATHにbinディレクトリのパスを追加する必要があります。
PATHに追加してから以下のコマンドを実行します。

gcloud init

macOSではリンクが自動で開きますが、Ubuntuではリンクが開かないので表示されているURLを直接ブラウザに入力します。
ブラウザでの操作が完了するとターミナル上で作成済みのプロジェクトを選択する画面が表示されるので選択します。
(Do you want to configure Google Compute Engine ?と聞かれますがnと入力します。)
次にデフォルトのキーを作ります。

gcloud auth application-default login

ライブラリのインストール

以下のコマンドで全て入ります。

pip install google gcloud google-auth google-cloud-speech grpc-google-cloud-speech-v1beta1

実行時にgoogle.protobufが見つからないエラーが出ることがあります。
その場合はprotobufを再インストールすると直ります。

実装時の注意点

APIのインターフェースが変更されているため、昔のコードをコピペすると動かないケースがあります。
チャネルの作成部分は以下のように実装するとうまくいくのですが、

def make_channel(host, port):
    ssl_channel = implementations.ssl_channel_credentials(None, None, None)
    creds = get_credentials().create_scoped(args.speech_scope)
    auth_header = ("authorization", "Bearer " + creds.get_access_token().access_token)
    auth_plugin = implementations.metadata_call_credentials(lambda _, func: func([auth_header], None), name="google_creds")
    composite_channel = implementations.composite_channel_credentials(ssl_channel, auth_plugin)
    return implementations.secure_channel(host, port, composite_channel)

昔のコードでは以下のようにヘッダーのauthorizationキーがAuthorizationになっているものがあります。

auth_header = ("Authorization", "Bearer " + creds.get_access_token().access_token)

このまま動かすとエラーが返ってくるので注意が必要です。

マイク入力の認識

マイク入力の取得はPyAudioを使います。

def pyaudio_callback(in_data, frame_count, time_info, status):
    assert isinstance(in_data, bytes)
    frames.append(in_data)
    return (None, pyaudio.paContinue)

pa = pyaudio.PyAudio()
stream = pa.open(format=pa.get_format_from_width(2),
                channels=1,
                rate=args.sampling_rate,
                input_device_index=args.device_index,
                input=True,
                output=False,
                frames_per_buffer=int(args.sampling_rate * args.frame_seconds),
                stream_callback=pyaudio_callback)
stream.start_stream()

@sayonari氏のコードを参考に、無音区間はリクエストを送信しないようにしています。

サーバーへのリクエストはジェネレータで行います。

def listen_loop(recognize_stream):
    global should_finish_stream
    global recognition_result

    for resp in recognize_stream:
        if resp.error.code != code_pb2.OK:
            raise RuntimeError(resp.error.message)

        for result in resp.results:
            for alt in result.alternatives:
                # 結果をリアルタイム表示する場合はここで行う
                printr(" ".join((alt.transcript, "  ", "stability: ", str(int(result.stability * 100)), "%")))

            if result.is_final:
                recognition_result.is_final = True
                should_finish_stream = True
                return

def request_stream():
    recognition_config = cloud_speech_pb2.RecognitionConfig(
        encoding=args.audio_encoding,
        sample_rate=args.sampling_rate,
        language_code=args.lang_code,
        max_alternatives=1,
    )
    streaming_config = cloud_speech_pb2.StreamingRecognitionConfig(
        config=recognition_config,
        interim_results=True, 
        single_utterance=True
    )

    yield cloud_speech_pb2.StreamingRecognizeRequest(streaming_config=streaming_config)

    while True:
        time.sleep(args.frame_seconds / 4)

        if should_finish_stream:
            return

        if len(frames) > 0:
            yield cloud_speech_pb2.StreamingRecognizeRequest(audio_content=frames.pop(0))

def run_recognition_loop():
    global frames
    global silent_frames
    global is_recording
    global should_finish_stream

    if len(silent_frames) > 4:
        silent_frames = silent_frames[-4:]

    while not is_recording:
        time.sleep(args.frame_seconds // 4)

        if len(frames) > 4:
            for frame_index in range(4):
                data = frames[frame_index]
                rms = audioop.rms(data, 2)
                decibel = 20 * math.log10(rms) if rms > 0 else 0
                if decibel < args.silent_decibel:
                    silent_frames += frames[0:frame_index+1]
                    del frames[0:frame_index + 1]
                    return

            is_recording = True
            frames = silent_frames + frames
            silent_frames = []

    with cloud_speech_pb2.beta_create_Speech_stub(make_channel(args.host, args.ssl_port)) as service:
        try:
            listen_loop(service.StreamingRecognize(request_stream(), args.deadline_seconds))
            # 最終的な結果はここで表示
            printr(" ".join((bold(recognition_result.transcription), "  ", "confidence: ", str(int(recognition_result.confidence * 100)), "%")))
            print()
        except Exception as e:
            print(str(e))

ちなみにマイクのデバイスIDを確認するには以下のようにします。

pa = pyaudio.PyAudio()
devices = []
for device_index in range(pa.get_device_count()):
    metadata = pa.get_device_info_by_index(device_index)
    print(device_index, metadata["name"])

スピーカー出力の認識

スピーカー出力を直接取り出してSpeech APIに投げることもできます。(Ubuntuのみ動作確認)
まずマルチプロセスのキューを作ります。

from multiprocessing import Process, Queue
queue = Queue()

PyAlsaAudioでスピーカー出力を取り出します。

def reading_audio_loop(queue):
    recorder = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK, device="pulse")
    recorder.setchannels(1)
    recorder.setrate(args.sampling_rate)
    recorder.setformat(alsaaudio.PCM_FORMAT_S16_LE)
    recorder.setperiodsize(1024)

    while True:
        length, data = recorder.read()
        if length > 0:
            queue.put(data)

preloading_process = Process(target=reading_audio_loop, args=[queue])
preloading_process.start()

reading_audio_loopは別のプロセスで動いているので内部ではひたすら読み取ったデータをキューに追加しています。
キューからフレームを取り出しサーバーに投げます。

def request_stream():
    global queue
    global recognition_result
    global should_finish_stream

    recognition_config = cloud_speech_pb2.RecognitionConfig(
        encoding=args.audio_encoding,
        sample_rate=args.sampling_rate,
        language_code=args.lang_code,
        max_alternatives=1,
    )
    streaming_config = cloud_speech_pb2.StreamingRecognitionConfig(
        config=recognition_config,
        interim_results=True, 
        single_utterance=True
    )

    yield cloud_speech_pb2.StreamingRecognizeRequest(streaming_config=streaming_config)

    frame_length = int(args.sampling_rate * args.frame_seconds)
    frame = b""

    while True:
        if should_finish_stream:
            return

        try:
            data = queue.get(False)
            frame += data
        except Exception as e:
            if len(frame) > frame_length:
                rms = audioop.rms(frame, 2)
                decibel = 20 * math.log10(rms) if rms > 0 else 0
                if decibel < args.silent_decibel:
                    recognition_result.success = False
                    return
                yield cloud_speech_pb2.StreamingRecognizeRequest(audio_content=frame)
                frame = b""
            time.sleep(args.frame_seconds / 4)

queue.get(False)をすると、キューが空の場合に例外が投げられます。
したがって、リアルタイム認識するには例外が投げられるまで読み続け、空になったタイミングでサーバーに投げます。
プログラムを実行中にPulseAudioを起動し、

pavucontrol

録音タブに表示されている動作中のプログラムの入力ソースを適切に選択すると、Python側でスピーカー出力を取り出せるようになります。

動作例です。