0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【第3回】`SELECT FOR UPDATE` で並行 INSERT のシーケンス重複を防ぐ

0
Posted at

目次

この記事で扱うこと

この記事では、次を扱います。

  • MAX(seq) + 1 が並行実行で壊れる理由
  • テナント別の連番状態を別テーブルに切り出す設計
  • SELECT FOR UPDATE による行ロック
  • INSERT ON CONFLICT DO NOTHING で初回行を作る理由
  • LOCK TABLE やグローバル SEQUENCE が向かない理由
  • 第 5 回の HMAC チェーンとのつながり

なぜ MAX(seq) + 1 は並行実行で競合するのか

監査ログテーブルを考えます。

CREATE TABLE audit_logs (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  seq BIGINT NOT NULL,
  data JSONB,
  UNIQUE (tenant_id, seq)
);

テナントごとに seq を連番にしたいとします。

まず思いつきやすいのは、現在の最大値を読んで 1 を足す方法です。

import { randomUUID } from 'node:crypto';

async function appendAuditLog(
  pool: Pool,
  tenantId: string,
  data: unknown,
) {
  const maxResult = await pool.query<{ max_seq: string | null }>(
    `
    SELECT MAX(seq) AS max_seq
    FROM audit_logs
    WHERE tenant_id = $1
    `,
    [tenantId],
  );

  const seq = BigInt(maxResult.rows[0].max_seq ?? '0') + 1n;

  await pool.query(
    `
    INSERT INTO audit_logs (id, tenant_id, seq, data)
    VALUES ($1, $2, $3, $4)
    `,
    [
      randomUUID(),
      tenantId,
      String(seq),
      JSON.stringify(data),
    ],
  );
}

単独のリクエストであれば、この実装でも動きます。

しかし、同じテナントに対して 2 つのリクエストが同時に来ると、同じ seq を計算してしまうことがあります。

1. トランザクション A が MAX(seq) = 5 を読む
2. トランザクション B も MAX(seq) = 5 を読む
3. A が seq = 6 で INSERT する
4. B も seq = 6 で INSERT する
5. UNIQUE (tenant_id, seq) に違反する

UNIQUE 制約があるので、片方は失敗します。

失敗したらリトライすればよいようにも見えます。

しかし、負荷が高いと何度も同じ競合が起きます。リトライが増えるほど DB 負荷も増えます。

根本的には、「同じテナントの seq 払い出し」を直列化する必要があります。

連番状態を別テーブルで管理する

よく使う設計は、テナントごとの現在の連番を別テーブルに持つ方法です。

CREATE TABLE tenant_seq (
  tenant_id UUID PRIMARY KEY,
  last_seq BIGINT NOT NULL DEFAULT 0
);

このテーブルは、テナントごとに 1 行だけ持ちます。

ここでは説明を簡単にするため、テナント ID を A / B / C と書きます。実際のテーブルでは UUID を使います。

tenant_id = A, last_seq = 5
tenant_id = B, last_seq = 10
tenant_id = C, last_seq = 2

新しい監査ログを追加するときは、次の流れにします。

1. tenant_seq の該当 tenant_id の行をロックする
2. last_seq を読む
3. last_seq + 1 を次の seq とする
4. tenant_seq.last_seq を更新する
5. audit_logs に INSERT する
6. COMMIT する

この「該当 tenant_id の行をロックする」ために使うのが SELECT FOR UPDATE です。

SELECT FOR UPDATE で行ロックを取る

まず、連番を払い出す関数を作ります。

async function advanceTenantSeq(
  client: PoolClient,
  tenantId: string,
): Promise<bigint> {
  await client.query(
    `
    INSERT INTO tenant_seq (tenant_id, last_seq)
    VALUES ($1, 0)
    ON CONFLICT (tenant_id) DO NOTHING
    `,
    [tenantId],
  );

  const result = await client.query<{ last_seq: string }>(
    `
    SELECT last_seq
    FROM tenant_seq
    WHERE tenant_id = $1
    FOR UPDATE
    `,
    [tenantId],
  );

  const seq = BigInt(result.rows[0].last_seq) + 1n;

  await client.query(
    `
    UPDATE tenant_seq
    SET last_seq = $1
    WHERE tenant_id = $2
    `,
    [String(seq), tenantId],
  );

  return seq;
}

ポイントはここです。

SELECT last_seq
FROM tenant_seq
WHERE tenant_id = $1
FOR UPDATE

FOR UPDATE を付けると、取得した行にロックがかかります。

同じ tenant_id の行を別トランザクションが FOR UPDATE しようとすると、先にロックしたトランザクションが終わるまで待ちます。

INSERT 側は同じトランザクションで行う

advanceTenantSeq は、必ず監査ログの INSERT と同じトランザクションで呼びます。

import { randomUUID } from 'node:crypto';

async function appendAuditLog(
  pool: Pool,
  tenantId: string,
  data: unknown,
) {
  const client = await pool.connect();

  try {
    await client.query('BEGIN');

    const seq = await advanceTenantSeq(client, tenantId);

    await client.query(
      `
      INSERT INTO audit_logs (id, tenant_id, seq, data)
      VALUES ($1, $2, $3, $4)
      `,
      [
        randomUUID(),
        tenantId,
        String(seq),
        JSON.stringify(data),
      ],
    );

    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK').catch(() => {});
    throw err;
  } finally {
    client.release();
  }
}

トランザクション全体はこうなります。

BEGIN
  tenant_seq の行を初期化する
  tenant_seq の行を FOR UPDATE でロックする
  last_seq + 1 を計算する
  tenant_seq を更新する
  audit_logs に INSERT する
COMMIT

COMMIT するまで、同じテナントの tenant_seq 行は他のトランザクションから更新できません。

並行実行時に何が起きるか

同じテナント X に対して、A と B の 2 つの処理が同時に走ったとします。

1. A が BEGIN
2. A が tenant_seq の tenant_id = X を FOR UPDATE
   → ロック取得。last_seq = 5

3. B が BEGIN
4. B も tenant_id = X を FOR UPDATE
   → A が COMMIT / ROLLBACK するまで待つ

5. A が seq = 6 で audit_logs に INSERT
6. A が tenant_seq.last_seq = 6 に更新
7. A が COMMIT
   → ロック解放

8. B の FOR UPDATE が進む
   → last_seq = 6 を読む

9. B が seq = 7 で audit_logs に INSERT
10. B が tenant_seq.last_seq = 7 に更新
11. B が COMMIT

結果として、同じテナントでは必ず次のように並びます。

seq = 6
seq = 7
seq = 8
...

一方で、別テナントの操作は別の行をロックします。

そのため、テナント A の連番払い出しは、テナント B の連番払い出しを待たせません。

tenant_id = A の行ロック
tenant_id = B の行ロック

ロックの単位がテナントごとの 1 行なので、SaaS のワークロードに合いやすい設計です。

なぜ INSERT ON CONFLICT DO NOTHING が必要か

advanceTenantSeq の最初に、次の SQL を実行しています。

INSERT INTO tenant_seq (tenant_id, last_seq)
VALUES ($1, 0)
ON CONFLICT (tenant_id) DO NOTHING

これは、初回テナントの行を作るためです。

SELECT FOR UPDATE は、存在する行にしかロックをかけられません。

もし tenant_seq に行がない状態で次を実行すると、

SELECT last_seq
FROM tenant_seq
WHERE tenant_id = $1
FOR UPDATE

結果は 0 行です。

ロックする対象もありません。

そのため、先に INSERT ... ON CONFLICT DO NOTHING で、行がなければ作ります。

既に行がある場合は、何もしません。

この SQL は何度実行しても安全です。

初回:
  tenant_seq に行を作る

2 回目以降:
  既にあるので何もしない

同じテナントの初回リクエストが同時に 2 つ来ても、片方だけが tenant_seq の行を作り、もう片方は ON CONFLICT DO NOTHING で何もしません。その後、両方とも同じ行に対して SELECT ... FOR UPDATE を行うため、連番払い出しは直列化されます。

ここで ON CONFLICT DO UPDATE を使う必要はありません。

毎回更新すると、余計な書き込みやロック競合の原因になります。初期化だけが目的なので、DO NOTHING で十分です。

アンチパターン: LOCK TABLE で全体を止める

テーブル全体をロックすれば競合は防げます。

LOCK TABLE tenant_seq IN EXCLUSIVE MODE;

しかし、これは避けたほうがよいです。

テーブル全体をロックすると、別テナントの処理まで止まります。

テナント A の処理が tenant_seq 全体をロック
  ↓
テナント B の処理も待たされる
  ↓
テナント C の処理も待たされる

マルチテナント SaaS では、あるテナントの重い処理が、他テナントの書き込みを止める状態は避けたいです。

SELECT FOR UPDATE なら、ロック対象は該当テナントの 1 行だけです。

tenant_id = A の行だけロック
tenant_id = B の行は別に進める

この違いにより、同じテナント内の順序は守りつつ、別テナントの処理は並行に進められます。

アンチパターン: グローバル SEQUENCE を使う

PostgreSQL には SEQUENCE があります。

CREATE SEQUENCE global_audit_seq;

これを使うと、グローバルな連番は比較的簡単に作れます。

しかし、テナント別の連番には向きません。

たとえば、全テナントで 1 つの SEQUENCE を共有すると、次のようになります。

テナント A のログ → seq = 1
テナント B のログ → seq = 2
テナント A のログ → seq = 3

テナント A から見ると、seq は 1, 3 になります。

2 が飛んでいます。

グローバルに一意な番号でよいなら SEQUENCE は便利です。

しかし、テナントごとに 1, 2, 3... と連続させたい場合は、tenant_seq のようなスコープ別の状態テーブルを使うほうが向いています。

第 5 回の HMAC チェーンとの関係

このシリーズの第 5 回では、監査ログの HMAC チェーンを扱います。

HMAC チェーンでは、監査ログが次のように連続していることが重要になります。

seq = 1
seq = 2
seq = 3
...

途中の行が削除されると、seq の gap で検出できます。

seq = 1
seq = 2
seq = 4

そのため、HMAC チェーンでは、テナントごとの seq が重複せず、順番に払い出されることが前提になります。

今回の tenant_seq + SELECT FOR UPDATE は、第 5 回の HMAC チェーンの土台になります。

まとめ

テナント別の連番を払い出すとき、素朴な MAX(seq) + 1 は並行 INSERT で壊れます。

A が MAX(seq) = 5 を読む
B も MAX(seq) = 5 を読む
A が seq = 6 で INSERT
B も seq = 6 で INSERT

これを防ぐには、テナントごとの連番状態を別テーブルに切り出し、SELECT FOR UPDATE で行ロックを取ります。

SELECT last_seq
FROM tenant_seq
WHERE tenant_id = $1
FOR UPDATE

要点は次のとおりです。

tenant_seq:
  テナントごとの last_seq を 1 行で持つ

INSERT ON CONFLICT DO NOTHING:
  初回テナントの行を冪等に作る

SELECT FOR UPDATE:
  同じテナントの連番払い出しを直列化する

トランザクション:
  seq 払い出しと audit_logs INSERT をまとめて成功・失敗させる

LOCK TABLE は別テナントまで止めるため避けます。

グローバル SEQUENCE は、テナント別の連番には向きません。

テナントごとの順序が必要な場面では、tenant_seqSELECT FOR UPDATE の組み合わせを使うと、安全で分かりやすい設計になります。

次回は、PostgreSQL の IS DISTINCT FROM を使って、NULL を含む比較を安全に扱う方法を見ます。

参考資料

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?