WebTransportのビデオエコーデモの公開がしたい!
画面上で52ミリ秒か。
早い、確かに早い。
ブラウザからエンコードしたchunkデータにタイムスタンプを埋め込み、サーバーにエコーさせてchunkデータが戻ってくるまでわずか41ms
これなら確かによりリアルタイムな何かができるだろう。
しかしこれを実用化するには別の問題があった。安定性だ。
781, 772, 763 ...
694, 682, 678 ...
黒い画面に映し出される数字は数秒毎にじりじりと減っていく。
121, 110, 103, 99
メモリ残量を表す vmstat
の free
の値はとうとう残り100MBを切ってしまったことを示していた。
92, 106, 102, 98, 112, 108 ...
お? ちゃんとGCが仕事をしてる?? メモリを食い尽くさずに安定稼働できるのか??
そう一抹の期待を抱きながら、ふと視線をずらすと、そこには固まって動きのないウェブカメラの画像が表示されていた。
ああ、やはり10分は持たないか。。
ブラウザでエンコードしたウェブカメラの動画・音声データをサーバーに送信し、Pythonのaioquic
で送り返しているだけなのに、どうしてメモリリークしてしまうのだろうか。。
ああ、Python以外の言語とライブラリでWebTransportがしたい!
というわけで安定稼働するようにしたビデオエコーのデモがこちらです。(とりあえず3/31まで)
しょーもない前置きはこれくらいにして本題です!
ビデオエコーを試せるデモを立ててみました。
使い方
- Chrome M97以降でここにアクセスし、画面右の
connect
をクリック - ビデオとマイクのアクセス許可を求められるので許可します。
- 画面上部でカメラの解像度やコーデックを選択できます。
- datagramが選択されていることを確認します。(バッファリングなどは行なっていません)
- 解像度はカメラによって対応していないものがあります。
- 動画のコーデックは
vp8
vp9
h264
が選べます。(AV1はWebCodecs未実装だったと思います) - 音声のコーデックは
opus
のみです。(WebCodecsは対応ずみなはずですが、デスクリプションの記載方法がよくわかっていません)
- ▶︎ボタンを押すとビデオエコーが始まります。
- ハウリングにご注意ください。
- 設定を変更するときは一度リロードしてください。
- 画面下部にレイテンシーやフレーム情報が出力されます。
動作環境
- サーバー
- AWS t4g.small メモリ2GB
- Go1.17
ソースコード
プログラムはこちらです。
Pythonのqioquic以外でWebTransportをするには?
さて、サーバーが安定しないのはどうもaioquic
の使い方がよくないようでした。
と言ってもライブラリをざっとみても、あるいはコネクションをdel
してみても改善されないようだったので他のライブラリを探すことにします。
WebTrasnportに対応しようとしているライブラリなどは quic-goやneqo などあるのですが、
どうもマージされる気配がなかったり、かと言って自分でその辺りを実装するにはまだまだ不明点も多くあり、なんか良い方法ないかとgithubを漁っていました。
QuicライブラリでWebTransport通信を実装しているライブラリを発見
そこで見つけたのが quic-go
を使いWebTransportを実装している二つのリポジトリです。
- webtransport-go : 前述のquic-goの中々マージされないプルリクに投稿されたもの。マージされないならマージしなくて良いものを作ってしまえいうことのようです。
- yomo-presense-background : こちらのソースコードに懇切丁寧に解説が入っているのでかなり参考になります。
上記はいずれもquic-go
を使ってブラウザで動くサンプルもあるので、これを参考にすればWebTransportのサーバーサイドの開発の選択肢が広がるはずです。
プログラムのお話
ここからは具体的にソースコードを追っていきたいと思います。
処理の要点を理解するためにわざわざベタっと書くように切り出しています。
WebCodecs周りもそうですが、今後きれいにモジュール化していきたいところです。
WebTransportはQuicプロトコルを利用するので、基本的にはQuicライブラリを使えば実装はできそうです。
ただし、HTTP/3でのハンドシェイクを必要とするため、この辺りがどう実装して良いかイメージしづらいところとなっていました。
ともあれ、上記のリポジトリから要点をまとめると次のようになります。
- Quicサーバーを起動する
- TLSが必須なため、証明書を読み込ませる
- 接続を待ち受ける
- 端方向の
CONTROL
ストリームを作成し、SETTINGS
フレームを送信する。(WebTransportとh3_datagramに対応していることを通知する) - 端方向の
CONTROL
ストリームを受信し、SETTINGS
フレームを確認する。 - 双方向ストリームを受信し、HTTP/3リクエストを処理する
- HTTP/3ヘッダーを取得し、
:protocol
を確認したり:path
をもとに処理したいハンドラを設定する。 - HTTP/3ヘッダーをエンコードし、200レスポンスを返す
- HTTP/3ヘッダーを取得し、
- ハンドラの処理を開始する。
- データグラムの場合 : データグラムを受信し、直ちにデータを送信する。
- ストリームの場合 : 端方向ストリームを受信し、データを受信したら同じデータを送信する。
- 端方向の
ライブラリや定数など
まずはQuicなどの依存ライブラリを読み込み、次にHTTP/3
やWebTransport
で使う定数を定義します。
気になるのはHTTP/3
のヘッダー圧縮に関わるQPACK
くらいでしょうか。
ということは他の言語でもQuic
とQPACK
に対応したライブラリがあれば実装できそうです。
package main
// 必要なライブラリ(全て)
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/quicvarint"
"github.com/marten-seemann/qpack"
)
// 必要な定数(ライブラリに定義済みのものもあります)
const STREAM_TYPE_CONTROL = 0x00
const STREAM_TYPE_WEBTRANSPORT_UNI = 0x54
const FRAME_TYPE_HEADER = 0x01
const FRAME_TYPE_SETTINGS = 0x04
const H3_DATAGRAM = 0xffd277
const ENABLE_WEBTRANSPORT = 0x2b603742
main関数
通常はコマンドラインパラメータで証明書のファイル名やポート番号を指定すると思いますが、今回は特に何も処理を入れていません。
go func() { wt.Serve("0.0.0.0:4433", certs)}()
とさくっと非同期処理ができてしまうのがGoの良いとこだと思います。
func main() {
cert, err := tls.LoadX509KeyPair("cert.pem", "cert.key")
if err != nil {
log.Println(err)
return
}
certs := []tls.Certificate{cert}
wt := NewWebTransportServer()
wtErr := make(chan error)
addr := "0.0.0.0:4433"
go func() {
// WebTransport Server. (UDP)
wtErr <- wt.Serve(addr, certs)
}()
select {
case err := <-wtErr:
log.Println(err)
return
}
}
Quicサーバーを起動して待ち受ける
Quicサーバーを起動してコネクションを待ち受けます。
TCPやUDPのサーバーと大きく異なるところはなさそうです。
for {}
でコネクションを待ち受け、接続があれば go wt.handleSession(sess)
で処理を開始します。
func (wt *WebTransportServer) Serve(addr string, certs []tls.Certificate) error {
tlsConfig := &tls.Config{
Certificates: certs,
NextProtos: []string{"h3"},
}
quicConfig := &quic.Config{
EnableDatagrams: true,
}
listener, err := quic.ListenAddr(addr, tlsConfig, quicConfig)
if err != nil {
return err
}
log.Printf("WebTransport server listening on %s", listener.Addr().String())
for {
sess, err := listener.Accept(context.Background())
if err != nil {
log.Println(err)
continue
}
log.Printf("+Session: %s", sess.RemoteAddr().String())
go wt.handleSession(sess)
}
}
接続されたらハンドシェイクする
Quicレベルの処理はライブラリがわでよしなにやってくれるので、早速HTTP/3
のハンドシェイクを行います。
まず、こちらからCONTROL
ストリームを作成してSETTINGS
フレームを送信し、WebTransportやデータグラムに対応していることを示します。
次に、クライアントからのCONTROL
ストリームを受信し、SETTINGS
フレームを確認します。
これが済むとブラウザではnew WebTransport()
が通ります。
次にHTTP/3
のリクエストストリーム(双方向)を受け付け、HTTPヘッダー
を確認して200
を返します。
ここまでいけるとブラウザが wt.ready()
のプロミスを解決してくれて、晴れてWebTransport通信を開始することができます。
func (wt *WebTransportServer) handleSession(sess quic.Session) {
// 1. send setting frame. ENABLE_WEBTRANSPORT
wt.sendSettingFrame(sess)
// 2. recv setting frame.
wt.receiveSettingFrame(sess)
// 3. HTTP/3のリクエスト処理
conn, err := wt.receiveClientConnect(sess)
if err != nil {
log.Println("HTTP CONNECT failed", err)
return
}
if conn != nil {
log.Println("conn run")
conn.run()
}
log.Println("Prepared! Start to work...")
}
SETTINGSフレームを送る
ここからは実際にQuicデータをバイナリとして書き込んでいきます。
ストリームやフレームといった概念が紛らわしいですが、データ構造をきちんと把握しておきたいところです。
まずは端方向ストリームを作成し、最初にストリーム種別がCONTROL
であることを設定します。
次にフレームがSETTINGS
であることを設定します。
長さパラメータを設定した後、実際にパラメータと値を入力します。
func (wt *WebTransportServer) sendSettingFrame(sess quic.Session) {
log.Printf("[1] Send SETTINGS frame")
respStream, err := sess.OpenUniStream()
if err != nil {
log.Println(err)
return
}
buf := &bytes.Buffer{}
quicvarint.Write(buf, STREAM_TYPE_CONTROL)
// 7. HTTP Framing Layer
// HTTP/3 Frame Format {
// Type (i),
// Length (i),
// Frame Payload (..),
// }
quicvarint.Write(buf, FRAME_TYPE_SETTINGS)
var l uint64
l += uint64(quicvarint.Len(ENABLE_WEBTRANSPORT) + quicvarint.Len(1))
quicvarint.Write(buf, l)
// Write value
// Setting {
// Identifier (i),
// Value (i),
// }
// SETTINGS Frame {
// Type (i) = 0x04,
// Length (i),
// Setting (..) ...,
// }
//
quicvarint.Write(buf, H3_DATAGRAM)
quicvarint.Write(buf, 1)
quicvarint.Write(buf, ENABLE_WEBTRANSPORT)
quicvarint.Write(buf, 1)
bbb := buf.Bytes()
log.Printf("\t>>>bbb:[len=%d] %# x", len(bbb), bbb)
n, err := respStream.Write(bbb)
if err != nil {
log.Println(err)
return
}
log.Printf("\t>>>wrote n:%d", n)
log.Printf("\tSettings frame sent !")
}
SETTINGSフレームを受け取る
次にブラウザからの設定を受け取ります。
汎用的なサーバーを開発するのであればきちんと設定値を確認しないといけませんが、今回は検証目的なので特にチェックはしていません。
func (wt *WebTransportServer) receiveSettingFrame(sess quic.Session) {
recvSettingStream, _ := sess.AcceptUniStream(sess.Context())
log.Printf("[2] receive client SETTINGS frame")
sqr := quicvarint.NewReader(recvSettingStream)
// control stream type
sty, err := quicvarint.Read(sqr)
if err != nil {
log.Println(err)
return
}
log.Printf("\tStreamType: %# x\r\n", sty)
//
ftype, err := quicvarint.Read(sqr)
if err != nil {
log.Println(err)
return
}
log.Printf("\tFrameType: %# x\r\n", ftype)
// setting length
flen, err := quicvarint.Read(sqr)
if err != nil {
log.Println(err)
return
}
log.Printf("\tLength: %# x(oct=%d)\r\n", flen, flen)
payload := make(map[uint64]uint64)
payloadBuf := make([]byte, flen)
if _, err := io.ReadFull(recvSettingStream, payloadBuf); err != nil {
log.Println(err)
return
}
bb := bytes.NewReader(payloadBuf)
for bb.Len() > 0 {
id, err := quicvarint.Read(bb)
if err != nil {
log.Println(err)
return
}
value, err := quicvarint.Read(bb)
if err != nil {
log.Println(err)
return
}
payload[id] = value
log.Printf("\tidentifier:%# x, value: %d (%# x)\r\n", id, value, value)
}
}
HTTP/3リクエスト処理
さて、通常のWebプログラム同様にHTTP/3
のヘッダーのパスを見てロジックを切り替えることになります。
ただしヘッダーはQPACK
でエンコード・デコードしないといけません。
また、ドラフト段階ではそれを示すためのフラグを設定する必要があります。
ここは双方向ストリームでのやりとりになります。(閉じてはいけません)
func (wt *WebTransportServer) receiveClientConnect(sess quic.Session) (EventHandler, error) {
ctx := context.Background()
reqStream, err := sess.AcceptStream(ctx)
log.Printf("[3] Recieve HTTP CONNECT from client")
if err != nil {
return nil, err
}
log.Printf("\trequest stream accepted: %d", reqStream.StreamID())
qr := quicvarint.NewReader(reqStream)
t, err := quicvarint.Read(qr)
if err != nil {
log.Println(err)
return nil, err
}
log.Printf("\tt: %# x", t)
ll, err := quicvarint.Read(qr)
if err != nil {
log.Println(err)
return nil, err
}
log.Printf("\tll: %# x", ll)
if t != FRAME_TYPE_HEADER {
// header frame
log.Println("\tnot header frame, should force close connection!!!!!")
return nil, errors.New("not header frame")
}
headerBlock := make([]byte, ll)
var n int
if n, err = io.ReadFull(reqStream, headerBlock); err != nil {
return nil, err
}
log.Printf("\tn: %d", n)
decoder := qpack.NewDecoder(nil)
headers, err := decoder.DecodeFull(headerBlock)
if err != nil {
return nil, err
}
// check header.
var path string
for k, v := range headers {
log.Printf("\t[header] %d: %s", k, v)
if v.Name == ":path" {
path = v.Value
}
}
var conn EventHandler
u, err := url.Parse(path)
if err != nil {
log.Println("parse :path failed: ", err)
}
if u.Path == "/audio/echo/stream" {
conn = &StreamEcho{session: sess, ctx: ctx}
}
if u.Path == "/video/echo/stream" {
conn = &StreamEcho{session: sess, ctx: ctx}
}
if u.Path == "/audio/echo/datagram" {
conn = &DatagramEcho{session: sess}
}
if u.Path == "/video/echo/datagram" {
conn = &DatagramEcho{session: sess}
}
// response
{
status := http.StatusOK
header := http.Header{}
header.Add(":status", strconv.Itoa(status))
header.Add("Sec-Webtransport-Http3-Draft", "draft02")
var qpackHeaders bytes.Buffer
encoder := qpack.NewEncoder(&qpackHeaders)
for k, v := range header {
log.Printf("\t[response header] %s: %s", k, v)
for index := range v {
log.Printf("\t[range v] v=%s, index=%d, v[index]=%s", v, index, v[index])
encoder.WriteField(qpack.HeaderField{
Name: strings.ToLower(k),
Value: v[index],
})
}
}
buf := &bytes.Buffer{}
quicvarint.Write(buf, FRAME_TYPE_HEADER)
quicvarint.Write(buf, uint64(qpackHeaders.Len()))
log.Printf("\tresponse 200: %# x %# x", buf.Bytes(), qpackHeaders.Bytes())
writer := bufio.NewWriter(reqStream)
if n, err = writer.Write(buf.Bytes()); err != nil {
return nil, err
}
log.Printf("\tn1=%d", n)
if n, err = writer.Write(qpackHeaders.Bytes()); err != nil {
return nil, err
}
if err = writer.Flush(); err != nil {
return nil, err
}
log.Printf("\tn1=%d", n)
}
return conn, nil
}
エコーする(データグラム)
さて、ここまできてようやくロジックを書くことができます。
先ほど、パスをチェックするところでハンドラを定義し、レスポンス呼び出し時に実行するように。
func (c *DatagramEcho) run() {
go c.read()
}
func (c *DatagramEcho) read() {
for {
msg, err := c.session.ReceiveMessage()
if err != nil {
log.Println(err)
break
}
// echo
{
buf := &bytes.Buffer{}
buf.Write(msg)
err = c.session.SendMessage(buf.Bytes())
if err != nil {
log.Println(err)
}
}
}
}
エコーする(ストリーム)
さて、ストリームはデータグラムより少しややこしいです。
今回は1フレームを1ストリームとしているため、単方向ストリームを受信し、データを受信し終わったらすぐに送信する形にしました。
単方向ストリームは先頭にストリームタイプとセッションIDがあり、その後データが続きます。
ここのセッションIDは、作成したストリームのIDではなく、最初のHTP/3リクエストのストリームID(基本的には0)になります。
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | 0x54 (i) ... +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Session ID (i) ... +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream Body ... +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
ioutil.ReadAll()
で全部読み込まずに、stream.Read(buf)
で読んだデータを即時送信する方法の方が良い気がしたのですが、
読み取り完了時にRead()
がすぐに制御を返してくれない挙動にハマったので、ひとまずioutil.ReadAll()
にしました。
func (c *StreamEcho) read() {
for {
stream, err := c.session.AcceptUniStream(c.ctx)
if err != nil {
log.Println(err)
return
}
log.Println("accept stream.")
// ここを非同期にしないとブロックして処理が進まない
go func(stream quic.ReceiveStream) {
// read heder
qr := quicvarint.NewReader(stream)
sty, err := quicvarint.Read(qr)
if err != nil {
log.Println(err)
return
}
log.Printf("\tStreamType: %# x\r\n", sty)
if sty != STREAM_TYPE_WEBTRANSPORT_UNI {
return
}
sid, err := quicvarint.Read(qr)
if err != nil {
log.Println(err)
return
}
log.Printf("accept stream id %d, session %d.", stream.StreamID(), sid)
// echo by uni stream.
res, err := c.session.OpenUniStream()
if err != nil {
log.Println(err)
return
}
log.Printf("open %d stream.", res.StreamID())
defer res.Close()
h := &bytes.Buffer{}
quicvarint.Write(h, STREAM_TYPE_WEBTRANSPORT_UNI)
quicvarint.Write(h, uint64(sid))
res.Write(h.Bytes())
buf, err := ioutil.ReadAll(stream)
if err != nil {
log.Println(err)
}
res.Write(buf)
}(stream)
}
}
まとめ
何はともあれ、Goで安定してビデオエコーができるようになったのは大きいと思います。
エラー処理や非同期処理まわり、ブラウザ周りの処理などやることはまだまだたくさんありますが、
面白い分野ではあるのでもっと盛り上がってくれば良いなと思います。