💡 海外コミュニティでの反響について
先日、Reddit (r/Python) で本作について議論した際、世界のエンジニアたちから最も共感を集めたのが「インメモリ処理におけるOOM対策」でした。どうやらプロセスの突然のクラッシュは世界共通の痛い課題のようです。本記事では、この問題に対する D-MemFS の「ハードクォータ」と「メモリーガード」を用いた防衛策について解説します。
はじめに
Python でインメモリ処理を書いていると、いつかこういった障害に遭遇します。
Killed
あるいは Windows なら何の説明もなくプロセスが消える。OOM(Out of Memory)キルです。io.BytesIO も dict も、メモリが尽きるまで際限なく拡張します。「どこで」「なぜ」落ちたのかすら分からないまま、プロセスが消える——これが Python インメモリ処理の最も厄介な落とし穴の一つです。
この記事では、D-MemFS のハードクォータ設計がこの問題をどう解決するかを、設計の思想レベルから掘り下げます。
問題:BytesIO と dict は無制限に膨らむ
まず問題を明確にしましょう。
from io import BytesIO
buf = BytesIO()
# どれだけ書いても止まらない
# 物理メモリが尽きるまで成功し続ける
for i in range(100_000):
buf.write(b"x" * 10_000)
print(buf.tell()) # 1,000,000,000 — 1 GiB
この write は失敗しません。OS がプロセスをキルするまで、ひたすら成功し続けます。
dict も同様です。
vfs: dict[str, bytes] = {}
for i in range(100_000):
vfs[f"file_{i}.bin"] = b"x" * 10_000
# 1 GiB 積み上がるまでエラーなし
ソフトクォータでは不十分
「書き込み後にサイズをチェックして警告する」というアプローチをソフトクォータと呼びます。しかしこれには根本的な問題があります——データは既に書き込まれているのです。
# ソフトクォータの擬似実装(ダメな例)
MAX_BYTES = 100 * 1024 * 1024 # 100 MiB
total = 0
def soft_write(buf: BytesIO, data: bytes) -> None:
buf.write(data) # ← まず書く
global total
total += len(data)
if total > MAX_BYTES: # ← 書いた後に気づく
raise MemoryError("quota exceeded") # 手遅れ
閾値を超えた瞬間、すでにメモリは消費されています。しかも例外を投げた後、書き込んだデータをロールバックするのは容易ではありません。
D-MemFS のハードクォータ設計
D-MemFS のクォータは中央銀行モデルで動きます。書き込みが実行される前に、クォータ残高を確認し、足りなければ即座に拒否します。
write(data) が呼ばれる
↓
クォータマネージャに len(data) バイトの予約を要求
↓
残高が足りる?
YES → 残高を減らして書き込みを実行
NO → MFSQuotaExceededError を raise(書き込みは発生しない)
データが書き込まれることは一切ありません。ファイルは汚染されず、例外をキャッチして処理を続行できます。
コード例:クォータを実際に使う
基本的なクォータ設定と例外処理
from dmemfs import MemoryFileSystem, MFSQuotaExceededError
# 10 MiB のハードクォータ
mfs = MemoryFileSystem(max_quota=10 * 1024 * 1024)
mfs.mkdir("/data")
def safe_write(mfs: MemoryFileSystem, path: str, data: bytes) -> bool:
"""書き込みに失敗しても処理を続行できる"""
try:
with mfs.open(path, "wb") as f:
f.write(data)
return True
except MFSQuotaExceededError as e:
print(f"[警告] クォータ超過のため {path} の書き込みをスキップ: {e}")
return False
# 成功するケース
safe_write(mfs, "/data/small.bin", b"x" * (1 * 1024 * 1024)) # 1 MiB → OK
# 失敗するケース(クォータを超える)
safe_write(mfs, "/data/big.bin", b"x" * (20 * 1024 * 1024)) # 20 MiB → スキップ
# ファイルは汚染されていない(wb で空ファイルは残るが、データは未書き込み)
st = mfs.stat("/data/big.bin")
print(st["size"]) # 0
クォータの残量を確認しながら処理する
from dmemfs import MemoryFileSystem, MFSQuotaExceededError
QUOTA = 64 * 1024 * 1024 # 64 MiB
mfs = MemoryFileSystem(max_quota=QUOTA)
mfs.mkdir("/chunks")
def process_stream(stream, chunk_size: int = 4 * 1024 * 1024):
"""ストリームをチャンク単位でインメモリに読み込む"""
chunk_index = 0
written_paths = []
for chunk in iter(lambda: stream.read(chunk_size), b""):
path = f"/chunks/chunk_{chunk_index:04d}.bin"
try:
with mfs.open(path, "wb") as f:
f.write(chunk)
written_paths.append(path)
chunk_index += 1
except MFSQuotaExceededError:
print(f"クォータ到達: {chunk_index} チャンクまで保持")
break
return written_paths
ノード数制限:MFSNodeLimitExceededError
ファイル数(ノード数)にも上限を設けられます。ファイル数が爆発するバグを早期に検出するのに役立ちます。
from dmemfs import MemoryFileSystem, MFSNodeLimitExceededError
# 最大 100 ファイル
mfs = MemoryFileSystem(max_nodes=100)
mfs.mkdir("/logs")
for i in range(200):
try:
with mfs.open(f"/logs/entry_{i:04d}.log", "xb") as f:
f.write(f"log entry {i}\n".encode())
except MFSNodeLimitExceededError:
print(f"ノード数上限到達: {i} ファイルで停止")
break
ストレージバックエンド:SequentialMemoryFile と RandomAccessMemoryFile
D-MemFS には 2 種類のストレージバックエンドがあります。
SequentialMemoryFile(シーケンシャル)
内部的にバイト列のチェーン(list[bytes])として実装されています。
- 先頭からの追記・読み取りが高速
- ランダムアクセスは遅い(チャンクをたどる必要がある)
- メモリ効率が良い(アロケーションが少ない)
RandomAccessMemoryFile(ランダムアクセス)
内部的に bytearray として実装されています。
-
seek+read/writeが高速 - 書き込み時にバッファを事前確保するため、シーケンシャル書き込みのみなら無駄が出る場合がある
auto-promotion(自動昇格)
default_storage="auto" (デフォルト)のとき、ファイルへのアクセスパターンを観察して自動的にバックエンドを切り替えます。
ファイル作成 → SequentialMemoryFile として開始
↓
ランダムアクセス(seek)が検出される
↓
RandomAccessMemoryFile に自動昇格
# 明示的にバックエンドを固定することも可能
from dmemfs import MemoryFileSystem
mfs_seq = MemoryFileSystem(default_storage="sequential") # 常にシーケンシャル
mfs_ra = MemoryFileSystem(default_storage="random_access") # 常にランダムアクセス
mfs_auto = MemoryFileSystem(default_storage="auto") # 自動(デフォルト)
promotion_hard_limit:巨大ファイルの昇格を抑制する
自動昇格はランダムアクセス時に bytearray へのコピーが発生します。非常に大きなファイルでこれが起きると、一時的にメモリ使用量が倍になります。
promotion_hard_limit を設定すると、このサイズを超えたファイルは自動昇格しません。
from dmemfs import MemoryFileSystem
# 64 MiB 以上のファイルは自動昇格しない
mfs = MemoryFileSystem(
max_quota=512 * 1024 * 1024,
promotion_hard_limit=64 * 1024 * 1024,
)
大量データを扱うパイプラインで、メモリスパイクを防ぐために重要なパラメータです。エンタープライズ環境のバッチ処理やデータパイプラインにおいて、このパラメータはメモリスパイクを意図的に平滑化するための安全装置として機能します。メモリ使用量の上限を厳密に制御できることは、k8s のメモリリミットや CI のリソース制約と相性が良く、運用の安定性に直結します。
メモリ会計:何がクォータに含まれるか
クォータが追跡するのは、純粋なデータバイト数だけではありません。
クォータ消費 = 実データのバイト数 + チャンクオーバーヘッド
SequentialMemoryFile はチャンク単位でデータを保持するため、チャンクのヘッダ情報がわずかにオーバーヘッドとして加算されます。これにより「クォータ = 10 MiB」と設定したとき、実際のメモリ使用量は確実に 10 MiB 以下に収まります(オーバーヘッドの分だけ実データは少し少なくなる)。
この設計は「クォータを絶対に超えない」という保証を優先しています。
スレッドセーフな原子操作
クォータの更新はロックの下で原子的に行われます。
from dmemfs import MemoryFileSystem
import threading
mfs = MemoryFileSystem(max_quota=10 * 1024 * 1024)
mfs.mkdir("/concurrent")
errors = []
def writer(thread_id: int):
for i in range(50):
try:
path = f"/concurrent/t{thread_id}_f{i}.bin"
with mfs.open(path, "xb") as f:
f.write(b"x" * (100 * 1024)) # 100 KiB each
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=writer, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
# クォータを超えた分は例外になっているが、ファイルシステムは壊れていない
quota_errors = [e for e in errors if "quota" in str(e).lower()]
print(f"クォータ超過: {len(quota_errors)} 回(正常な動作)")
print(f"FS 破損: なし")
「クォータを確認して書き込む」という 2 ステップが分離していると、確認後・書き込み前に別スレッドが割り込んでクォータを使い切るという競合が発生しえます。D-MemFS ではこの確認と予約を単一の RW ロック下で実行することで、この競合を排除しています。
ハードクォータがある世界とない世界
| クォータなし(BytesIO / dict) | D-MemFS ハードクォータ | |
|---|---|---|
| メモリ超過時の挙動 | プロセスが OOM キルされる |
MFSQuotaExceededError が発生 |
| 検出タイミング | OS がキルするまで気づかない | 書き込み前に即座に検出 |
| 例外のキャッチ | 不可能(SIGKILL) |
try/except で回収可能 |
| ロールバック | 不可能 | 書き込みが発生していないので不要 |
| ファイルの整合性 | 破壊される可能性あり | 保証される |
| ログ・監視 | 残らないことが多い | 例外として記録可能 |
クォータの設定指針
どの値に設定するべきか、実践的な指針を挙げます。
import os
import psutil
def recommended_quota() -> int:
"""
利用可能なメモリの一定割合をクォータとして使う例。
本番では固定値のほうが予測しやすい。
"""
available = psutil.virtual_memory().available
return int(available * 0.25) # 利用可能メモリの 25%
# 実際のユースケース別の目安
QUOTAS = {
"unit_test": 32 * 1024 * 1024, # 32 MiB — テスト用
"ci_pipeline": 256 * 1024 * 1024, # 256 MiB — CI パイプライン
"batch_processing": 2 * 1024 * 1024 * 1024, # 2 GiB — バッチ処理
}
基本原則: ワーストケースの入力サイズを見積もり、その 1.5〜2 倍をクォータにする。それがプロセスの総メモリ予算を超えるなら、設計を見直す。
メモリーガード:物理メモリの枯渇を事前に検知する
ハードクォータは「仮想 FS 内の予算」を管理しますが、もう一つの問題があります——クォータの設定値がホストマシンの物理メモリを超えている場合です。
例えば max_quota=4GiB と設定しても、マシンの空きメモリが 2 GiB しかなければ、クォータに到達する前に OS が OOM キルを実行します。ハードクォータだけではこの問題を防げません。
v0.3.0 で導入されたメモリーガードは、この「クォータの外側で起きる OOM」に対処します。
3つのモード
| モード | 動作 |
|---|---|
"none" |
チェックなし(デフォルト、従来互換) |
"init" |
FS 初期化時に max_quota が利用可能メモリを超えていないかチェック |
"per_write" |
書き込みごとに物理メモリ残量をチェック(間隔指定可能) |
from dmemfs import MemoryFileSystem
# 初期化時にメモリ不足を検出(推奨)
mfs = MemoryFileSystem(
max_quota=4 * 1024 * 1024 * 1024, # 4 GiB
memory_guard="init",
memory_guard_action="raise", # "warn" なら ResourceWarning
)
# 書き込みごとにチェック(より厳格な用途に)
mfs = MemoryFileSystem(
max_quota=4 * 1024 * 1024 * 1024,
memory_guard="per_write",
memory_guard_action="warn",
memory_guard_interval=1.0, # チェック間隔(秒)
)
ハードクォータとメモリーガードの関係
住宅に例えると分かりやすいかもしれません。
- ハードクォータ = 部屋の面積。荷物をどれだけ置けるかの制限
- メモリーガード = 建物の耐荷重チェック。そもそも建物がその重さに耐えられるかの確認
両方あって初めて、インメモリ処理の安全性が完結します。
per_write モードの設計上の工夫
"per_write" モードは毎回 OS に物理メモリ残量を問い合わせるため、パフォーマンスへの影響が懸念されます。これに対して、memory_guard_interval パラメータでチェック間隔を制御できるようにしています。デフォルトは 1 秒——前回のチェックから 1 秒経過していなければ、キャッシュした値を使います。
# 高頻度書き込みでも性能を維持しつつ安全性を確保
mfs = MemoryFileSystem(
max_quota=1 * 1024 * 1024 * 1024,
memory_guard="per_write",
memory_guard_action="raise",
memory_guard_interval=2.0, # 2秒ごとにチェック
)
「クォータを絶対に超えない」というハードクォータの保証と、「物理メモリが足りないまま走り続けない」というメモリーガードの保証。この二重の防御が、D-MemFS の OOM 対策の全体像です。
free-threaded Python(GIL=0)での動作
Python 3.13 以降の free-threaded モード(python3.13t)では GIL が存在しないため、スレッド間の競合がより顕在化します。D-MemFS は GIL=0 環境でテスト済み(369テスト × 3OS × 3Pythonバージョン)であり、クォータの原子性は GIL に依存せず明示的なロックで保証されています。
# free-threaded Python でのテスト
python3.13t -c "
from dmemfs import MemoryFileSystem
import threading
mfs = MemoryFileSystem(max_quota=5 * 1024 * 1024)
mfs.mkdir('/test')
def worker(n):
for i in range(100):
try:
with mfs.open(f'/test/w{n}_{i}.bin', 'xb') as f:
f.write(b'x' * 10240)
except Exception:
pass
threads = [threading.Thread(target=worker, args=(i,)) for i in range(20)]
for t in threads: t.start()
for t in threads: t.join()
print('完了(クラッシュなし)')
"
おわりに
OOM はデバッグが非常に難しい障害です。スタックトレースが残らず、どのコードが原因か特定しにくい。ハードクォータは「予算内に収まらない書き込みを最初から拒否する」ことで、この問題を回収可能な例外に変換します。
D-MemFS のクォータ設計は「驚かせない(No Surprises)」という思想に基づいています。メモリ使用量は設定値を絶対に超えず、例外は try/except で処理でき、ファイルシステムの整合性は常に保たれます。
インメモリ処理で OOM 障害を経験したことがある方は、ぜひ試してみてください。
pip install D-MemFS
🛡️ D-MemFS 活用ガイド(連載)
- 第1弾:BytesIO じゃダメな理由 — Python インメモリ FS ライブラリを作った話
- 第2弾:管理者権限不要でWindowsにRAMディスクを構築する ― Python環境のI/O高速化手法
- 第3弾:PythonのOOMキルを完全防御する:BytesIOの罠とD-MemFS「ハードクォータ」の設計思想(本記事)
- 第4弾:PythonでSQLiteの共有インメモリDBが消える仕様と、
deserialize()の落とし穴をbackup()で回避した話
📖 設計の裏側をもっと深く
なぜこのライブラリを作ったのか、その原体験や「バイブコーディング消耗からの脱却」としてのSDD(仕様駆動開発)の実践記録については、Zenn で連載しています。あわせてどうぞ。
GitHub で Star ⭐ をいただけると、開発の大きな励みになります!
(↓以下のリンク先がプロジェクトの本体です)