はじめに
「トランザクションとは何?」
友人に質問されたことがあります。
「一連の流れを一つのブロックとして処理することだ。
例えば、君が私に1000円送金するとき、次の処理が1つのトランザクションになる。
・君の口座から1000円を引き落とす
・私の口座に1000円入金する大事なのは、どちらかの処理が失敗した場合、全体を取り消してデータを元の状態に戻すことだ。」
と私は教科書通り自信満々に答えました。
「なるほど、だがこういう経験があるんだ。
1000円送金するとき
・私の口座から直ぐに1000円引き落とされる
・翌日、相手の口座に1000円入金される
これもトランザクションなの?」
「⋯⋯」
即答することができませんでした。
この件を切っ掛けにトランザクションついて振り返り、定義から単純パターン、リアルなケース、分散トランザクションに展開する形で記述していきます。
定義
トランザクション(Transaction) とは
コンピュータシステムで処理する、分けることのできない一連の情報処理の単位を意味する。
データベースをある一貫した状態から別の一貫した状態へ変更するアクションを1つに束ねたものである。
一般的にデータベースを指すことが多いと思います。
ACID特性
トランザクションは4つの特性を持っています。
Atomicity(原子性)
一連の処理はすべて成功するか、すべて失敗するかのどちらか。途中で一部だけが実行されることはない。
例: 銀行の送金処理では、送金元の口座から引き落としが成功しない限り、送金先に入金されない。
Consistency(一貫性)
トランザクションが終了した後、データベースは一貫した状態に保たれる。
例: 売上データを登録する際に、売上が負の値になるようなデータは登録されない。
Isolation(分離性)
複数のトランザクションが並行して実行されても、互いに干渉せず結果に影響を与えない。
例: 同時に在庫を更新する処理があっても、データが矛盾しないように制御される。
Durability(永続性)
トランザクションが成功した場合、その結果はシステム障害が発生しても失われない。
例: 注文データが登録された後、サーバーが落ちてもデータが保持される。
要するにデータベースが予期せぬ障害やエラーが発生した場合でも、データの不整合や破損がないことを保証する仕組みです。
銀行の送金処理:単純パターン
例えば、簡単な例を挙げて説明します。AさんからBさんに1000円を送金する場合、次のような処理が1つのトランザクションになります:
- Aさんの口座から1000円を引き落とす
- Bさんの口座に1000円を入金する
ポイント:
- どちらかの処理が失敗した場合、全体を取り消してデータを元の状態に戻す(ロールバック)
- 両方が成功した場合のみ、処理を確定(コミット)
トランザクションを使用したSQLの例
-- トランザクション開始
BEGIN TRANSACTION;
-- Aさんの口座から引き落とす
UPDATE accounts
SET balance = balance - 1000
WHERE account_id = 'A';
-- Bさんの口座に入金する
UPDATE accounts
SET balance = balance + 1000
WHERE account_id = 'B';
-- すべての処理が成功したら確定
COMMIT;
-- エラーが発生したら全ての処理を取り消す
ROLLBACK;
銀行の送金処理:リアルな資金移動
銀行間トランザクションの概要
今回はよりリアルな例を挙げて考えてみましょう。銀行間トランザクションは、送金元銀行(Aさんの銀行)と送金先銀行(Bさんの銀行)の間で行われる資金移動のプロセスです。このプロセスには、以下の主要な段階が含まれます:
- 送金リクエストの発生
- Aさんが送金依頼を行うと、送金元銀行はAさんの口座からの引き落としを処理します
- トランザクションとして、送金リクエストを銀行間決済システムに送信します
- 銀行間決済(清算)
- 各銀行間で資金のやり取りを行うために、中央銀行や決済ネットワークを介した清算が行われます
- 例: 日本では「全銀ネット」、国際送金では「SWIFT」や「RTGS(Real-Time Gross Settlement)」が使用されます
- 送金先銀行の受け取りと処理
- 送金先銀行が清算の結果を受け取り、Bさんの口座に入金を反映します
シミュレーション
送金元銀行、銀行間清算システム、送金先銀行はシステムが独立して設計されているのが普通でしょう。この前提で以下のデモを設計しました。
データベース
-- 顧客アカウント情報
CREATE TABLE accounts (
account_id INT PRIMARY KEY,
name VARCHAR(100),
balance DECIMAL(15, 2),
bank_code VARCHAR(10)
);
-- 送金ログ
CREATE TABLE transfer_logs (
transfer_id INT PRIMARY KEY AUTO_INCREMENT,
sender_id INT,
receiver_bank_code VARCHAR(10),
receiver_account_id INT,
amount DECIMAL(15, 2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 清算トランザクション情報
CREATE TABLE clearing_transactions (
transaction_id INT PRIMARY KEY AUTO_INCREMENT,
sender_bank_code VARCHAR(10),
receiver_bank_code VARCHAR(10),
receiver_account_id INT,
amount DECIMAL(15, 2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 顧客アカウント情報
CREATE TABLE accounts (
account_id INT PRIMARY KEY,
name VARCHAR(100),
balance DECIMAL(15, 2),
bank_code VARCHAR(10)
);
-- 入金ログ
CREATE TABLE transfer_receipts (
receipt_id INT PRIMARY KEY AUTO_INCREMENT,
receiver_id INT,
amount DECIMAL(15, 2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
アプリケーション
import pyodbc
import requests
def get_bank_a_connection():
conn = pyodbc.connect(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=bankA.database.windows.net;"
"Database=BankA;"
"UID=<user>;"
"PWD=<password>;"
)
return conn
def initiate_transfer(sender_id, receiver_bank, receiver_account, amount):
conn = get_bank_a_connection()
cursor = conn.cursor()
try:
# トランザクション開始
cursor.execute("BEGIN TRANSACTION")
# 残高確認
cursor.execute("SELECT balance FROM accounts WHERE account_id = ?", sender_id)
sender_balance = cursor.fetchone()[0]
if sender_balance < amount:
raise ValueError("Insufficient funds.")
# 残高引き落とし
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE account_id = ?", amount, sender_id)
# ログ記録
cursor.execute("""
INSERT INTO transfer_logs (sender_id, receiver_bank, amount, status)
VALUES (?, ?, ?, 'sent')
""", sender_id, receiver_bank, amount)
# トランザクション確定
cursor.execute("COMMIT")
# 銀行間清算システムにリクエスト送信
response = requests.post(
"https://clearing-system.api/transfer",
json={
"sender_bank": "A",
"receiver_bank": receiver_bank,
"receiver_account": receiver_account,
"amount": amount
}
)
if response.status_code != 200:
raise Exception("Failed to communicate with clearing system.")
except Exception as e:
cursor.execute("ROLLBACK")
print("Error:", e)
finally:
cursor.close()
conn.close()
from flask import Flask, request, jsonify
import pyodbc
import requests
app = Flask(__name__)
def get_clearing_db_connection():
conn = pyodbc.connect(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=clearing.database.windows.net;"
"Database=Clearing;"
"UID=<user>;"
"PWD=<password>;"
)
return conn
@app.route("/transfer", methods=["POST"])
def process_transfer():
data = request.json
sender_bank = data["sender_bank"]
receiver_bank = data["receiver_bank"]
receiver_account = data["receiver_account"]
amount = data["amount"]
conn = get_clearing_db_connection()
cursor = conn.cursor()
try:
# トランザクション記録
cursor.execute("""
INSERT INTO clearing_transactions (sender_bank, receiver_bank, amount, status)
VALUES (?, ?, ?, 'pending')
""", sender_bank, receiver_bank, amount)
# 清算処理(送金先銀行に通知)
response = requests.post(
f"https://{receiver_bank}.api/receive_transfer",
json={"receiver_account": receiver_account, "amount": amount}
)
if response.status_code == 200:
cursor.execute("UPDATE clearing_transactions SET status = 'completed'")
else:
cursor.execute("UPDATE clearing_transactions SET status = 'failed'")
conn.commit()
return jsonify({"status": "success"}), 200
except Exception as e:
conn.rollback()
print("Error:", e)
return jsonify({"status": "failure", "error": str(e)}), 500
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
app.run(port=5000)
from flask import Flask, request, jsonify
import pyodbc
app = Flask(__name__)
def get_bank_b_connection():
conn = pyodbc.connect(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=bankB.database.windows.net;"
"Database=BankB;"
"UID=<user>;"
"PWD=<password>;"
)
return conn
@app.route("/receive_transfer", methods=["POST"])
def receive_transfer():
data = request.json
receiver_account = data["receiver_account"]
amount = data["amount"]
conn = get_bank_b_connection()
cursor = conn.cursor()
try:
# トランザクション開始
cursor.execute("BEGIN TRANSACTION")
# 入金処理
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE account_id = ?", amount, receiver_account)
# ログ記録
cursor.execute("""
INSERT INTO transfer_receipts (receiver_id, amount, status)
VALUES (?, ?, 'completed')
""", receiver_account, amount)
# トランザクション確定
cursor.execute("COMMIT")
return jsonify({"status": "success"}), 200
except Exception as e:
cursor.execute("ROLLBACK")
print("Error:", e)
return jsonify({"status": "failure", "error": str(e)}), 500
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
app.run(port=5001)
実行フロー
- 送金元銀行から送金指示を実行
- initiate_transfer 関数を呼び出すことで、送金元銀行が処理を開始します
- 送金元銀行のデータベースで残高が更新され、送金ログが記録されます
- 銀行間清算システムにHTTPリクエストが送信されます
- 銀行間清算システムがトランザクションを処理
- 清算トランザクションが記録されます
- 送金先銀行にHTTPリクエストを送信します
- 送金先銀行が入金処理を実行
- 送金先銀行のデータベースで受取人の残高が更新され、入金ログが記録されます
- 成功または失敗が銀行間清算システムに通知されます
- 送金プロセス完了
- 銀行間清算システムが処理結果を記録します
分散トランザクション
現実のソリューションでは単純なトランザクションのみでは対応できないことが確認できたと思います。そのため、新しい仕組みについて考えてみましょう。
分散トランザクション とは複数の独立したデータベースやシステム間で一貫性を持ちながらトランザクションを実行する仕組みを指します。従来型の分散トランザクションは2フェーズコミット(2PC)の仕組みが使用され、準備フェーズ(Prepare Phase)とコミットフェーズ(Commit Phase)で構成されます。
2PCのシミュレーション
class TransactionCoordinator:
def __init__(self):
self.participants = []
def add_participant(self, participant):
self.participants.append(participant)
def execute_transaction(self):
# 準備フェーズ
for participant in self.participants:
if not participant.prepare():
self.rollback()
return False
# コミットフェーズ
for participant in self.participants:
participant.commit()
return True
def rollback(self):
for participant in self.participants:
participant.rollback()
class Participant:
def __init__(self, name):
self.name = name
self.prepared = False
def prepare(self):
print(f"{self.name}: Preparing...")
self.prepared = True # ここで準備が成功するかシミュレート
return self.prepared
def commit(self):
if self.prepared:
print(f"{self.name}: Committing...")
else:
print(f"{self.name}: Cannot commit. Not prepared.")
def rollback(self):
if self.prepared:
print(f"{self.name}: Rolling back...")
else:
print(f"{self.name}: No need to roll back.")
# 分散トランザクションの実行
coordinator = TransactionCoordinator()
coordinator.add_participant(Participant("BankA"))
coordinator.add_participant(Participant("ClearingSystem"))
coordinator.add_participant(Participant("BankB"))
success = coordinator.execute_transaction()
if success:
print("Transaction completed successfully.")
else:
print("Transaction failed.")
1. 従来型の分散トランザクション (2PC: Two-Phase Commit)
2PCは、強い一貫性(Strong Consistency)を求める場面では有効ですが、以下の理由で現代のシステムには不向きな場合も多いです:
- 利点:
- トランザクションの一貫性(ACID特性)が保証される
- データの正確さが求められる金融システムや銀行間取引で有効
- 欠点:
- パフォーマンスが悪い(ネットワーク遅延が増える)
- フェイルオーバー処理が複雑
- 高スループットが求められる分散システムには不向き
- 使用例:
- 銀行のコアシステムや株式取引など、データ整合性が最優先のケース
将来性:
現在も一部のシステムで使用されますが、スケーラビリティの制約から、他のアプローチが好まれる傾向にあります。
2. Sagaパターン
Sagaパターンは、分散トランザクションを複数のローカルトランザクションに分割し、失敗時に補正操作(Compensation)を実行するアプローチです。
- 利点:
- 非同期処理に対応でき、高いスケーラビリティを実現
- 分散トランザクションを管理するためのコーディネータが不要
- Cloud Nativeやマイクロサービスに適している
- 欠点:
- 最終的整合性(Eventual Consistency)を採用するため、一時的なデータ不整合が発生し得る
- 補正操作(Compensation)の設計が複雑になる
- 使用例:
- 電子商取引プラットフォーム(注文・在庫管理・決済)
- マイクロサービスアーキテクチャのシステム
将来性:現代の分散システムやマイクロサービスでは広く採用されています。クラウド環境やスケーラブルな設計を重視する場合に適切です。
3. 最終的整合性(Eventual Consistency)
強い一貫性を放棄し、データの整合性が「最終的」に達成されることを許容するモデルです。
- 利点:
- 高スループットと低遅延を実現
- スケーラブルなシステム設計が可能
- データ整合性の要件が低いケースに最適
- 欠点:
- 一時的な不整合が許容されないシステムに不向き
- 開発者が整合性の問題をアプリケーションレベルで管理する必要がある
- 使用例:
- ソーシャルメディア(投稿の表示順序)
- 分散キャッシュやレプリケーションシステム
将来性:分散データベース(例:DynamoDB, CosmosDB)やクラウドサービスで広く使われるアプローチです。強い一貫性が不要な場合に非常に適しています。
4. Cloud Nativeサービスの活用
- 利点:
- クラウドサービスが管理を代行するため、開発者はビジネスロジックに集中可能
- 高可用性、スケーラビリティが標準提供される
- 欠点:
- Vendor lock-inのリスク
- コスト管理の必要
- 使用例(Azure):
- Azure Service Bus:メッセージベースで分散トランザクションを管理
- Azure Functions + Durable Functions:Sagaパターンを実現するためのサーバーレスサービス
- CosmosDB:最終的整合性または強い一貫性を選べる分散データベース
将来性:Cloud Nativeな設計が主流になる中、クラウドサービスを活用することが今後さらに一般化する。
アプローチの選択
- データの一貫性が最優先(金融など):2PCまたはSagaパターン
- スケーラビリティが重要(Eコマースなど):Sagaパターンまたは最終的整合性
- クラウド利用(Azure):Azure Functions + Durable Functions、AWS Lambda、GCPなど
最後に
今回はトランザクションを切っ掛けにその定義、簡単なシミュレーションから分散トランザクションのアプローチについて考えてみました。トランザクションはエンジニアが最初に理解した基礎概念の1つではないでしょうか。私も分かったつもりでしたが、いざ他人に説明してみることで自分の不足に気づきました。