朝日新聞社・サービス開発部の松山莞太と申します。
Yjsを用いた共同編集ツールを開発している場合、ユーザーがテキストを編集するたびに、サーバーへ「更新内容(update)」を送信します。その結果、大量のDBの書き込みが発生し、サーバーやDBに負荷がかかることがあります。
特に、Yjsのバックエンドがサーバレスになっている場合、サーバー側での書き込みQueueの制御が困難なため、フロント側で通信を間引く必要があります。
これに対処するため、DBのスペックを上げる必要がありますが、コストも増大するのでその手段は取りたくないですよね。
今回私は、ユーザーの編集内容の送信間隔を間引くことで対応しました。
送信間隔が開くためリアルタイム性は減ってしまいますが、代わりに負荷を削減できました。
今回の説明ではy-websocketをカスタマイズして、アップデートの送信を間引き、複数の更新を一つにまとめて一括送信する方法を解説します。
他の通信方法を利用する際は、読み替えてカスタマイズしてください。
y-websocketにおける送信部分のデフォルト実装
説明の大前提として、どのようにしてYjsの更新がサーバーに送信されるのかを説明します。
Yjsでは、ユーザーが編集しているドキュメントをYDocオブジェクトとしてクライアント側で管理します。そして、y-websocketは、このYDoc
が更新されるたびにサーバーへその変更を送信します。
具体的には、doc.on('update', this._updateHandler)
というコードで、ドキュメントに更新があったときに、_updateHandler
関数が呼ばれるようになっています。
this._updateHandler = (update, origin) => {
// ここでupdateがエンコード(バイナリ化)され、サーバーに送信される。
broadcastMessage(this, encoding.toUint8Array(encoder))
}
const broadcastMessage = (provider, buf) => {
// websocket接続で、データが送信される
const ws = provider.ws
ws.send(buf)
}
this.doc.on('update', this._updateHandler)
つまりは、
- ユーザーが何かを編集し、Docが更新される
-
doc.on("update",...)
によって、_updateHandler
が呼ばれる -
_updateHandler
内で、update
(変更内容)をWebSocketでサーバーへ送信する
この実装では、ユーザーが共同編集を少しでも編集するたびにupdateを送りつけるので、多量の通信が発生し、DBへの負荷が高まります。
DBのCPU稼働率に余裕を持たせたかったり、コストにシビアな場合は、工夫して通信量を削減する必要があります。
負荷を減らす方法
通信負荷を減らすために、短時間に連続して発生するアップデートを「ためて」おき、一定間隔ごとに一括送信します。その際、YjsのmergeUpdates
を使って複数のアップデートを一つにまとめます。
手順は3ステップです。
- アップデートの送信を間引く
- 溜まったアップデートをマージする
- マージされたアップデートを送信する
1. アップデートの送信を間引く
短い間に複数回の編集が行われていても、すぐには送らず、一定間隔でしか送らせないようにします。
これは、スロットリングによって実現できます。myFuncの実行間隔を間引きたいときは次のように書きます。
import { throttle } from 'lodash';
const throttledMyFunc = throttle(
() => {
myFunc()
},
5000 //実行間隔
);
throttledMyFunc
は、短期間に繰り返し実行しても5秒に1回しか実行されません。
setInterval(throttledMyFunc, 1000) //1秒ごとに実行
// => 5秒おきにしか実行されない
これにより、関数が高頻度に実行されるのを抑制できます。
共同編集では、編集内容の送信をスロットリングで抑制できます。
const throttledBroadcastMessage = throttle(
() => {
// ...送信処理
},
5000
);
2. 溜まったアップデートをマージする
次に、スロットリングによって溜まったアップデートを一つにマージします。
yjsではmergeUpdates
でアップデートを一つにマージできます。
const mergedUpdate = Y.mergeUpdates([update1, update2, update3])
これを利用します。
updateがトリガーされた際には、まずは_updates
配列にアップデートを取り込むようにします。
this._updates = [];
this._updateHandler = (update, origin) => {
// 更新を配列に溜め込む
this._updates.push(update);
this.throttledBroadcastMessage();
};
そして、throttledBroadcastMessage内で_updatesをマージします。
this.throttledBroadcastMessage = () => {
//マージ
const mergedUpdate = Y.mergeUpdates(this._updates)
// リセット
this._updates = [];
// mergedUpdateを送信する処理が続く...
};
3. マージされたアップデートを送信する
マージしたアップデートを通常通りエンコードして、WebSocketで一括送信します。
this._batchBroadcastUpdates = () => {
// アップデートをマージ
const mergedUpdate = Y.mergeUpdates(this._updates)
this._updates = [];
// エンコード
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeUpdate(encoder, mergedUpdate);
// エンコードされたupdateを送信
broadcastMessage(this, encoding.toUint8Array(encoder), false);
};
実装全体
まとめると次のようになります。
// y-websocket
export class WebsocketProvider extends Observable<string> {
constructor(doc, ...) {
// ...省略...
// 溜め込み用の配列
this._updates = [];
// 溜まったアップデートをマージして送信する処理。
this._batchBroadcastUpdates = () => {
// 貯めたアップデートを統合
const mergedUpdate = Y.mergeUpdates(this._updates)
this._updates = [];
// エンコード
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeUpdate(encoder, mergedUpdate);
// 一括送信
broadcastMessage(this, encoding.toUint8Array(encoder), false);
};
// スロットリング(5秒に1回だけ実行)
this._throttledBroadcast = throttle(() => {
this._batchBroadcastUpdates();
}, 5000);
// ユーザーが編集した際に動作する関数
this._updateHandler = (update, origin) => {
// 自分以外が原因のupdateの場合のみ処理
if (origin !== this) {
// アップデートを蓄積
this._updates.push(update);
// スロットリングされた関数を呼ぶ
this._throttledBroadcast();
}
};
// YDocのupdateイベントを監視
this.doc.on('update', this._updateHandler);
}
// ...省略...
}
まとめ
デフォルトでは編集ごとに即時にサーバーへ送信するため、DBへの負荷が増大します。
一方で、スロットリングで送信間隔を空けることで、複数アップデートをマージして一括送信することで、負荷とコストを削減できます。
リアルタイム性はやや下がりますが、負荷軽減とコスト抑制できます。
この手法を用いることで、DBへの負荷を緩和して、コストを抑えつつ、共同編集を安定して運用できます。
(サービス開発部・松山莞太)