11
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

朝日新聞社Advent Calendar 2024

Day 13

Yjsの共同編集でアップデート送信を間引いてDB負荷を軽減する方法

Last updated at Posted at 2024-12-12

朝日新聞社・サービス開発部の松山莞太と申します。

Yjsを用いた共同編集ツールを開発している場合、ユーザーがテキストを編集するたびに、サーバーへ「更新内容(update)」を送信します。その結果、大量のDBの書き込みが発生し、サーバーやDBに負荷がかかることがあります。
特に、Yjsのバックエンドがサーバレスになっている場合、サーバー側での書き込みQueueの制御が困難なため、フロント側で通信を間引く必要があります。

これに対処するため、DBのスペックを上げる必要がありますが、コストも増大するのでその手段は取りたくないですよね。
今回私は、ユーザーの編集内容の送信間隔を間引くことで対応しました。
送信間隔が開くためリアルタイム性は減ってしまいますが、代わりに負荷を削減できました。

今回の説明ではy-websocketをカスタマイズして、アップデートの送信を間引き、複数の更新を一つにまとめて一括送信する方法を解説します。
他の通信方法を利用する際は、読み替えてカスタマイズしてください。

y-websocketにおける送信部分のデフォルト実装

説明の大前提として、どのようにしてYjsの更新がサーバーに送信されるのかを説明します。
Yjsでは、ユーザーが編集しているドキュメントをYDocオブジェクトとしてクライアント側で管理します。そして、y-websocketは、このYDocが更新されるたびにサーバーへその変更を送信します。
具体的には、doc.on('update', this._updateHandler)というコードで、ドキュメントに更新があったときに、_updateHandler関数が呼ばれるようになっています。

this._updateHandler = (update, origin) => {
  // ここでupdateがエンコード(バイナリ化)され、サーバーに送信される。
  broadcastMessage(this, encoding.toUint8Array(encoder))
}
const broadcastMessage = (provider, buf) => {
  // websocket接続で、データが送信される
  const ws = provider.ws
  ws.send(buf)
}

this.doc.on('update', this._updateHandler)

つまりは、

  1. ユーザーが何かを編集し、Docが更新される
  2. doc.on("update",...)によって、_updateHandlerが呼ばれる
  3. _updateHandler内で、update(変更内容)をWebSocketでサーバーへ送信する

この実装では、ユーザーが共同編集を少しでも編集するたびにupdateを送りつけるので、多量の通信が発生し、DBへの負荷が高まります。
DBのCPU稼働率に余裕を持たせたかったり、コストにシビアな場合は、工夫して通信量を削減する必要があります。

負荷を減らす方法

通信負荷を減らすために、短時間に連続して発生するアップデートを「ためて」おき、一定間隔ごとに一括送信します。その際、YjsのmergeUpdatesを使って複数のアップデートを一つにまとめます。

手順は3ステップです。

  1. アップデートの送信を間引く
  2. 溜まったアップデートをマージする
  3. マージされたアップデートを送信する

1. アップデートの送信を間引く

短い間に複数回の編集が行われていても、すぐには送らず、一定間隔でしか送らせないようにします。
これは、スロットリングによって実現できます。myFuncの実行間隔を間引きたいときは次のように書きます。

import { throttle } from 'lodash';

const throttledMyFunc = throttle(
  () => {
    myFunc()
  }, 
  5000 //実行間隔
);

throttledMyFuncは、短期間に繰り返し実行しても5秒に1回しか実行されません。

setInterval(throttledMyFunc, 1000) //1秒ごとに実行

// => 5秒おきにしか実行されない

これにより、関数が高頻度に実行されるのを抑制できます。
共同編集では、編集内容の送信をスロットリングで抑制できます。

const throttledBroadcastMessage = throttle(
  () => {
    // ...送信処理
  }, 
  5000 
);

2. 溜まったアップデートをマージする

次に、スロットリングによって溜まったアップデートを一つにマージします。

yjsではmergeUpdatesでアップデートを一つにマージできます。

const mergedUpdate = Y.mergeUpdates([update1, update2, update3])

これを利用します。
updateがトリガーされた際には、まずは_updates配列にアップデートを取り込むようにします。

this._updates = [];

this._updateHandler = (update, origin) => {
  // 更新を配列に溜め込む
  this._updates.push(update);
  this.throttledBroadcastMessage();
};

そして、throttledBroadcastMessage内で_updatesをマージします。

this.throttledBroadcastMessage = () => {
  //マージ
  const mergedUpdate = Y.mergeUpdates(this._updates)
  // リセット
  this._updates = []; 
  // mergedUpdateを送信する処理が続く...
};

3. マージされたアップデートを送信する

マージしたアップデートを通常通りエンコードして、WebSocketで一括送信します。

this._batchBroadcastUpdates = () => {
  // アップデートをマージ
  const mergedUpdate = Y.mergeUpdates(this._updates)
  this._updates = [];
  
  // エンコード
  const encoder = encoding.createEncoder();
  encoding.writeVarUint(encoder, messageSync);
  syncProtocol.writeUpdate(encoder, mergedUpdate);

  // エンコードされたupdateを送信
  broadcastMessage(this, encoding.toUint8Array(encoder), false);
};

実装全体

まとめると次のようになります。

// y-websocket
export class WebsocketProvider extends Observable<string> {
  constructor(doc, ...) {
    // ...省略...

    // 溜め込み用の配列
    this._updates = [];

    // 溜まったアップデートをマージして送信する処理。
    this._batchBroadcastUpdates = () => {
      // 貯めたアップデートを統合
      const mergedUpdate = Y.mergeUpdates(this._updates)
      this._updates = [];

      // エンコード
      const encoder = encoding.createEncoder();
      encoding.writeVarUint(encoder, messageSync);
      syncProtocol.writeUpdate(encoder, mergedUpdate);

      // 一括送信
      broadcastMessage(this, encoding.toUint8Array(encoder), false);
    };

    // スロットリング(5秒に1回だけ実行)
    this._throttledBroadcast = throttle(() => {
      this._batchBroadcastUpdates();
    }, 5000);

    // ユーザーが編集した際に動作する関数
    this._updateHandler = (update, origin) => {
      // 自分以外が原因のupdateの場合のみ処理
      if (origin !== this) {
        // アップデートを蓄積
        this._updates.push(update);
        // スロットリングされた関数を呼ぶ
        this._throttledBroadcast();
      }
    };

    // YDocのupdateイベントを監視
    this.doc.on('update', this._updateHandler);
  }
  // ...省略...
}

まとめ

デフォルトでは編集ごとに即時にサーバーへ送信するため、DBへの負荷が増大します。
一方で、スロットリングで送信間隔を空けることで、複数アップデートをマージして一括送信することで、負荷とコストを削減できます。
リアルタイム性はやや下がりますが、負荷軽減とコスト抑制できます。

この手法を用いることで、DBへの負荷を緩和して、コストを抑えつつ、共同編集を安定して運用できます。

(サービス開発部・松山莞太)

11
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?