2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

WebAudioで音声をWebSocketで送りAkka経由でGoogleSpeechAPIとKafkaを呼ぶ

Posted at

はじめに

タイトルの通りですが、サーバでクライアントの音声を取得しながら、Google Speech APIを呼び出して文字おこしする…というWebアプリです。Kafka/Sparkは本筋ではないが、Kafkaに突っ込んでおくことで他にもリアルタイム/リアクティブにいろいろできるかも、っつーことで、KafkaとSparkも構成に加えた。
(最新版のKafkaのプレビュー機能でZookeeperなしで起動できる機能があるので使ってみたかっただけというのもある)

動機

もともとは、単にGoogle CloudのSpeech-to-Textをストリーム入力源で試してみたかったのだが、せっかくなのでビッグデータを扱うようなユースケースを仮定したWebアプリにしてみようと思い、PlayやAkka、さらにKafkaやSparkを入れてプロトタイプにした。

文字起こしされたテキストをKafkaに突っ込んで、その後はPythonやSparkで自然言語処理して、リアルタイムに結果を返すようなものを想定している。

全体概要

  ----------       -----------             ---------------          -----------
 | WebAudio | --> | WebSocket |  <------> | Playframework | ------ | Akka      |
  ----------       -----------             ---------------          -----------
                                                                       |
                     ---------------------------------------------------
                    |                    |                             |
            ---------------        ---------------       ------------------------
           | File書き出し  |      | Kafka         |     | Google Speech-to-Text  |
            ---------------       ---------------       -------------------------
                                         |
                                   ---------------
                                  | Spark         |
                                   ---------------
                                         |
                                   ---------------
                                  | Zeppelin      |
                                   ---------------
  • WebAudio
    • マイクデバイスから取得した音声データを、16Bit/8000HzのPCMデータにするのに利用。今回は電話音声相当で検証したが、良い精度を出すなら16000Hz以上でサンプリングした方がよいとGoogleのドキュメントにはある。
  • WebSocket
    • サーバーとの双方向通信に利用。音声データをTCPでやりとりするのはどうなんだという気もするが、その辺はHTTP3になると解決するんだろうか。
  • Playframework
    • WebSocketのサーバ側処理として利用。
  • Akka
    • Playを使うので必然的にAkkaになる。Speech-to-Textを呼び出したり、Kafkaにパブリッシュしたりを並行で行う。
  • Apache Kafka
    • 数秒単位でPCMデータをキューイング。
  • Google Speech-to-Text
    • 音声認識API。月60分までは無料。
  • Apache Spark
    • Kafkaにため込んだデータを処理する。何の処理するか決めてないけど…
  • Apache Zeppelin
    • Sparkを動かすノートブックとして利用。

フロント側

メイン側

実際はTypeScriptで書きましたが、重要なところだけ抜粋。
ポイントは、WebAudioでWorkletを使うところと、エンディアンに気をつけるところ。AudioWorkletからのメッセージはport.onmessageメソッドでハンドリングする。

ぶっちゃけ、Speech-to-Textを呼びたいだけなら、わざわざ生PCMデータでなくてもよい。Chromeなら基本的にWebM/Opusエンコーディングで圧縮するので、それをそのままサーバへ送信しSpeech-to-Textへ送信した方がよいかもしれない。

今回はWebM/Opusをどうやったらデコードできるか、いまいち良さそうなライブラリが見つけられなかったので、生PCMを送信することにした。本当はこんなユースケースはTCP(WebSocket)ではなくUDPにするべきなんだろうけど…。HTTP3が来れば手軽にできるんだろうか…。

main.js
/* マイクデバイスの取得 */
async function getAudioStream(){
  return await navigator.mediaDevices.getUserMedia({audio:true,video:false});
}
/* 環境がリトルエンディアンか確認する */
function checkLittleEndian(){
  const i16arr = new Int16Array(1);
  i16arr[0] = 0x0001;
  const i8arr = new Int8Array(i16arr.buffer);
  return i8arr[0] == 0x01;
}
/* リトルエンディアンに変換する関数 */
function convertLittleEndian(i16arr){
  const little = new DataView(new ArrayBuffer(i16arr.buffer.byteLength));
  i16arr.forEach( (i16,index) => little.setInt16(index * 2, i16, true);
  return little.buffer;
}
//ボタン押下イベントなどで起動するメイン処理
async function main(){
  const isLittleEndian = checkLittleEndian(); //環境がリトルエンディアンかどうか
  const audioStream = await getAudioStream(); //マイクデバイスを取得
  const audioContext = new AudioContext({sampleRate:8000}); //AudioApiの準備
  const inputSource = audioContext.createMediaStreamSource(audioStream); //マイク入力を取得

  await audioContext.addModule('audioworklet.js'); //AudioWorklet読み込み
  const pcmtransNode = new AudioWorkletNode(audioContext,'pcmtrans');

  //WebSocketでPCMデータを送信する
  const ws = new WebSocket('ws://localhost:9000/record'); //WebSocket準備
  ws.onopen = ev => { 
    pcmtransNode.port.onmessage = msg => {
      if (isLittleEndian){
        ws.send(msg.data.buffer);
      }else{
        ws.send(convertLittleEndian(msg.data));
     }
    };
  }
}

AudioWorklet側

オーディオ信号を16Bit符号付整数に変換するプロセッサを作る。
以下はWorkletとして別モジュールになる。AudioWorkletProcessorを継承し、processメソッドを実装する必要がある。

processメソッドのinputsには、PCMデータ(信号)がFloat型で-1.0〜1.0の範囲ではいってくる。outputsはこのプロセッサの出力信号になる。
今回は、Float型を16bit符号付整数に変換したいだけなので、16bit符号付整数の正数・負数の最大値を掛け合わせるだけ。
親スレッド側へはport.postMessageメソッドでデータをやりとりできるので、変換した配列を送信している。

processメソッドは、最後にtrueを返すようにしなければならない点に注意が必要。

audioworklet.js
class PCMTransformer extends AudioWorkletProcessor{
  constructor(context){
    super(context,'pcmtrans');
  }
  process(inputs,outputs,parameters){
    const inp = inputs[0][0]; //モノラルチャンネル
    const out = outputs[0][0]; //モノラルチャンネル
    const i16arr = new Int16Array(new ArrayBuffer(inp.length * 2));

    for (let i=0; i < inp.length; i++){
      out[i] = inp[i]; //入力信号は出力へスルーアウト

      //float浮動小数を16Bit符号付整数に変換
      i16arr[i] = inp[i] < 0 ? inp[i] * 0x8000 : inp[i] * 0x7FFF;
    }
    this.port.postMessage(i16arr); //メッセージ送信
    return true; //true返さないとダメ
  }
}

//登録する
registerProcessor('pcmtrans',PCMTransformer);

サーバ側

趣味なのでScalaで書くが、GoogleのSDKはJava用、Node.js用、Python用があるよう。(他言語でもgRPC経由でAPI実行は可能と思われる)。ScalaからはJava用のSDKを利用する。

build.sbt

sbt new playframework/play-scala-seed.g8 コマンドでPlayアプリの雛形を作ったあと、build.sbtにKafkaとGoogleのライブラリ依存関係追加。PlayにはAkkaが付属しているので、Akkaは明示的には依存に加えていない。

build.sbt
lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.13.6"

libraryDependencies += guice
libraryDependencies += "org.scalatestplus.play" %% "scalatestplus-play" % "5.0.0" % Test
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"
libraryDependencies += "com.google.cloud" % "google-cloud-speech" % "1.30.2"

Playframework

WebSocketで受けたデータをアクターに渡すだけ。WebSocketはAkkaStreamのActorFlowを利用するようだが、AkkaStreamは勉強不足であまり理解が進んでいないので、効果的な書き方になってないかも。

SpeechController.scala

class SpeechController
  @Inject() (val cc:ControllerComponents)(implicit system:ActorSystem,mat:Materializer)
  extends BaseController {

  def record():WebSocket = WebSocket.accept[Array[Byte],Array[Byte]] { request =>
    ActorFlow.actorRef { out => AudioProcActor.props(out) }
  }
}

Akka

Akkaは大きく以下の記述スタイルがある。Akkaの勉強もかねてそれぞれの書き方で書いてみる。

  • クラシックスタイル(型安全ではない)
  • 型付ファンクショナルスタイル(型安全、関数スタイル)
  • 型付オブジェクト指向スタイル(型安全、クラススタイル)

WebSocket処理のアクター

WebSocketからのデータをまず処理するActorはクラシックスタイルで記述したが、akka.actor.typed.sacaladsl.adapterパッケージをインポートすることで、ActorContextにspawnメソッドが生えたりし、型付スタイルのアクターとの相互に混在することもできるようだ。

AudioProcActor.scala
import akka.actor.{Actor,ActorRef,Props}
import akka.actor.typed.scaladsl.adapter._
import java.util.UUID

object AudioProcActor {
  //このActorが受け付けるメッセージを表すクラス。
  case class SpeechText(text:String)

  //クラシックスタイルのActor生成用メソッド
  def props(out:ActorRef):Props = Props(new AudioProcActor(out))
}
class AudioProcActor(val out:ActorRef) extends Actor {
  //クラシックスタイルのアクターから型付スタイルでも子アクターを生成できる。
  private val s2tActor = context.spawn(SpeechToTextActor(context.self.toTyped),
  "speechtotext")
  private val kafkaActor = context.spawn(AudioDataKafkaActor(UUID.randomUUID),
  "kafkaactor")

  override def receive:Receive = {
    //WebSocketからの受信データ
    case data:Array[Byte] =>
      s2tActor ! SpeechToTextActor.Message(data) //Speech-to-Textへ送信
      kafkaActor ! KafkaActor.Message(data) //Kafkaへ送信

    //Speech-to-Textからの受信データ
    case text:SpeechText =>
      out ! text.text.getBytes() //クライアントへ送信
  }
}

Speech-to-Textを呼び出すアクター

一方、こちらは型付ファンクショナルスタイルで記述したActor。メッセージを受けたら、GoogleSpeechAPIを呼び出す処理を行う。クラシックスタイルと異なり、型安全になるが状態などは持ちにくい。

SpeechToTextActor.scala
object SpeechToTextActor {
  case class Message(data:Array[Byte])
  def apply(sender:TypedActorRef[AudioProcActor.SpeechText]):Behavior[Message] =
    Behaviors.setup[Message]( ctx => {
      val s2t = new GoogleSpeechToText(text => sender ! AudioProcActor.SpeechText(text))
      Behaviors
        .receiveMessage[Message]{
          case Message(data) =>
            s2t.send(data)
            Behaviors.same
        }
        .receiveSignal{
          case (context,PostStop) =>
            s2t.close()
            Behaviors.same
        }
    })
}

Google Cloud Speech API

Google Cloud Speech APIを呼ぶところの処理。Googleのサンプルプログラムを真似て作成。
ポイントは

  • Oververパターンで認識結果を受け取る
  • setModelで認識モデルを設定する。
  • 最初の1発目のリクエストで設定情報を送信する
GoogleSpeechToText.scala
import com.google.cloud.speech.v1._
import com.google.api.gax.rpc._

class GoogleSpeechToText(val callback:String => Unit) extends AutoCloseable{
  private val speechClient = SpeechClient.create()
  private val observer = {
    new ResponseObserver[StreamingRecognizeResponse] {
      override def onStart(controller: StreamController):Unit = {}
      override def onResponse(response: StreamingRecognizeResponse):Unit = {
        val text = response
          .getResultsList
          .get(0)
          .getAlternativesList
          .get(0)
          .getTranscript
        callback(text) // コールバック発火する
      }
      override def onError(t:Throwable):Unit = println(t)
      override def onComplete():Unit = {}
  }
  private val clientStream = speechClient
    .streamingRecognizeCallable()
    .splitCall(observer)
  
  {
    val config = RecognitionConfig
      .newBuilder()
      .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16) //オーディオ形式
      .setLanguageCode("ja-JP") //日本語
      .setSampleRateHertz(8000) //ビットレート
      .setAudioChannelCount(1) //チャンネル数
      .setModel("phone_call") //"default","video","command_and_search"
      .build()
    val streamingConfig = StreamingRecognitionConfig
      .newBuilder()
      .setConfig(config)
      .build()
    val request = StreamingRecognizeRequest
      .newBuilder()
      .setStreamingConfig(streamingConfig)
      .build()

    clientStream.send(request) //1発目に設定情報を送信する
  }

  //APIへの送信処理
  def send(audiData:Array[Byte]):Unit = {
    val req = StreamingRecognizeRequest
      .newBuilder()
      .setAudioContent(ByteString.copyFrom(audioData))
      .build()
    clientStream.send(req)
  }
  //使い終わったストリームやクライアントを閉じる。
  override def close():Unit = {
    clientStream.closeSend()
    speechClient.close()
  }
}

Apache Kafka

最新版のKafkaはZookeeperなしで起動できる機能が、プレビュー版ではあるが備わった。本番利用にはまだ早いと思うが、Zookeeper周りのややこい設定が不要なので、ずいぶん気軽に起動できるようになった。

Kafkaの起動

# ランダムなUUIDが生成される
% $KAFKA_HOME/bin/kafka-storage.sh random-uuid

# ストレージのフォーマット
% $KAFKA_HOME/bin/kafka-storage.sh format -t [上記のUUID] -c $KAFKA_HOME/config/kraft/server.properties

# Kafkaの起動(Zookeeper不要!)
% $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties

KafkaのProducerクライアントはスレッドセーフなので、使いまわしてOKのはず。なのでシングルトンオブジェクトのメソッドとして送信処理を記述。

Kafkaクライアントの利用

AudioDataKafkaProducer.scala
import org.apache.kafka.client.producer._
import java.util.{Properties,UUID}

object AudioDataKafkaProducer{
  val topicName = "audiodata"
  val producer = {
    val config = new Properties()
    config.setProperty(ProducerConfig.BOOTSTRAP_SERVER_CONFIG,
      "localhost:9092") //Kafkaのクラスタのリストをカンマ区切りで。今回は1台で検証

    config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.UUIDSerializer")

    config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer")

    new KafkaProducer[UUID,Array[Byte]](config)
  }

  def send(key:UUID,value:Array[Byte]):Unit =
    val message = new ProducerRecord[UUID,Array[Byte]](topicName,key,value)

    producer.send(message,(metadata:RecordMetadata,err:Exception) => {
      if (metadata != null && err == null) {
        //送信成功
      }else{
        //失敗
        println(err.getMesaage)
      }
    })

  def close():Unit =
    producer.close()
}

Kafkaへ送信するアクター

型付オブジェクト指向スタイルで記述してみる。オブジェクト指向スタイルでは AbstractBehavior を継承したクラスを作成する。比較的クラシックスタイルに近い書き方ができる。

AudioDataKafkaActor.scala
object AudioDataKafkaActor{
  case class Message(data:Array[Byte])

  def apply(key:UUID):Behavior[Message] =
    Behaviors.setup(ctx => new AudioDataKafkaActor(ctx,key))
}

import AudioDataKafkaActor.Message
class AudioDataKafkaActor(context:ActorContext[Message],key:UUID)
  extends AbstractBehavior(context) {
  
  val buffer = ByteBuffer.allocate(8000 * 2 * 5) //5秒溜まったらKafkaに送信
  
  override def onMessage(message:Message):Behavior[Message] =
    if (buffer.position() + message.data.length > buffer.capacity()) {
      AudioDataKafkaProducer.send(key, convertBigEndian())
      buffer.clear()
    }
    buffer.put(message.data)
    Behavior.same
  
  override def onSignal:PartialFunction[Signal,Behavior[Message]] = {
    case PostStop =>
      AudioDataKafkaProducer.send(key, convertBigEndian())
      Behavior.same
  }

  private def covertBigEndian():Array[Byte] =
    //溜め込んだバッファをビッグエンディアン(JVM用)に変換する
    val bigEnd = ByteBuffer.allocate(buffer.position)
    buffer.flip()
    bigEnd
      .asShortBuffer
      .put(buffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer)
      .array
}

Apache SparkとApache Zeppelin

Kafkaのトピックに登録されるデータをSparkで処理する。
ZeppelinのSparkインタプリタ設定で、以下の依存関係を設定しておく。例では波形表示しかしていないが、SparkStreamingでKafkaをソースとするストリーム処理もできる模様。

org.apache.spark:spark-sql-kafka-0-10_2.12:jar:3.0.1
Zeppelin/Spark
val df = spark.
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe","audiodata")
  .load()

//ちょっと以下はしこたまイケてないので再考要
val waves = df
  .select("value")
  .as[Array[Byte]]
  .flatMap(_.toSeq.zipWithIndex)

z.show(waves)

こんな形でZeppelinで波形を見れる。
zeppelin.png

最後に

やってみて、

  • Akkaの3つの記述スタイルについて理解が進んだ。というか、アクターモデルの理解が少しだけ進んだ気がする。
  • PlayFramework便利。
  • KafkaがZookeeperなしで起動するのは便利。
  • (記事には記載していないが)WAV形式のファイルフォーマットを理解した。
  • バイナリデータを扱う場合はエンディアンに気をつけないといけない。(←当たり前なのだけれど、戒め)
  • Speech-to-Textは話者1人なら結構、文字起こししてくれる。
2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?