5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RustAdvent Calendar 2023

Day 2

RustでもWebTransportがしたい! 2023年のミニまとめ付き

Posted at

2023年のWebTransport状況

みなさん、こんにちは。
どうやら2023年もすっかり終わってしまうようで時間の経つのは早いものです。

さて、2017年ごろから話が出て2021年の11月のCrome97でGAされたWebTransportですが、2年間でどう変わったのか軽くまとめておきたいと思います。

そもそもWebTransportって何? という方はこちらの記事をご覧ください

  • 2020年末 : QuickTransportが触れるようになり、WebTransportの紹介記事が話題に。
  • 2021年 : WebTrasnportベータ版
  • 2021年11月 : Chrome97にてWebTransportがGAされる。
  • 2022年初夏 : FacebookやTwitchがメディア配信向けのプロトタイプを公開(Warpなど)
  • 2023年4月 : WebkitがWebTransport対応する意思を表明
  • 2023年6月 : Firefox114にてWebTransport対応

ブラウザ対応状況

ブラウザ 状況
Chrome 97でリリース
Firefox 114にて対応 (WebCodecsはまだですか?)
Safari 2023年4月に対応するよ、という意思表明はありました。
Edge 確認してないです

さて本題 : RustでWebTransportしたいですか?

単に動かすだけなら Pythonのaioquicでよく、1Gbps越えないくらいしか通信しないよってことなら、quic-goという選択肢があります。(※quic-goは輻輳制御がRenoであり、BBRは実装中、なので去年試した時は1Gbpsで頭打ち)
で、まあWebTrasnportするためにはベースとなるQuicライブラリが必須で色々な言語で出てきてはいるのですが、特にRustは Mozillaのnqo、cloudflareのquiche、OSSのquinn、AWSのs2n-quic と出てきています。
そこで今回はquinnをベースとした、WTransportwebtransport-rsを試し、実際にプロトタイプを作る際に考慮すべきことをまとめました。

webtransport-rsのサンプルを読む

webtransport-rsのサンプルは以下にあり、Rustっぽさがあって好きなのですが残念ながらdatagramが未対応です。
一応、Chrome更新サンプルの接続直後のdatagramの送信部分を削除したところ動くことは確認できました。

(サンプルコードはこちら。シンプルで読みやすい)

(実際にブラウザから接続するには証明書のハッシュを指定する必要がありますがそれについては後述)

WTransportのサンプルを読む

webtransport-rsはtwitchの中の人が開発されており、参考にはなるのですがメンテ状況を見ていると先発のWTransportのほうが良さそうに見えるので今回は主にこちらを使ってみます。

こちらはWebTransportサーバーに加えてHTTPサーバーも起動しており、起動時に動的に生成したローカルホスト向けのオレオレ証明書のハッシュをJavaScript側に注入しています。

main.rs
async fn main() -> Result<()> {
    ...
    tokio::select! {
        result = http_server.serve() => {
            error!("HTTP server: {:?}", result);
        }
        result = webtransport_server.serve() => {
            error!("WebTransport server: {:?}", result);
        }
    }
    ...
}

それでは要点ごとに見ていきましょう。

ポイント1 : 自分で生成した証明書が使いたい!

サンプルでは動的にlocalhost用の証明書を生成して使っていますが、サーバーに乗っけるなら自分で生成した証明書を使いたいところです。
また、下記でも触れますがJS側に渡すハッシュ値を計算するメソッドがクレートに対してプライベートなので呼べません。 (ハブリックメソッドに修正されていました)

main.rs
async fn main() -> Result<()> {
    ...
    // 証明書を起動時に作成している
    let certificate = Certificate::self_signed(["localhost", "127.0.0.1", "::1"]);

    // JavaScriptで許可に使うハッシュ (ちゃんとした証明書であればもちろん不要)
    let cert_digest = certificate.hashes().pop().unwrap();

}

証明書の生成方法

本番サーバーなどで使うなら普通にLet's Encryptなどで発行すれば良いですが、ローカルやドメインを向けられないサーバーなどはいわゆるオレオレ証明書を使います。

生成方法はwebtransport-rsの方に便利なスクリプトがありますので使わせてもらいましょう。

ただしJavaScriptのserverCertificateHashesにハッシュを指定するこの方法は、証明書の有効期限を2週間以内に設定する必要があります。
https://developer.mozilla.org/en-US/docs/Web/API/WebTransport/WebTransport#servercertificatehashes

cert/generate
#/bin/bash
set -euo pipefail
cd "$(dirname "${BASH_SOURCE[0]}")"

# Generate a self-signed certificate for localhost.
# This is only valid for 10 days so we can use serverCertificateHashes to avoid a CA (bugged).
# https://developer.mozilla.org/en-US/docs/Web/API/WebTransport/WebTransport#servercertificatehashes
openssl ecparam -genkey -name prime256v1 -out localhost.key
openssl req -x509 -sha256 -nodes -days 10 -key localhost.key -out localhost.crt -config localhost.conf -extensions 'v3_req'

# Generate a hex-encoded (easy to parse) SHA-256 hash of the certificate.
openssl x509 -in localhost.crt -outform der | openssl dgst -sha256 -binary | xxd -p -c 256 > localhost.hex

これで証明書(crt + key)とハッシュが生成できました。
これを使うには次のようにサーバーのconfigに渡すだけです。ついでにパスをコマンドライン引数で渡せるようにしておきましょう。

Cargo.toml
# コマンドライン引数処理用
clap = { version = "4", features = ["derive"] }
main.rs
// 証明書のパスをコマンドラインから受け取る用
#[derive(Debug, Parser)]
struct Args {
    #[arg(long)]
    pub tls_cert: String,

    #[arg(long)]
    pub tls_key: String,

    #[arg(long, default_value = "4433")]
    pub port: u16,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {

    // コマンドライン引数から証明書のパスを取得して、、
    let args = Args::parse();

    // 証明書を読み込んで、、 (楽チン)
    let certificate = Certificate::load(args.tls_cert, args.tls_key).await?;

    // configに入れるだけ!
    let config = ServerConfig::builder()
        .with_bind_default(port)
        .with_certificate(certificate)
        .keep_alive_interval(Some(Duration::from_secs(3)))
        .build();

    let endpoint = Endpoint::server(config)?;
    ...
}

自分でファイルから読み込んで rustlsで処理しなくて良いのでとっても楽です。

ポイント2 : オレオレ証明書のハッシュはJS側に持たせる

以前はChrome起動時にオプションとしてオレオレ証明書のハッシュを指定していたのですが、それがJS側のWebTransportインスタンス生成時のオプションで指定できるようになりました。

以前はこう

# 証明書を作って
openssl x509 -pubkey -noout -in ../cert/local.pem |
openssl dgst -sha256 -binary | base64
openssl x509 -in ../cert/local.pem -outform der | openssl dgst -sha256

# ハッシュを計算して
 openssl x509 -pubkey -noout -in ../cert/local.pem |
  openssl rsa -pubin -outform der |
  openssl dgst -sha256 -binary | base64

# canary (chromeでも良い) を起動する
/Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary \
    --origin-to-force-quic-on=localhost:4433 \
    --ignore-certificate-errors-spki-list=qdyV0zFJu3a7lciew4U54kkNix6ldRpnV2mCwbGlGPY=

これでも良いのですが、「内部ネットワークのサーバーとかでも使いたいよね」というニーズがありChrome100とかそれくらいからJSのオプションで指定できるようになりました。
WTransportのサンプルではそれをHTTPサーバー立てて動的にやっていますが、普通にサービスを作るならHTMLは別のところにホスティングするでしょう。

ハッシュの生成手順は少し前に記載したwebtrasnport-rsのスクリプトにの最後にありますが、このコマンドで生成できます。

# Generate a hex-encoded (easy to parse) SHA-256 hash of the certificate.
openssl x509 -in localhost.crt -outform der | openssl dgst -sha256 -binary | xxd -p -c 256 > localhost.hex

JSで指定するところはこのようになります。
有効期限は2週間以内でなくてはならないなど、色々と制限があります。
詳しくはMDNを参照ください。

client.js
function main() {
    ...

    // 上記のlocalhost.hexを直接コピペするか、importするなどして適宜読み込みます。
    const fingerprintHex = await (await fetch('./localhost.hex')).text();

    const options = { serverCertificateHashes: [ { algorithm: "sha-256", value: hex2ab(fingerprintHex) } ] };
    conn = new WebTransport(url, options);
    ...
}

// 16真数表記からArrayBufferもしくはUint8Arrayに変換します
function hex2ab(hex) {
    return new Uint8Array(hex.match(/.{1,2}/g).map(v => parseInt(v, 16)));
}

ポイント3 : 通信の基本部分

まず前提として送受信で二つのパターンが考えられます。

  1. コネクションを待ち受ける
  2. 送信する : ストリームを作成しデータを送る。
  3. 受信する : ストリームを待ち受け、ストリームからデータを読み取る

コネクションを待ち受ける

コネクションが来たら、tokio::spawnでコネクションごとの開始します。

main.rs
pub async fn serve(self) -> Result<()> {
    info!("Server running on port {}", self.local_port());

    for id in 0.. {
        let incoming_session = self.endpoint.accept().await;

        tokio::spawn(
            Self::handle_incoming_session(incoming_session)
                .instrument(info_span!("Connection", id)),
        );
    }

    Ok(())
}

送信する

送信に関してはストリームを作成して送信するだけなので簡単です。
注意点としては、write_all() ではなく write() に大きなデータ(10KB以上)を突っ込むと全部送ってくれません。(RustのIO系はそうなっている)

main.rs
fn send_setting() -> Result<()> {
    ...
    let stream = connection.open_uni().await?;
    let mut stream = stream.await?;
    stream.write_all(&data).await?;
    stream.finish().await?;
    ...
}

受信する

受信については、まずストリームを待ち受け、データを全て読み取ります。

main.rs]
fn receive_data() -> Result<()> {
    ...
                loop {
                    tokio::select! {
                        stream = connection.accept_uni() => {
                            let mut stream = stream?;
                            info!("Accepted UNI stream");

                            // データを読み取る
                            let mut buffer = vec![0; 65536].into_boxed_slice();
                            let bytes_read = match stream.read(&mut buffer).await? {
                                Some(bytes_read) => bytes_read,
                                None => continue,
                            };
                            let str_data = std::str::from_utf8(&buffer[..bytes_read])?;

                            info!("Received (uni) '{str_data}' from client");
                        }
                        ... // datagramの時
                    }
                }
    ...
}

サンプルでは短いテキストデータを想定していますが、実際には長いバイナリデータを扱うと思います。
それについては次で書きます。

ポイント4 : バイナリデータを非同期にやりとりしたい!

単なるテキストエコーであればサンプルにありますし、JSONデータを簡単にやりとりしたいというようなニーズにはWebSocketを使えば事足りるわけで、WebTransportを使いたいという場合はバイナリデータを扱いたいことが多いのではないでしょうか。
今回はバイナリデータを扱うこととし、クライアントから片方向ストリームで設定を受け取った後、サーバーから定期的に毎フレームごとに動画データを送り、クライアントから定期的に設定を更新する、というユースケースを考えてみます。

送受信の非同期タスクを作る

コネクションを受け取った後にアプリケーションロジックを載せていきます。
受信ループと送信ループを非同期で非同期で作ります。

main.rs
async fn handle_session(session: IncomingSession) -> anyhow::Result<()> {
    // 最初にクライアントから設定を受け取る
    let mut recv_stream = connection.accept_uni().await?;
    Self::receive_setting(&mut recv_stream, &response).await.unwrap();

    // 1本のストリームでブラウザから設定を受信する
    tokio::spawn(async move {
            info!("waiting for receive_setting uni stream...");
            loop {
                    match Self::receive_setting(&mut recv__stream, &data).await {
                        Ok(_) => (),
                        Err(e) => {
                            error!("failed to receive setting: {}", e);
                            continue
                        }
                    };
            }
        });

         // フォルダからGLB一覧を取得し、33msごとに順番に送信する
        let mut interval = tokio::time::interval(Duration::from_millis(33));
        loop {
        for i in 1..sending_data.length {
            tokio::select! {
                // 接続が切れたら終了
                _ = connection_cloned.closed() => {
                    info!("connection closed");
                    break;
                },
                _ = interval.tick() => {
                    info!("interval tick {}", i);
                    tokio::spawn(async move {
                        match Self::send_movie(&connection, i, &setting).await {
                            Ok(_) => (),
                            Err(e) => {
                                error!("failed to send glb: {}", e);
                            }
                        };
                    });
                },
            }
        }
}

バイナリデータを全部読み込む

さて、元のサンプルは短いテキストデータですが、長いバイナリは自分でバッファに追加しないといけません。

client.js
        // サーバーから開始されるストリームを受信する
        while (true) {
            const { value, done } = await streamReader.read();
            if (done) {
                console.log("server closed connection.");
                return;
            }
            const stream = value;
            const reader = stream.getReader();

            // ストリームごとにデータを受信する (非同期処理)
            (async (reader) => {
                // 扱うバイナリデータの型はUint8Arrayで統一
                let data = new Uint8Array();
                while (true) {
                    try {
                        const { value, done } = await reader.read();
                        console.log(`read data... ${done} length: ${value?.length}`)
                        if (done) {
                            console.log(`read all data... length: ${glb.length}`);
                            SetGlb(glb.buffer);
                        }
                        if (!value) {
                            console.log("no data");
                            return;
                        }
                        // 読み込んだデータをglbに追加
                        let buffer = new Uint8Array(data.length + value.length);
                        buffer.set(new Uint8Array(data), 0);
                        buffer.set(new Uint8Array(value), data.length);
                        data = buffer;
                    } catch (e) {
                        console.error(`failed to read data stream: ${e.toString()}`);
                        return;
                    }
                }
            })(reader);
        }

また、WebSocketのように送ったごとにデータが受信できるわけではなく、"{obj1:0}" "{obj2: 1}" とデータを2回に分けて送っても、受信側では "{obj1:0}{obj2:1}" という一塊のデータとして受信してしまいます。
1400byte以下なら一つのパケットで送れるので(保証はないですが)プロトタイプを作るなら一回で読み取れるだろうと仮定しても良いかもしれません。

JavaScript -> WTransportのバイナリ

動画データや画像のように送ったものをまるっと使うなら問題はないですが、
設定データのように複数のデータをまとめて送る場合はエンディアンとレイアウトを決めておく必要があります。

今回は4バイト float32 x 4 を設定データとしてクライアントからサーバーに送っています。エンディアンはビックエンディアン

client.js
async sendSetting(float1, float2, float3, float4) {
        console.log(`send setting: ${float1}, ${float2}, ${float3}, ${data4}`);

        if (!this.conn) return;
        if (!this.sendStream) {
            this.sendStream = await this.conn.createUnidirectionalStream();
            this.sendWriter = this.sendStream.getWriter();
        }
        const writer = this.sendWriter;
    
        const settingArray = new Float32Array([float1, float2, float3, float4]);
        let buffer = new ArrayBuffer(16);
        new DataView(buffer).setFloat32(0, settingArray[0], false /* Big Endian */);
        new DataView(buffer).setFloat32(4, settingArray[1], false /* Big Endian */);
        new DataView(buffer).setFloat32(8, settingArray[2], false /* Big Endian */);
        new DataView(buffer).setFloat32(12, settingArray[3], false /* Big Endian */);
    
        await writer.write(buffer);
    }

バイナリデータを受信するときはこんな感じでエンディアンを指定して送信します。

main.rs
async fn receive_setting(stream: &mut wtransport::RecvStream, perspective_camera: &Arc<RwLock<PerspectiveCamera>>) -> anyhow::Result<()> {

        ...
        
        let mut buffer = vec![0; 65536].into_boxed_slice();
        match stream.read(&mut buffer).await? {
            Some(bytes_read) => anyhow::ensure!(bytes_read == 16, "invalid data length"),
            None => return Err(anyhow::anyhow!("receive_setting : no data read")),
        };

        // 読み取ったデータ4byteをビックエンディアンとしてi32に変換
        let float1 = f32::from_be_bytes(buffer[..4].try_into().unwrap());
        let float2 = f32::from_be_bytes(buffer[4..8].try_into().unwrap());
        let float3 = f32::from_be_bytes(buffer[8..12].try_into().unwrap());
        let float4 = f32::from_be_bytes(buffer[12..16].try_into().unwrap());

        ...
}

おまけ : Rustの非同期ひとくちメモ

非同期処理は色々あるのでそのうちちゃんとまとめたい。

  • aync{} で共有するデータは Arc を使って cloneしてmoveする (hoge_clonedだらけになる方法に対して、ちゃんとしたやり方があったような気がする)
  • 複数のasync{}で書き込みたいデータは Arc> で囲って参照を渡す。読み書きするときはロックして使う。RwLockでも良い。
  • ロックはできるだけ短くすべし、
      - (self.position.lock()? としたまま async{} や self.other_long_method() とか呼んでいるといるとロックしたまま跨ぐことになるのでMutexGuradのライフタイムがスレッドセーフではないと怒られたりする)
  • 所有権やSend+Syncトレイト満たしてないよエラーに困ったらメッセージパッシングの方が適切かも
5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?