目次
- 第 1 回 — Row Level Security でテナント境界を DB レベルで強制する(Qiita 掲載後に URL を差し替え)
- 第 2 回 —
SET LOCALでトランザクションスコープのセッション変数を使う - 第 3 回 —
SELECT FOR UPDATEで並行 INSERT のシーケンス重複を防ぐ - 第 4 回 —
IS DISTINCT FROMで NULL 安全に比較する - 第 5 回 — HMAC チェーンで監査ログの改ざんを検出する
この記事で扱うこと
この記事では、次を扱います。
- HMAC とは何か
- なぜ 1 行ごとの HMAC だけでは足りないのか
- HMAC チェーンの考え方
- 監査ログテーブルの設計
- 書き込み時の HMAC 計算
- 検証時の再計算
- 検出できる範囲とできない範囲
- シリーズ全体とのつながり
なぜ監査ログに改ざん検出が必要なのか
監査ログは、インシデント対応や内部統制で重要になります。
たとえば、次のような場面です。
- 誰が設定を変更したのかを後から確認したい
- 不正操作の痕跡を調べたい
- SOC 2 や GDPR などの監査対応をしたい
- 管理者による操作を追跡したい
ここで問題になるのは、「監査ログ自身が改ざんされていないこと」をどう確認するかです。
監査ログが DB に入っているだけだと、次のようなリスクが残ります。
DB 管理者が誤って UPDATE する
DB 認証情報が漏えいして外部から書き換えられる
不正操作のあとに監査ログを DELETE する
バックアップ・リストア時に一部の行が欠ける
もちろん、DB 権限を絞ることは重要です。
しかし、それだけでは「書き換えられたかどうか」を後から検出する仕組みにはなりません。
そこで、監査ログの各行に検証用の HMAC を持たせます。
HMAC とは何か
HMAC は、秘密鍵とメッセージから計算する認証コードです。
HMAC(secret, message) = 固定長の値
同じ secret と同じ message なら、同じ HMAC が計算されます。
しかし、message が 1 文字でも変わると、HMAC はまったく別の値になります。
message = "user A approved"
→ hmac = abc...
message = "user A rejected"
→ hmac = 9f3...
secret、つまり HMAC 用の秘密鍵を知らない人は、正しい HMAC を再計算できません。
つまり、保存されているデータと HMAC を後から再計算して比べれば、そのデータが書き換えられていないかを検証できます。
1 行ごとの HMAC だけでは足りない
各行に HMAC を付けるだけでも、行の内容の改ざんは検出できます。
seq=1 data=... hmac=h1
seq=2 data=... hmac=h2
seq=3 data=... hmac=h3
しかし、これだけでは「行が削除された」ことを検出しにくいです。
たとえば、攻撃者が seq=3 の行を削除したとします。
seq=1 data=... hmac=h1
seq=2 data=... hmac=h2
残った seq=1 と seq=2 の HMAC は正しいままです。
1 行ごとの HMAC だけを見ると、seq=3 が存在したことを検出できません。
そこで、各行の HMAC 計算に、直前の行の HMAC も含めます。
HMAC チェーンの考え方
HMAC チェーンでは、各行の HMAC を計算するときに、直前の行の HMAC も入力に含めます。
seq=1:
hmac1 = HMAC(secret, prev_hmacなし + row1)
seq=2:
hmac2 = HMAC(secret, hmac1 + row2)
seq=3:
hmac3 = HMAC(secret, hmac2 + row3)
seq=4:
hmac4 = HMAC(secret, hmac3 + row4)
図にすると、次のような鎖になります。
row1 ──hmac1──▶ row2 ──hmac2──▶ row3 ──hmac3──▶ row4
各行が、1 つ前の行の HMAC に依存しています。
そのため、途中の行を書き換えると、その行だけでなく、後続の行との整合性も壊れます。
改ざんをどう検出するか
たとえば、攻撃者が seq=2 の data を書き換えたとします。
seq=1 data=row1 hmac=h1
seq=2 data=row2 改ざん hmac=h2
seq=3 data=row3 prev_hmac=h2
検証時には、seq=2 のデータから HMAC を再計算します。
expected_h2 = HMAC(secret, h1 + row2 改ざん)
しかし、DB に保存されている hmac は、改ざん前の row2 から計算された h2 のままです。
比較すると一致しません。
expected_h2 != stored_h2
これで改ざんを検出できます。
また、仮に seq=2 の hmac も書き換えられたとしても、seq=3 の prev_hmac が合わなくなります。
行の削除も、seq の gap や prev_hmac の不一致で検出できます。
seq=1
seq=2
seq=4 ← seq=3 がない
このように、HMAC チェーンは「行の内容」と「行の順序」をまとめて検証する仕組みです。
テーブル設計
監査ログテーブルに、次の列を持たせます。
CREATE TABLE audit_logs (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
seq BIGINT NOT NULL,
operation TEXT NOT NULL,
target JSONB,
before JSONB,
after JSONB,
created_at TIMESTAMPTZ NOT NULL,
prev_hmac BYTEA,
hmac BYTEA NOT NULL,
UNIQUE (tenant_id, seq)
);
重要なのは次の 3 つです。
seq:
テナント内での連番
prev_hmac:
直前の監査ログ行の HMAC
hmac:
この行自身の HMAC
seq は第 3 回で扱った tenant_seq + SELECT FOR UPDATE のパターンで払い出します。
これにより、同じテナント内で 1, 2, 3... と順番に並びます。
seq と prev_hmac を同じトランザクションで取得する
HMAC チェーンでは、次の 2 つを同じトランザクション内で行います。
1. 次の seq を払い出す
2. 直前の `hmac` を取得する
第 3 回の tenant_seq パターンを少し拡張して、last_hmac も持たせます。
CREATE TABLE audit_log_tenant_seq (
tenant_id UUID PRIMARY KEY,
last_seq BIGINT NOT NULL DEFAULT 0,
last_hmac BYTEA
);
連番を進めるときに、この行を FOR UPDATE でロックします。
async function advanceAuditSeq(
client: PoolClient,
tenantId: string,
): Promise<{ seq: bigint; prevHmac: Buffer | null }> {
await client.query(
`
INSERT INTO audit_log_tenant_seq (tenant_id, last_seq, last_hmac)
VALUES ($1, 0, NULL)
ON CONFLICT (tenant_id) DO NOTHING
`,
[tenantId],
);
const result = await client.query<{
last_seq: string;
last_hmac: Buffer | null;
}>(
`
SELECT last_seq, last_hmac
FROM audit_log_tenant_seq
WHERE tenant_id = $1
FOR UPDATE
`,
[tenantId],
);
const row = result.rows[0];
if (!row) {
throw new Error(`audit_log_tenant_seq row not found: tenantId=${tenantId}`);
}
return {
seq: BigInt(row.last_seq) + 1n,
prevHmac: row.last_hmac,
};
}
このロックにより、同じテナントの監査ログ書き込みが直列化されます。
canonical JSON を作る
HMAC はバイト列に対して計算します。
同じ意味の JSON でも、キーの順番や空白が違うと、別のバイト列になります。
{"a":1,"b":2}
{"b":2,"a":1}
この 2 つは意味としては同じでも、文字列としては違います。
そのため、HMAC を計算する前に、必ず同じ形の JSON に正規化します。
これを canonical JSON、つまり HMAC 計算用に正規化した JSON と呼びます。
ここでは、HMAC に含めるフィールドを固定順に並べる簡略版を使います。
function buildCanonicalJson(params: {
tenant_id: string;
seq: string;
operation: string;
target: unknown;
before: unknown;
after: unknown;
created_at: string;
}): string {
return JSON.stringify({
after: params.after,
before: params.before,
created_at: params.created_at,
operation: params.operation,
seq: params.seq,
target: params.target,
tenant_id: params.tenant_id,
});
}
この例では、トップレベルのキー順を固定しています。
ネストした JSON も厳密に扱う場合は、JCS、JSON Canonicalization Scheme、のような標準的な canonicalization を検討します。
Node.js だけで書き込み、入力構造が限定されている場合は、このような固定順の JSON から始める設計も現実的です。複数言語の書き込み経路がある場合や、ネストした JSON を厳密に扱いたい場合は、JCS に沿った専用実装へ寄せるほうが安全です。
HMAC を計算する
HMAC は Node.js の crypto.createHmac で計算できます。
import { createHmac } from 'node:crypto';
function computeHmac(
secret: string,
prevHmac: Buffer | null,
canonicalJson: string,
): Buffer {
const h = createHmac('sha256', secret);
if (prevHmac) {
h.update(prevHmac);
}
h.update(canonicalJson);
return h.digest();
}
prevHmac と現在行の canonical JSON を入力にしているのがポイントです。
これにより、前の行と現在の行が鎖のようにつながります。
書き込み側の実装
監査ログを書き込む関数は、次の流れになります。
1. トランザクションを開始する
2. tenant_seq 行を FOR UPDATE でロックする
3. 次の seq と prev_hmac を取得する
4. created_at を生成する
5. canonical JSON を作る
6. HMAC を計算する
7. audit_logs に INSERT する
8. tenant_seq の last_seq / last_hmac を更新する
9. COMMIT する
実装例です。
import { createHmac, randomUUID } from 'node:crypto';
import type { PoolClient } from 'pg';
const HMAC_SECRET = process.env['AUDIT_HMAC_SECRET']!;
interface AuditLogParams {
tenantId: string;
operation: string;
target: unknown;
before: unknown;
after: unknown;
}
async function insertAuditLog(
client: PoolClient,
params: AuditLogParams,
): Promise<void> {
const { seq, prevHmac } = await advanceAuditSeq(
client,
params.tenantId,
);
const createdAt = new Date().toISOString();
const canonicalJson = buildCanonicalJson({
tenant_id: params.tenantId,
seq: String(seq),
operation: params.operation,
target: params.target,
before: params.before,
after: params.after,
created_at: createdAt,
});
const hmac = computeHmac(
HMAC_SECRET,
prevHmac,
canonicalJson,
);
await client.query(
`
INSERT INTO audit_logs (
id,
tenant_id,
seq,
operation,
target,
before,
after,
created_at,
prev_hmac,
hmac
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`,
[
randomUUID(),
params.tenantId,
String(seq),
params.operation,
JSON.stringify(params.target),
JSON.stringify(params.before),
JSON.stringify(params.after),
createdAt,
prevHmac,
hmac,
],
);
await client.query(
`
UPDATE audit_log_tenant_seq
SET last_seq = $1,
last_hmac = $2
WHERE tenant_id = $3
`,
[String(seq), hmac, params.tenantId],
);
}
この関数は、呼び出し元のトランザクション内で実行します。
監査対象の本体更新と監査ログ INSERT を同じトランザクションに入れると、片方だけ成功する状態を避けられます。
ここでは説明を簡単にするため、target / before / after を JSONB として保存しています。検証時には DB から取り出した値を再度 buildCanonicalJson に通すため、書き込み時と検証時で同じ正規化ルールを使うことが重要です。ネストした JSON や複数言語からの書き込みがある場合は、簡略版の JSON.stringify ではなく、JCS などの canonicalization に寄せます。
created_at はアプリ側で 1 回だけ作る
HMAC に含める値は、DB に保存する値と完全に一致している必要があります。
たとえば、HMAC 計算では Node.js の時刻文字列を使い、DB 保存では now() を使うと、微妙に違う値になります。
HMAC に含めた created_at:
2026-05-25T10:00:00.123Z
DB に保存された created_at:
2026-05-25 10:00:00.123456+00
この差があると、後から再計算しても HMAC が一致しません。
そのため、created_at はアプリ側で 1 回だけ作り、HMAC 計算と DB 保存の両方で同じ値を使います。
const createdAt = new Date().toISOString();
created_at を HMAC に含める場合は、書き込み時と検証時で完全に同じ文字列表現に戻せることを確認します。DB やドライバの時刻変換で表現が変わる場合は、HMAC に含める時刻文字列を別カラムで保存する、または canonical JSON に含める時刻表現を固定する設計にします。
HMAC 用 secret は厳重に管理する
HMAC の安全性は、secret が守られていることに依存します。
const HMAC_SECRET = process.env['AUDIT_HMAC_SECRET']!;
この secret が漏れると、攻撃者は改ざん後のデータに対して正しい HMAC を再計算できてしまいます。
そのため、次のような管理が必要です。
AWS Secrets Manager や Vault で管理する
アプリケーションに必要最小限の権限で渡す
ログに出さない
コードにハードコードしない
ローテーション方針を決める
アクセスログを監視する
HMAC チェーンは改ざん検出には有効ですが、secret が漏れれば前提が崩れます。
検証側の実装
検証では、監査ログを seq 順に読み、各行の HMAC を再計算します。
import { timingSafeEqual } from 'node:crypto';
import type { Pool } from 'pg';
export interface VerifyChainResult {
valid: boolean;
checked: number;
errors: string[];
}
export async function verifyAuditChain(
pool: Pool,
tenantId: string,
): Promise<VerifyChainResult> {
const result = await pool.query(
`
SELECT
id,
tenant_id,
seq,
operation,
target,
before,
after,
created_at,
prev_hmac,
hmac
FROM audit_logs
WHERE tenant_id = $1
ORDER BY seq ASC
`,
[tenantId],
);
const errors: string[] = [];
let checked = 0;
let prevHmac: Buffer | null = null;
let prevSeq: bigint | null = null;
for (const row of result.rows) {
const seq = BigInt(row.seq);
if (prevSeq === null) {
if (seq !== 1n) {
errors.push(`first seq is ${seq}, expected 1`);
}
} else if (seq !== prevSeq + 1n) {
errors.push(
`seq gap: expected ${prevSeq + 1n}, got ${seq} (id=${row.id})`,
);
}
if (!bufferNullableEqual(row.prev_hmac, prevHmac)) {
errors.push(`prev_hmac mismatch at seq=${seq}`);
}
const canonicalJson = buildCanonicalJson({
tenant_id: row.tenant_id,
seq: String(seq),
operation: row.operation,
target: row.target,
before: row.before,
after: row.after,
created_at: normalizeCreatedAt(row.created_at),
});
const expectedHmac = computeHmac(
HMAC_SECRET,
prevHmac,
canonicalJson,
);
if (!timingSafeBufferEqual(row.hmac, expectedHmac)) {
errors.push(`hmac mismatch at seq=${seq}`);
}
prevHmac = row.hmac;
prevSeq = seq;
checked++;
}
return {
valid: errors.length === 0,
checked,
errors,
};
}
function timingSafeBufferEqual(a: Buffer, b: Buffer): boolean {
return a.length === b.length && timingSafeEqual(a, b);
}
function bufferNullableEqual(
a: Buffer | null,
b: Buffer | null,
): boolean {
if (a === null || b === null) {
return a === b;
}
return timingSafeBufferEqual(a, b);
}
function normalizeCreatedAt(value: string | Date): string {
return value instanceof Date ? value.toISOString() : value;
}
pg の設定によっては created_at が文字列ではなく Date オブジェクトとして返ることがあります。検証時に toISOString() へそろえているのは、書き込み時と同じ文字列表現で HMAC を再計算するためです。
検証で見るポイントは 3 つです。
seq が連続しているか
prev_hmac が前の行の hmac と一致しているか
現在行の hmac を再計算して一致するか
どれか 1 つでも崩れていれば、改ざんまたはデータ欠損の可能性があります。
timingSafeEqual を使う理由
HMAC の比較には、通常の文字列比較や Buffer.equals ではなく、timingSafeEqual を使います。
import { timingSafeEqual } from 'node:crypto';
通常の比較では、先頭から順番に見て、不一致が見つかった時点で処理が終わる可能性があります。
攻撃者が処理時間を観測できる場合、どの位置まで一致しているかを推測される可能性があります。
timingSafeEqual は、比較時間が値に依存しにくいように設計されています。
ただし、長さが違う Buffer に対しては例外になるため、先に長さを確認します。
function timingSafeBufferEqual(a: Buffer, b: Buffer): boolean {
return a.length === b.length && timingSafeEqual(a, b);
}
検出できるもの、できないもの
HMAC チェーンは、改ざんを検出する仕組みです。
防止する仕組みではありません。
検出しやすいものは次です。
過去レコードの内容変更
行の削除
行の追加
seq の入れ替え
prev_hmac の不一致
一方で、できないこともあります。
secret が漏れた場合:
攻撃者が正しい HMAC を再計算できる
検証ジョブを実行しない場合:
改ざんされても気づけない
最後尾の行の削除:
DB 内だけを検証すると、短くなったチェーンとして整合して見える場合がある
外部に latest_seq / latest_hmac を保管していないと、末尾の巻き戻しは検出しにくい
最後尾の削除まで厳密に検出したい場合は、定期的に次のような値を外部に保存します。
tenant_id
latest_seq
latest_hmac
verified_at
これを DB の外、たとえば監視基盤や別ストレージに保存しておくと、末尾の巻き戻しも検出しやすくなります。
定期検証とアラート
HMAC チェーンは、検証して初めて意味があります。
そのため、verifyAuditChain を定期実行します。
1 時間ごとに全テナントを検証する
日次で全量検証する
重要テナントだけ高頻度で検証する
検証結果にエラーがあれば、すぐにアラートを出します。
valid = false
errors が 1 件以上
seq gap
hmac mismatch
prev_hmac mismatch
この結果を CloudWatch、Datadog、Grafana などに送れば、改ざん検知を運用に載せられます。
まとめ
シリーズ最終回では、HMAC チェーンで監査ログの改ざんを検出する方法を扱いました。
HMAC チェーンでは、各行の HMAC を計算するときに、直前の行の HMAC も入力に含めます。
seq=1 → hmac1
seq=2 → HMAC(hmac1 + row2)
seq=3 → HMAC(hmac2 + row3)
これにより、過去の行を書き換えると、その行または後続の行で不整合が検出されます。
要点は次のとおりです。
seq:
テナント内の順序を保証する
prev_hmac:
直前の行とのつながりを表す
hmac:
現在行の内容と prev_hmac から計算する
canonical JSON:
HMAC の入力を安定させる
timingSafeEqual:
HMAC 比較を安全に行う
verifyAuditChain:
定期実行して改ざんを検出する
HMAC チェーンは、改ざんを防ぐ仕組みではありません。
改ざんされたことを検出する仕組みです。
そのため、secret の管理、定期検証、アラート運用まで含めて設計します。
シリーズを振り返って
5 回のシリーズで、PostgreSQL を使ったマルチテナント SaaS の基礎を扱いました。
| 回 | テーマ |
|---|---|
| 第 1 回 | RLS でテナント境界を DB レベルで強制する |
| 第 2 回 |
SET LOCAL でテナント ID を安全に注入する |
| 第 3 回 |
SELECT FOR UPDATE でテナント別連番を安全に払い出す |
| 第 4 回 |
IS DISTINCT FROM で NULL 安全に差分判定する |
| 第 5 回 | HMAC チェーンで監査ログの改ざんを検出する |
それぞれは独立した技術ですが、組み合わせると、長期運用しやすい SaaS の土台になります。
RLS:
テナント境界を DB レベルで守る
SET LOCAL:
リクエストごとの tenant_id を安全に渡す
SELECT FOR UPDATE:
テナント別の順序を保証する
IS DISTINCT FROM:
NULL を含む差分判定を安全に書く
HMAC チェーン:
監査ログの改ざんを検出する
新しい API を追加するときに、毎回コードレビューだけで事故を防ぐのは大変です。
テナント境界、連番、差分判定、監査ログの整合性を DB と共通基盤に寄せておくと、安全な既定値を持つ SaaS に近づきます。