はじめに
タイトルの通りですが、サーバでクライアントの音声を取得しながら、Google Speech APIを呼び出して文字おこしする…というWebアプリです。Kafka/Sparkは本筋ではないが、Kafkaに突っ込んでおくことで他にもリアルタイム/リアクティブにいろいろできるかも、っつーことで、KafkaとSparkも構成に加えた。
(最新版のKafkaのプレビュー機能でZookeeperなしで起動できる機能があるので使ってみたかっただけというのもある)
動機
もともとは、単にGoogle CloudのSpeech-to-Textをストリーム入力源で試してみたかったのだが、せっかくなのでビッグデータを扱うようなユースケースを仮定したWebアプリにしてみようと思い、PlayやAkka、さらにKafkaやSparkを入れてプロトタイプにした。
文字起こしされたテキストをKafkaに突っ込んで、その後はPythonやSparkで自然言語処理して、リアルタイムに結果を返すようなものを想定している。
全体概要
---------- ----------- --------------- -----------
| WebAudio | --> | WebSocket | <------> | Playframework | ------ | Akka |
---------- ----------- --------------- -----------
|
---------------------------------------------------
| | |
--------------- --------------- ------------------------
| File書き出し | | Kafka | | Google Speech-to-Text |
--------------- --------------- -------------------------
|
---------------
| Spark |
---------------
|
---------------
| Zeppelin |
---------------
- WebAudio
- マイクデバイスから取得した音声データを、16Bit/8000HzのPCMデータにするのに利用。今回は電話音声相当で検証したが、良い精度を出すなら16000Hz以上でサンプリングした方がよいとGoogleのドキュメントにはある。
- WebSocket
- サーバーとの双方向通信に利用。音声データをTCPでやりとりするのはどうなんだという気もするが、その辺はHTTP3になると解決するんだろうか。
- Playframework
- WebSocketのサーバ側処理として利用。
- Akka
- Playを使うので必然的にAkkaになる。Speech-to-Textを呼び出したり、Kafkaにパブリッシュしたりを並行で行う。
- Apache Kafka
- 数秒単位でPCMデータをキューイング。
- Google Speech-to-Text
- 音声認識API。月60分までは無料。
- Apache Spark
- Kafkaにため込んだデータを処理する。何の処理するか決めてないけど…
- Apache Zeppelin
- Sparkを動かすノートブックとして利用。
フロント側
メイン側
実際はTypeScriptで書きましたが、重要なところだけ抜粋。
ポイントは、WebAudioでWorkletを使うところと、エンディアンに気をつけるところ。AudioWorkletからのメッセージはport.onmessage
メソッドでハンドリングする。
ぶっちゃけ、Speech-to-Textを呼びたいだけなら、わざわざ生PCMデータでなくてもよい。Chromeなら基本的にWebM/Opusエンコーディングで圧縮するので、それをそのままサーバへ送信しSpeech-to-Textへ送信した方がよいかもしれない。
今回はWebM/Opusをどうやったらデコードできるか、いまいち良さそうなライブラリが見つけられなかったので、生PCMを送信することにした。本当はこんなユースケースはTCP(WebSocket)ではなくUDPにするべきなんだろうけど…。HTTP3が来れば手軽にできるんだろうか…。
/* マイクデバイスの取得 */
async function getAudioStream(){
return await navigator.mediaDevices.getUserMedia({audio:true,video:false});
}
/* 環境がリトルエンディアンか確認する */
function checkLittleEndian(){
const i16arr = new Int16Array(1);
i16arr[0] = 0x0001;
const i8arr = new Int8Array(i16arr.buffer);
return i8arr[0] == 0x01;
}
/* リトルエンディアンに変換する関数 */
function convertLittleEndian(i16arr){
const little = new DataView(new ArrayBuffer(i16arr.buffer.byteLength));
i16arr.forEach( (i16,index) => little.setInt16(index * 2, i16, true);
return little.buffer;
}
//ボタン押下イベントなどで起動するメイン処理
async function main(){
const isLittleEndian = checkLittleEndian(); //環境がリトルエンディアンかどうか
const audioStream = await getAudioStream(); //マイクデバイスを取得
const audioContext = new AudioContext({sampleRate:8000}); //AudioApiの準備
const inputSource = audioContext.createMediaStreamSource(audioStream); //マイク入力を取得
await audioContext.addModule('audioworklet.js'); //AudioWorklet読み込み
const pcmtransNode = new AudioWorkletNode(audioContext,'pcmtrans');
//WebSocketでPCMデータを送信する
const ws = new WebSocket('ws://localhost:9000/record'); //WebSocket準備
ws.onopen = ev => {
pcmtransNode.port.onmessage = msg => {
if (isLittleEndian){
ws.send(msg.data.buffer);
}else{
ws.send(convertLittleEndian(msg.data));
}
};
}
}
AudioWorklet側
オーディオ信号を16Bit符号付整数に変換するプロセッサを作る。
以下はWorkletとして別モジュールになる。AudioWorkletProcessor
を継承し、process
メソッドを実装する必要がある。
process
メソッドのinputs
には、PCMデータ(信号)がFloat
型で-1.0〜1.0の範囲ではいってくる。outputs
はこのプロセッサの出力信号になる。
今回は、Float
型を16bit符号付整数に変換したいだけなので、16bit符号付整数の正数・負数の最大値を掛け合わせるだけ。
親スレッド側へはport.postMessage
メソッドでデータをやりとりできるので、変換した配列を送信している。
processメソッドは、最後にtrueを返すようにしなければならない点に注意が必要。
class PCMTransformer extends AudioWorkletProcessor{
constructor(context){
super(context,'pcmtrans');
}
process(inputs,outputs,parameters){
const inp = inputs[0][0]; //モノラルチャンネル
const out = outputs[0][0]; //モノラルチャンネル
const i16arr = new Int16Array(new ArrayBuffer(inp.length * 2));
for (let i=0; i < inp.length; i++){
out[i] = inp[i]; //入力信号は出力へスルーアウト
//float浮動小数を16Bit符号付整数に変換
i16arr[i] = inp[i] < 0 ? inp[i] * 0x8000 : inp[i] * 0x7FFF;
}
this.port.postMessage(i16arr); //メッセージ送信
return true; //true返さないとダメ
}
}
//登録する
registerProcessor('pcmtrans',PCMTransformer);
サーバ側
趣味なのでScalaで書くが、GoogleのSDKはJava用、Node.js用、Python用があるよう。(他言語でもgRPC経由でAPI実行は可能と思われる)。ScalaからはJava用のSDKを利用する。
build.sbt
sbt new playframework/play-scala-seed.g8
コマンドでPlayアプリの雛形を作ったあと、build.sbt
にKafkaとGoogleのライブラリ依存関係追加。PlayにはAkkaが付属しているので、Akkaは明示的には依存に加えていない。
lazy val root = (project in file(".")).enablePlugins(PlayScala)
scalaVersion := "2.13.6"
libraryDependencies += guice
libraryDependencies += "org.scalatestplus.play" %% "scalatestplus-play" % "5.0.0" % Test
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.0"
libraryDependencies += "com.google.cloud" % "google-cloud-speech" % "1.30.2"
Playframework
WebSocketで受けたデータをアクターに渡すだけ。WebSocketはAkkaStreamのActorFlow
を利用するようだが、AkkaStreamは勉強不足であまり理解が進んでいないので、効果的な書き方になってないかも。
class SpeechController
@Inject() (val cc:ControllerComponents)(implicit system:ActorSystem,mat:Materializer)
extends BaseController {
def record():WebSocket = WebSocket.accept[Array[Byte],Array[Byte]] { request =>
ActorFlow.actorRef { out => AudioProcActor.props(out) }
}
}
Akka
Akkaは大きく以下の記述スタイルがある。Akkaの勉強もかねてそれぞれの書き方で書いてみる。
- クラシックスタイル(型安全ではない)
- 型付ファンクショナルスタイル(型安全、関数スタイル)
- 型付オブジェクト指向スタイル(型安全、クラススタイル)
WebSocket処理のアクター
WebSocketからのデータをまず処理するActorはクラシックスタイルで記述したが、akka.actor.typed.sacaladsl.adapter
パッケージをインポートすることで、ActorContextにspawn
メソッドが生えたりし、型付スタイルのアクターとの相互に混在することもできるようだ。
import akka.actor.{Actor,ActorRef,Props}
import akka.actor.typed.scaladsl.adapter._
import java.util.UUID
object AudioProcActor {
//このActorが受け付けるメッセージを表すクラス。
case class SpeechText(text:String)
//クラシックスタイルのActor生成用メソッド
def props(out:ActorRef):Props = Props(new AudioProcActor(out))
}
class AudioProcActor(val out:ActorRef) extends Actor {
//クラシックスタイルのアクターから型付スタイルでも子アクターを生成できる。
private val s2tActor = context.spawn(SpeechToTextActor(context.self.toTyped),
"speechtotext")
private val kafkaActor = context.spawn(AudioDataKafkaActor(UUID.randomUUID),
"kafkaactor")
override def receive:Receive = {
//WebSocketからの受信データ
case data:Array[Byte] =>
s2tActor ! SpeechToTextActor.Message(data) //Speech-to-Textへ送信
kafkaActor ! KafkaActor.Message(data) //Kafkaへ送信
//Speech-to-Textからの受信データ
case text:SpeechText =>
out ! text.text.getBytes() //クライアントへ送信
}
}
Speech-to-Textを呼び出すアクター
一方、こちらは型付ファンクショナルスタイルで記述したActor。メッセージを受けたら、GoogleSpeechAPIを呼び出す処理を行う。クラシックスタイルと異なり、型安全になるが状態などは持ちにくい。
object SpeechToTextActor {
case class Message(data:Array[Byte])
def apply(sender:TypedActorRef[AudioProcActor.SpeechText]):Behavior[Message] =
Behaviors.setup[Message]( ctx => {
val s2t = new GoogleSpeechToText(text => sender ! AudioProcActor.SpeechText(text))
Behaviors
.receiveMessage[Message]{
case Message(data) =>
s2t.send(data)
Behaviors.same
}
.receiveSignal{
case (context,PostStop) =>
s2t.close()
Behaviors.same
}
})
}
Google Cloud Speech API
Google Cloud Speech APIを呼ぶところの処理。Googleのサンプルプログラムを真似て作成。
ポイントは
- Oververパターンで認識結果を受け取る
-
setModel
で認識モデルを設定する。 - 最初の1発目のリクエストで設定情報を送信する
import com.google.cloud.speech.v1._
import com.google.api.gax.rpc._
class GoogleSpeechToText(val callback:String => Unit) extends AutoCloseable{
private val speechClient = SpeechClient.create()
private val observer = {
new ResponseObserver[StreamingRecognizeResponse] {
override def onStart(controller: StreamController):Unit = {}
override def onResponse(response: StreamingRecognizeResponse):Unit = {
val text = response
.getResultsList
.get(0)
.getAlternativesList
.get(0)
.getTranscript
callback(text) // コールバック発火する
}
override def onError(t:Throwable):Unit = println(t)
override def onComplete():Unit = {}
}
private val clientStream = speechClient
.streamingRecognizeCallable()
.splitCall(observer)
{
val config = RecognitionConfig
.newBuilder()
.setEncoding(RecognitionConfig.AudioEncoding.LINEAR16) //オーディオ形式
.setLanguageCode("ja-JP") //日本語
.setSampleRateHertz(8000) //ビットレート
.setAudioChannelCount(1) //チャンネル数
.setModel("phone_call") //"default","video","command_and_search"
.build()
val streamingConfig = StreamingRecognitionConfig
.newBuilder()
.setConfig(config)
.build()
val request = StreamingRecognizeRequest
.newBuilder()
.setStreamingConfig(streamingConfig)
.build()
clientStream.send(request) //1発目に設定情報を送信する
}
//APIへの送信処理
def send(audiData:Array[Byte]):Unit = {
val req = StreamingRecognizeRequest
.newBuilder()
.setAudioContent(ByteString.copyFrom(audioData))
.build()
clientStream.send(req)
}
//使い終わったストリームやクライアントを閉じる。
override def close():Unit = {
clientStream.closeSend()
speechClient.close()
}
}
Apache Kafka
最新版のKafkaはZookeeperなしで起動できる機能が、プレビュー版ではあるが備わった。本番利用にはまだ早いと思うが、Zookeeper周りのややこい設定が不要なので、ずいぶん気軽に起動できるようになった。
Kafkaの起動
# ランダムなUUIDが生成される
% $KAFKA_HOME/bin/kafka-storage.sh random-uuid
# ストレージのフォーマット
% $KAFKA_HOME/bin/kafka-storage.sh format -t [上記のUUID] -c $KAFKA_HOME/config/kraft/server.properties
# Kafkaの起動(Zookeeper不要!)
% $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
KafkaのProducerクライアントはスレッドセーフなので、使いまわしてOKのはず。なのでシングルトンオブジェクトのメソッドとして送信処理を記述。
Kafkaクライアントの利用
import org.apache.kafka.client.producer._
import java.util.{Properties,UUID}
object AudioDataKafkaProducer{
val topicName = "audiodata"
val producer = {
val config = new Properties()
config.setProperty(ProducerConfig.BOOTSTRAP_SERVER_CONFIG,
"localhost:9092") //Kafkaのクラスタのリストをカンマ区切りで。今回は1台で検証
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.UUIDSerializer")
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
new KafkaProducer[UUID,Array[Byte]](config)
}
def send(key:UUID,value:Array[Byte]):Unit =
val message = new ProducerRecord[UUID,Array[Byte]](topicName,key,value)
producer.send(message,(metadata:RecordMetadata,err:Exception) => {
if (metadata != null && err == null) {
//送信成功
}else{
//失敗
println(err.getMesaage)
}
})
def close():Unit =
producer.close()
}
Kafkaへ送信するアクター
型付オブジェクト指向スタイルで記述してみる。オブジェクト指向スタイルでは AbstractBehavior
を継承したクラスを作成する。比較的クラシックスタイルに近い書き方ができる。
object AudioDataKafkaActor{
case class Message(data:Array[Byte])
def apply(key:UUID):Behavior[Message] =
Behaviors.setup(ctx => new AudioDataKafkaActor(ctx,key))
}
import AudioDataKafkaActor.Message
class AudioDataKafkaActor(context:ActorContext[Message],key:UUID)
extends AbstractBehavior(context) {
val buffer = ByteBuffer.allocate(8000 * 2 * 5) //5秒溜まったらKafkaに送信
override def onMessage(message:Message):Behavior[Message] =
if (buffer.position() + message.data.length > buffer.capacity()) {
AudioDataKafkaProducer.send(key, convertBigEndian())
buffer.clear()
}
buffer.put(message.data)
Behavior.same
override def onSignal:PartialFunction[Signal,Behavior[Message]] = {
case PostStop =>
AudioDataKafkaProducer.send(key, convertBigEndian())
Behavior.same
}
private def covertBigEndian():Array[Byte] =
//溜め込んだバッファをビッグエンディアン(JVM用)に変換する
val bigEnd = ByteBuffer.allocate(buffer.position)
buffer.flip()
bigEnd
.asShortBuffer
.put(buffer.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer)
.array
}
Apache SparkとApache Zeppelin
Kafkaのトピックに登録されるデータをSparkで処理する。
ZeppelinのSparkインタプリタ設定で、以下の依存関係を設定しておく。例では波形表示しかしていないが、SparkStreaming
でKafkaをソースとするストリーム処理もできる模様。
org.apache.spark:spark-sql-kafka-0-10_2.12:jar:3.0.1
val df = spark.
.read
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","audiodata")
.load()
//ちょっと以下はしこたまイケてないので再考要
val waves = df
.select("value")
.as[Array[Byte]]
.flatMap(_.toSeq.zipWithIndex)
z.show(waves)
最後に
やってみて、
- Akkaの3つの記述スタイルについて理解が進んだ。というか、アクターモデルの理解が少しだけ進んだ気がする。
- PlayFramework便利。
- KafkaがZookeeperなしで起動するのは便利。
- (記事には記載していないが)WAV形式のファイルフォーマットを理解した。
- バイナリデータを扱う場合はエンディアンに気をつけないといけない。(←当たり前なのだけれど、戒め)
- Speech-to-Textは話者1人なら結構、文字起こししてくれる。