はじめに
MongoDB + mongooseでMySQLの SELECT ... FOR UPDATE
のような排他ロックを実装しました。
何を実現したいのか?
以下のような処理があるとします。
const user = await this.user.findOneById(id);
const amount = await calculateAmount();
await user.updateOne({amount: user.amount + amount});
user
コレクションのドキュメントをid指定で取得し、amountを calculateAmount
メソッドで計算し、user.amount
に追加します。
トランザクションを貼らない場合、並列に処理が走るとデータの一貫性が保てず、amount
の値がおかしくなってしまうため、トランザクションを張ることを考えます。
mongooseで以下のようにトランザクションを貼ると、すでに他のトランザクションによって user
コレクションのデータが更新されているなどして一貫性が保てない場合、 writeConflict
エラーが発生して、処理が異常終了します。
const session = await this.connection.startSession();
session.startTransaction();
try {
const user = await this.user.findOneById(id).session(session);
const amount = await calculateAmount();
await user.updateOne({amount: user.amount + amount}).session(session);
await session.commitTransaction();
} catch (error) {
await session.abortTransaction();
throw error;
} finally {
session.endSession();
}
以下のように session.withTransaction
を使ったコールバック形式に変えると、writeConflict
エラーが発生しても成功するまで引数のコールバック全体を再試行してくれます。
try {
await session.withTransaction(async () => {
const user = await this.user.findOneById(id).session(session);
const amount = await calculateAmount();
await user.updateOne({amount: user.amount + amount}).session(session);
});
} finally {
await session.endSession();
}
これで一貫性を保つことはできます。
しかし、問題は更新処理(上記のuser.updateOne
)の前にMongoDb以外のデータストアへの保存や外部APIを叩く処理があると、それらの処理が何回も再試行されてしまい、バグやパフォーマンスなどの問題が発生してしまいます。
MySQLではデータの一貫性を保つため、更新するテーブルに対して以下のような FOR UPDATE
のクエリを発行して排他ロックをかけることができます。
SELECT * FROM user WHERE id = 1 FOR UPDATE;
並列にこのトランザクションを実行した場合、後続のトランザクションはこのSQLを流した箇所で処理が止まるため、上記のような心配がありません。
今回はこれをMongoDb + mongooseで実装しました。
バージョン
実装して挙動確認した環境は以下です。
フレームワークはNest.jsを使っています。
対象 | バージョン |
---|---|
MongoDb | 6.0.19 |
mongoose | 8.1.1 |
nestjs/mongoose | 10.0.2 |
設計
こちらの方法を使います。
MongoDB のトランザクション内で SELECT ... FOR UPDATE を 使用する方法
ドキュメントをfindする時に、 findAndUpdate
を使って更新をします。
ロックの値専用のフィールドを用意し、それを乱数を使って更新することで実質的に必ず更新されるようにします。
並列にトランザクションが走った場合、後続のトランザクションは findAndUpdate
で更新が失敗して再試行されるため、ロックで処理が止まることをエミュレートできます。
実装
以下のようなメソッドを実装しました。
ロックをかけたい場合、トランザクションの中でこのメソッドを呼ぶようにすることでMySQLの SELECT ... FOR UPDATE
のような排他ロックを実現できます。
async findOneByIdForUpdate(
id: string,
session: ClientSession
): Promise<UserDocument> {
// lockIdフィールドをランダムな値で更新することで必ず更新されるようにする(同じ値だとロックされない)
const query = this.user
.findByIdAndUpdate(id, { $set: { lockId: generateRandomId() } })
.session(session ?? null);
return query;
最終的に実現したかった実装は以下になります。
トランザクションが並列に走る場合、後続のトランザクションは findOneByIdForUpdate
の行でエラーになってコールバック関数が再試行されるので、その後の処理は実行されません。
これで user.amount
の一貫性を保つ処理を実装することができました。
try {
await session.withTransaction(async () => {
const user = await this.findOneByIdForUpdate(id, session);
const amount = await calculateAmount();
await user.updateOne({amount: user.amount + amount}).session(session);
});
} finally {
await session.endSession();
}
注意点
session.startTransaction
ではなくて session.withTransaction
を使う
mongooseのトランザクションは session.startTransaction
を使う方法とsession.withTransaction
を使う方法がありますが、session.withTransaction
を使いましょう。
session.withTransaction
は writeConflict
エラーの際に自動で引数のコールバックの処理を再試行してくれますが、session.startTransaction
はエラーで落ちてしまいます。
session.startTransaction
を使う場合は再試行処理を自前で実装する必要があります。
findOneByIdForUpdate
をトランザクションの初めで呼ぶ
トランザクションが並列に走る際、 findOneByIdForUpdate
の前の処理は findOneByIdForUpdate
が成功するまで何度も呼ばれてしまうため、注意が必要です。
例えば、以下のような処理があるとします。
try {
await session.withTransaction(async () => {
処理A
const user = await this.findOneByIdForUpdate(id, session);
処理B
});
} finally {
await session.endSession();
}
このトランザクションが並列に実行される時、処理Bはロック後なので1回しか実行されませんが、処理Aは後続のトランザクションがロックを取得できるまで何度も実行されてしまいます。
なので、ここにMongoDb以外のデータストアへの保存や外部APIを叩く処理があると、それらの処理が何回も再試行されてしまい、バグやパフォーマンスなどの問題が発生してしまいます。
ロックの処理はなるべくトランザクションの頭で呼びましょう。
終わりに
MongoDBでMySQLの SELECT ... FOR UPDATE
の処理を実装しました。
ただ、MySQLと異なりロックの処理の前までが何回も再試行されてしまうため、実装者がそこに気を配らないといけなく、MySQLのロックの方が扱いやすいと思いました。
トランザクションの初めでロックの処理を呼ばないとそれ以前の処理が再試行されてしまう問題はありますが、そこだけ気をつければ排他ロックを実現できるので、みなさんも使ってみてください!