1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MongoDB + mongooseでMySQLのSELECT ... FOR UPDATEのような排他ロックを実装する

Last updated at Posted at 2024-11-17

はじめに

MongoDB + mongooseでMySQLの SELECT ... FOR UPDATE のような排他ロックを実装しました。

何を実現したいのか?

以下のような処理があるとします。

const user = await this.user.findOneById(id);
const amount = await calculateAmount();
await user.updateOne({amount: user.amount + amount});

userコレクションのドキュメントをid指定で取得し、amountを calculateAmount メソッドで計算し、user.amountに追加します。

トランザクションを貼らない場合、並列に処理が走るとデータの一貫性が保てず、amountの値がおかしくなってしまうため、トランザクションを張ることを考えます。

mongooseで以下のようにトランザクションを貼ると、すでに他のトランザクションによって user コレクションのデータが更新されているなどして一貫性が保てない場合、 writeConflict エラーが発生して、処理が異常終了します。

    const session = await this.connection.startSession();
    session.startTransaction();
    try {
      const user = await this.user.findOneById(id).session(session);
      const amount = await calculateAmount();
      await user.updateOne({amount: user.amount + amount}).session(session);
      await session.commitTransaction();
    } catch (error) {
      await session.abortTransaction();
      throw error;
    } finally {
      session.endSession();
    }

以下のように session.withTransaction を使ったコールバック形式に変えると、writeConflict エラーが発生しても成功するまで引数のコールバック全体を再試行してくれます。

    try {
      await session.withTransaction(async () => {
        const user = await this.user.findOneById(id).session(session);
        const amount = await calculateAmount();
        await user.updateOne({amount: user.amount + amount}).session(session);
      });
    } finally {
      await session.endSession();
    }
    

これで一貫性を保つことはできます。
しかし、問題は更新処理(上記のuser.updateOne)の前にMongoDb以外のデータストアへの保存や外部APIを叩く処理があると、それらの処理が何回も再試行されてしまい、バグやパフォーマンスなどの問題が発生してしまいます。

MySQLではデータの一貫性を保つため、更新するテーブルに対して以下のような FOR UPDATE のクエリを発行して排他ロックをかけることができます。

SELECT * FROM user WHERE id = 1 FOR UPDATE;

並列にこのトランザクションを実行した場合、後続のトランザクションはこのSQLを流した箇所で処理が止まるため、上記のような心配がありません。
今回はこれをMongoDb + mongooseで実装しました。

バージョン

実装して挙動確認した環境は以下です。
フレームワークはNest.jsを使っています。

対象 バージョン
MongoDb 6.0.19
mongoose 8.1.1
nestjs/mongoose 10.0.2

設計

こちらの方法を使います。
MongoDB のトランザクション内で SELECT ... FOR UPDATE を 使用する方法

ドキュメントをfindする時に、 findAndUpdate を使って更新をします。
ロックの値専用のフィールドを用意し、それを乱数を使って更新することで実質的に必ず更新されるようにします。
並列にトランザクションが走った場合、後続のトランザクションは findAndUpdate で更新が失敗して再試行されるため、ロックで処理が止まることをエミュレートできます。

実装

以下のようなメソッドを実装しました。
ロックをかけたい場合、トランザクションの中でこのメソッドを呼ぶようにすることでMySQLの SELECT ... FOR UPDATE のような排他ロックを実現できます。

  async findOneByIdForUpdate(
    id: string,
    session: ClientSession
  ): Promise<UserDocument> {
    
    // lockIdフィールドをランダムな値で更新することで必ず更新されるようにする(同じ値だとロックされない)
    const query = this.user
      .findByIdAndUpdate(id, { $set: { lockId: generateRandomId() } })
      .session(session ?? null);
    return query;
  

最終的に実現したかった実装は以下になります。
トランザクションが並列に走る場合、後続のトランザクションは findOneByIdForUpdate の行でエラーになってコールバック関数が再試行されるので、その後の処理は実行されません。
これで user.amount の一貫性を保つ処理を実装することができました。

    try {
      await session.withTransaction(async () => {
        const user = await this.findOneByIdForUpdate(id, session);
        const amount = await calculateAmount();
        await user.updateOne({amount: user.amount + amount}).session(session);
      });
    } finally {
      await session.endSession();
    }
    

注意点

session.startTransaction ではなくて session.withTransaction を使う

mongooseのトランザクションは session.startTransactionを使う方法とsession.withTransactionを使う方法がありますが、session.withTransactionを使いましょう。
session.withTransactionwriteConflict エラーの際に自動で引数のコールバックの処理を再試行してくれますが、session.startTransactionはエラーで落ちてしまいます。
session.startTransactionを使う場合は再試行処理を自前で実装する必要があります。

findOneByIdForUpdate をトランザクションの初めで呼ぶ

トランザクションが並列に走る際、 findOneByIdForUpdate の前の処理は findOneByIdForUpdate が成功するまで何度も呼ばれてしまうため、注意が必要です。
例えば、以下のような処理があるとします。

    try {
      await session.withTransaction(async () => {
        処理A
        const user = await this.findOneByIdForUpdate(id, session);
        処理B
      });
    } finally {
      await session.endSession();
    }
    

このトランザクションが並列に実行される時、処理Bはロック後なので1回しか実行されませんが、処理Aは後続のトランザクションがロックを取得できるまで何度も実行されてしまいます。
なので、ここにMongoDb以外のデータストアへの保存や外部APIを叩く処理があると、それらの処理が何回も再試行されてしまい、バグやパフォーマンスなどの問題が発生してしまいます。
ロックの処理はなるべくトランザクションの頭で呼びましょう。

終わりに

MongoDBでMySQLの SELECT ... FOR UPDATE の処理を実装しました。
ただ、MySQLと異なりロックの処理の前までが何回も再試行されてしまうため、実装者がそこに気を配らないといけなく、MySQLのロックの方が扱いやすいと思いました。
トランザクションの初めでロックの処理を呼ばないとそれ以前の処理が再試行されてしまう問題はありますが、そこだけ気をつければ排他ロックを実現できるので、みなさんも使ってみてください!

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?