チャットなんてWebSocketで簡単にできるでしょ?
WebTransportをチェックしている皆様こんにちは。
特に仕事でもないのにWebTransporttが気になって仕方ない私です。
(しかも git add 直前に文字化けを直そうとしてファイルが消えましたが挫けません!)
さて、前回は簡単なechoをして1対1のやりとりができました。
そこで今回は複数人への送信ということでグループでのテキストチャットルームをやってみます。
まあ、WebTransportでもWebSocketとそんなに変わらずすぐできるでしょ。とか思っていました。
今回作ったもの
![](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F91710%2Fd51c5d58-c5cb-79ef-7bff-efd700f3a343.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=50c46ca73b73b51d79fa5f32a722629c)
部屋は一つですが複数人に同時にメッセージ配信します。
画面はライブ配信なんかでありがちなやつで、そのうち動画配信部分も作ります。
機能としては、個別配信と全体配信を作ります。
- 名前を入力してサーバーに接続する
- 入室したら、本人にだけサーバーから「xxさん、こんにちは」と送る
- 同時に、本人以外にサーバーから「xxさんが入室しました」と送る
- 誰かがコメント欄に入力して送信ボタンを押すと、全員にコメントを送る
- 誰かが退室すると(ウインドウを閉じるかリロードすると)、サーバーが「xxさんが退室しました」と送る
ちなみにやり取りするデータはJSONで、入室とコメントをやり取りします。
退室については切断処理をトリガーにしました。
クライアントからサーバーへ
・入室 {"command": "enter", "name": "test1"}
・コメント(名前は送信済みなので送らない) {"command": "comment", "comment": "さん、こんにちは"}
サーバーからクライアントへ
・コメント {"command": "comment", "name": "server", "comment": "さん、こんにちは"}
今回やらないこと
pythonでサーバー処理を書くのが辛いので、参加者の情報はグローバル変数に直接突っ込んでいます
本当は非同期処理周りのpub/subをやらないといけないですが今回はあくまで通信周りの話ということで。
別言語のミドルウェアが対応したらまた書きます。
また、今回はテキストチャットなのでデータグラム送信は扱いません。
全員に送信する、という点では考え方は同じなので、ライブ配信サービスなどの賑やかしコメント機能を開発する際にデータの確実性が不要であればデータグラムを使っても良いのかも知れません。
そもそもWebSocketとどう違うの?
プロトコル的な話は前回したので、今回は使い方の視点からどう異なるのか確認していきます。
結論から言うとテキストチャットやコマンド的なリアルタイム通信は正直WebSocketで良いと思います。
WebSocketは一度にまるっと、WebTransportは少しずつ小分けで
WebSocketとWebTransport、どちらもサーバー・クライアントモデルで
リアルタイム双方向通信のために作られていますが、
一番の違いは パケットを意識するかどうか の違いです
WebSocketはシンプル
WebSocketにはテキストモードとバイナリモードがありますが、いずれも簡単に使えます。
send(data) とすればdataがきちんと送信できますし、
onmessageできちんと送信されたデータが引数にわたってきます。
(まあ、当たり前といえば当たり前です。バイナリでも同じです)
// 接続
let connection = new WebSocket('ws://example.com/');
connection.onopen = () => {
// メッセージの送信(まあ大体JSONをテキストにして送りますかね)
connection.send(JSON.stringify({command: 'enter', name: 'test'}));
};
// メッセージを受け取った場合
connection.onmessage = (message) => {
// JSONからオブジェクトにして処理する
var data = JSON.parse(message.data);
...
}
WebTransportは小分けで送られる
一方、WebTransportの方は接続・ストリーム待ち受け・受信の三つに分かれます。
(エラー処理など一部省きます)
// 接続、これはシンプル
var transport = new WebTransport('https://example.com/chat');
await transport.ready;
// メッセージの送信(JSONを文字列にして送る)
let stream = await transport.createUnidirectionalStream();
let writer = stream.getWriter();
await writer.write(new TextEncoder("utf-8").encode(JSON.stringify( /* 送信したいオブジェクト */ )));
await writer.close();
acceptUnidirectionalStreams(transport); // 下の関数を呼び出す。非同期なのでブロックしない
// ストリームを受け付ける
async function acceptUnidirectionalStreams(transport) {
let reader = transport.incomingUnidirectionalStreams.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) {
return;
}
let stream = value;
readFromIncomingStream(stream);
}
}
// ストリームからデータを受け取る
async function readFromIncomingStream(stream) {
let decoder = new TextDecoderStream('utf-8');
let reader = stream.pipeThrough(decoder).getReader();
let buffer = new Uint8Array();
while (true) {
// データ読み込みが完了するまで繰り返す
const { value, done } = await reader.read();
if (done) {
// ここではvalueは空になる
//
// jsonでデータをやり取りする
let data = JSON.parse(buffer);
return;
}
buffer += value;
}
}
細かく見ていきましょう。
接続についてはシンプルです。
WebSocketよりスマートです。
(イベントが良いかプロミスが良いかはmozaic.fm ep72でも話題に出ていましたね)
// 接続、これはシンプル
var transport = new WebTransport('https://example.com/chat');
await transport.ready;
メッセージ送信はwriterを取得してcloseしている分、行数が増えていますが、
utf-8のバイト列を渡しているだけなのでWebSocketとあまり変わりません。
TextEncoder
が出来たおかげで文字列周りの処理もだいぶ良くなるのではないでしょうか。
let stream = await transport.createUnidirectionalStream();
let writer = stream.getWriter();
await writer.write(new TextEncoder("utf-8").encode(JSON.stringify( /* 送信したいオブジェクト */ )));
await writer.close();
次はストリームを受け付けるところですが、node.jsでサーバー書くときと似た感じです。
ストリームからの読み込み関数がasync
なのでwhile(true)
してもブロックすることはありません。
async function acceptUnidirectionalStreams(transport) {
let reader = transport.incomingUnidirectionalStreams.getReader();
while (true) {
const { value, done } = await reader.read(); // 新しい接続がくるまでブロックする
if (done) {
return;
}
let stream = value;
readFromIncomingStream(stream); // 新しいストリームからデータを読み込む。ここは非同期なのですぐに次のストリームを待ち受ける
}
}
最後にストリームから読み込む処理です。
ここがWebSocketと大きく違います。
WebSocketは一度にデータが読み込めますが、WebTransportでは少しずつデータが送られてきます。
そのため、ストリームが閉じられるまで読み続ける必要があります。
async function readFromIncomingStream(stream) {
let decoder = new TextDecoderStream('utf-8'); // 受け取ったデータをutf-8にするやつ
let reader = stream.pipeThrough(decoder).getReader(); // reader
let buffer = new Uint8Array(); // バッファ。ストリームが終了するまでこれにデータを貯めていく。
while (true) {
// データ読み込みが完了するまで繰り返す
const { value, done } = await reader.read();
if (done) {
// データが読み終わった (fin bitがセットされたQUICのストリームフレームを受信した)
// ここではvalueは空になる
//
// jsonでデータをやり取りするのでオブジェクトにしておく
let data = JSON.parse(buffer);
... // コメント欄に表示させる
return;
}
buffer += value; // バッファに追加する
}
}
この方法以外にも、こちらでバッファを用意してそこにデータを書き込ませるBYOB readerといったやり方もあるようです。
今回は文字列を扱っていますが、本来の用途であるバイナリストリームを扱うならこういう方法が良いのかもしれませんね。(特にwasm側でバッファ確保してるときとかに使えそう?)
なぜデータが小分けなの?
さて、なぜいちいちこんな面倒なことをしないといけないんでしょうか?
それはネットワークがパケット交換方式だからであり、よりリアルタイムで処理したいというニーズに応えるためでしょう。
どれくらい大きなデータが分割されるの?
ではどれくらいのサイズで分割されるのでしょうか?
イーサネットや一般的なインターネット海戦では、MTUが1500byte未満であることが多いため、そのくらいのサイズに分割されるものと思われます。
果たして本当にそうなのでしょうか?
実際に、今回作成したチャットのコメント欄にひらがな600文字くらいを入力して送信してみました。(ひらがなはutf8では1文字3バイト)
サーバー側のイベントログでは1221バイトで区切られていることがわかります。
StreamDataReceived(data=b'{"command":"comment","comment":"\xe3\x81\x82 ..(略).. \x81\x88\xe3', end_stream=False, stream_id=4)
recv data len = 1221
... いろいろなログ...
StreamDataReceived(data=b'\x81\x88\xe3 ..(略).. \xe3\x81\x8b"}', end_stream=False, stream_id=4)
recv data len = 613
クライアント側はどうでしょうか?
試してみたところ、600文字では一度の読み込みでデータを取得できてしまいました。
そこで文字列を倍にしてみます。するとこちらも途中で分割されて受信しました。
サーバー側が送信ログを画面に表示してから数秒くらい経ってからブラウザが読み込むので
おそらくQUIC層でバッファリングされているのかもしれません。
Received data on stream : {"command": "comment", "name": "test", "comment": "ああああ...(略
Received data on stream : うううう... (略) ...カカかかか"}
WebSocketにはストリームがないけど、WebTransportにはある
WebSocketは複数の通信を並行して処理するにはあまり向いていません。
send()でデータ送信中は他のデータは送信できませんし、
じゃあコネクションをたくさん貼れば良いかというと、HTTP(もしくはHTTP/2)なので負荷が高くなってしまいます。
そこで、データの信頼性などのTCPの良いとこどりをしつつ複数通信を並行するためにWebTransportではストリームという単位で通信します。
データを送る際にストリームIDを生成して送信し、少しずつ送信して、データが送り終わったらそれを相手に知らせます。
データの整合性はストリームごとに検証されるので、ストリーム0が破損しても、ストリーム1は影響なくどんどんデータをやり取りすることができます。
ストリームとは?
ストリームとはデータを順次送る方式です。
WebSocketではデータ送信時にデータサイズは大体決まっているので、「1000byte送信します」のようにサイズを指定できますが、
ストリームはいつ終わるかわからないデータを小分けにして送信します。
そのため、送信し終わったらそれを通知するフレームを送信して、相手にストリーム送信が終わったことを教えます。
今回はテキストチャットなので、一度に送信できることが多いですが、長いテキストを送信する場合、
JSONのバイト列が途中で区切られ、2,3回に分かれて送受信されます。
ストリームは使い捨て
最初、コネクションとごっちゃにしてしまっていたのですが、ストリームは基本使い捨てです。
今回の場合では、一つのコメントを送信したらストリームは終了します。使いまわしません。
RFC9000のストリームのステータス回りをみても、使い回せるような繊維はありません。
使ったら終了する、もしくはリセットしたらもう使えないようです。
しかも、ストリームIDを再利用するなとあります。
A QUIC endpoint MUST NOT reuse a stream ID within a connection.
QUIC端末は接続中ストリームIDを再利用すべからず
使い捨てにして大丈夫?
なんかもったいないような気もしますね。
IDが枯渇してしまったりはしないでしょうか?
確認してみましょう。
ストリームIDは64bitですが、下位2桁は方向を示すのに使うため、使えるのは62bit分になります。
ストリームIDはコネクション毎に独立して管理されるので、1コネクションあたりの通信数が2^62回を超えなければ良さそうです。
仮に、1秒間に65,336(2^16)回、QUICでやり取りするとします。(まあそんなシステムはないでしょうけど)
そうすると、2^62[回] / 2^16[回/秒] = 2^(62 - 16) = 2^46秒 = 2229898.57年
ということなので、Dr.Stoneみたいなことになってもサーバーの再起動する必要はなさそうです。
WebSocketとWebTransportの違いまとめ
さて、ここまで見てWebTransportはWebSocketより低レイヤーで面倒な部分があることがわかりました。
用途が違うから当然ですが、主観でまとめるとこんな感じでしょうか?
_ | WebSocket | WebTransport |
---|---|---|
レベル | 高レベル | 低レベル |
バッファリング | 内部 | ある程度内部 or 自前 |
信頼性 | 高い | 選べる |
並行性 | 普通 | 高い |
リアルタイム性 | 低い | 高い |
用途 | 小さいテキスト バイナリデータ |
大きいマルチメディア |
サーバー側のお話
さて、それではサーバー周りの処理をもう少し詳しくみていきましょう。
初回のechoサーバーの記事ではあまりよくわかっていなかったので軽く流してしまいましたが、
デバッグしているうちに構造が掴めてくる気がします。
まあ、QUIC, HTTP/3, WebTransportと階層があるのでどうしても複雑にはなるんでしょうね。
要点だけ抜き出しつつソースコードを見ていきたいと思います。
データの持ち方
チャットルームには参加者の情報が必要になります。
名前はもちろん、相手に送信するためのコネクションとセッションIDが必要です。
セッションIDとはストリームIDのことです。(WebTransport接続を確立したCONNECTストリームのIDのことをそう呼ぶようです)
この参加者情報はQUICコネクションを束ねる必要があるので、グローバルかmainに持たせるなどしないとコネクションをまたげません。
# グローバル変数に突っ込んでおきます
# members = {'connection_id' : ['name', http, session_id]
members = defaultdict(list)
# こんな感じのデータが入ります。
defaultdict(<class 'list'>,
{b'\x07\xd4\x13\xb9K\x9f%\xdd': ['1111',
<__main__.H3ConnectionWithDatagram object at 0x10ec845e0>,
0],
b'Hi\x05+\xf88\xeb\x85': ['22',
<__main__.H3ConnectionWithDatagram object at 0x10ec21580>,
0]})
入室とコメント送信処理
WebTransportのストリームデータを受信し、javascript同様にストリームが終了したらJSONをパースします。
ただし、バッファはストリームID毎に分けて保存しておく必要があります。
class ChatHandler:
def __init__(...):
...
self._comments = defaultdict(bytes) # ストリームIDをキーにしてバッファする
def h3_event_received(self, event: H3Event) -> None:
if isinstance(event, WebTransportStreamDataReceived):
self._comments[event.stream_id] += event.data
if event.stream_ended:
data = json.loads(self._comments[event.stream_id].decode('utf-8'))
# 入室
if data['command'] == 'enter':
comment = data['name'] + 'さん、こんにちは'
payload = {'command': 'comment', 'name': 'server', 'comment': comment}
payload = json.dumps(payload, ensure_ascii=False).encode('utf-8')
# 返信用のストリームを作る。予めルールを決めておけば双方向のほうが使いやすいかも。
stream_id = self._http.create_webtransport_stream(
self._session_id, is_unidirectional=True)
self._http._quic.send_stream_data(
stream_id, payload, end_stream=True)
# 本人以外に通知
broadcast('server', data['name'] + 'さんがログインしました')
# メンバー情報に追加する
members[self._http._quic.host_cid] = [data['name'], self._http, self._session_id]
elif data['command'] == 'comment':
broadcast(members[self._http._quic.host_cid][0], data['comment'])
# バッファをクリアする
self.stream_closed(event.stream_id)
みんなにコメントを送信する
はい、今回のメインの部分です。
必要なのはそれぞれのQUICコネクションです。
QUICコネクション毎に class WebTransportProtocol(QuicConnectionProtocol)
インスタンスが生成されるので
きちんとレイヤーを理解して、接続情報を管理すれば大丈夫です。
# 上記の入室のところで必要な情報を揃える
class ChatHandler:
...
# メンバー情報に追加する
members[self._http._quic.host_cid] = [data['name'], self._http, self._session_id]
# みんなにコメントを送信する
def broadcast(name, comment):
# メッセージを作る
payload = json.dumps({'name': name, 'comment': comment}, ensure_ascii=False)
pprint(members)
print('send data ' + payload)
payload = payload.encode('utf-8')
# 一人ずつ送信する
for member in members.values():
# member[1] は H3Connection
# ストリームを作成して、、、
stream_id = member[1].create_webtransport_stream(
member[2], is_unidirectional=True)
# 送信する
member[1]._quic.send_stream_data(
stream_id, payload, end_stream=True)
退室処理をする
今回、一番苦労したのはここです。
クライアントがページを閉じたり、リロードしたときにログアウト処理をする必要があります。
ただし、WebTransportをcloseしてもQUICコネクションは再利用のためしばらく維持されます。
そこで、ドラフトによるとCLOSE_WEBTRANSPORT_SESSION
というデータがあるようです。
実際にブラウザのタブを閉じた時のログを確認する
ブラウザにonunload
を入れてclose()した時のサーバーログを見てみます。
HTTP/3のデータが飛んできていることが確認できます。
DataReceived(data=b'hC\x04\x00\x00\x00\x00', stream_id=0, stream_ended=True, push_id=None)
ドラフトを見るとこんなHTTP/3データが来ると書いてあります。
CLOSE_WEBTRANSPORT_SESSION Capsule {
Type (i) = CLOSE_WEBTRANSPORT_SESSION,
Length (i),
Application Error Code (32),
Application Error Message (..8192),
}
上記のデータを当てはめるとこうなります。
CLOSE_WEBTRANSPORT_SESSION Capsule {
Type (i) => hC = 0x6843
Length (i), => 0x04
Application Error Code (32), => 0x00000000
Application Error Message (..8192), = なし
}
正常なcloseなのでエラ〜コードは0のはずですし、近い気がします。
で、定数は CLOSE_WEBTRANSPORT_SESSION (0x2843)
と定義されています。
あれ? 0x2843
ですね??
しかし飛んでくるデータは 0x6843
なんです。。
まあ、意図としてはあっているはずなので細かいことは気にせず実装しちゃいましょう!
退室処理のソースコード
ライブラリの方は未対応なようなので、HTTP/3のイベント処理のところに書きます。
やること自体はデータの先頭2バイトを検証して、0x6843
ならデータ削除と通知をします。
(LL系言語にありがちなチェックと、ストリーム自体が終了しているかのチェックを一応しておきます)
class WebTransportProtocol(QuicConnectionProtocol):
def _h3_event_received(self, event: H3Event) -> None:
if ヘッダーなら...
elif isinstance(event, DataReceived):
# CLOSE_WEBTRANSPORT_SESSION 0x2843 なら退室処理をする なぜか送られてくるデータは0x6843になっている??
if (hasattr(event, 'data') and len(event.data) >= 3 and event.stream_ended and
event.data[0] == 0x68 and event.data[1] == 0x43):
name = members[self._http._quic.host_cid][0]
del members[self._http._quic.host_cid]
broadcast('server', name + 'さんがログアウトしました')
終わりに
いかがだったでしょうか? (最近、この文言をホントよく見かけますね)
テキストチャットくらい、日曜の午後にさくっとできるだろう、と思っていたら何日か経ってしまいました。
そんなに難しいことはしていないはずなんですが、抽象化がもれる、とはこのことを言うのでしょうか。
まあ、そんなことよりそろそろねこかホリネズミかどちらかWebTransport対応きて欲しいものです。
WebCodecもM94でリリースされていて、いろいろ使い道が広がりそうなのでその辺りも見ておきたいところですね。