はじめに
本記事は「Claude Codeのノウハウをサンプルコードで学ぶ」シリーズの**4本目(最終回)**です。シリーズ全体は以下の4本構成になっています。
- 入門編 — Claude Codeの全体像とAIエージェントの仕組みを体感する
- 中級編 — エージェント設計の考え方を概念レベルで解説する
- ハーネス設計8パターン編 — 設計パターンを動くTypeScriptで再現する
- エージェント実装深掘り編(本記事) — 統合エージェントの実装詳細を読み解く
前記事(ハーネス設計8パターン編)では、「ハーネスパラダイム」「ツールコントラクト」などClaude Codeのハーネス設計を概念レベルで8パターン紹介しました。本記事はその発展版として、同じ設計を持つエージェントを実際にTypeScriptで組み立てる場合に必要な実装詳細を、動くコードを引きながら6テーマで深掘りしていきます。
サンプルコードは、以下のGitHubリポジトリに完全実装があります。機能フラグで各要素を個別にON/OFFできる構成です。本記事のコード断片は、全てそこから直接引用したものになります。
- リポジトリ: nogataka/ts-claude-code-patterns-tutorial
- 対象ディレクトリ: examples/mini-agent
手元で動かしてみたい方は、リポジトリをクローンして npm install 後に後述の npm run mini-agent コマンドでお試しいただけます。
本記事に掲載するコードは、公開情報をもとに設計パターンを再構築したオリジナルの教育用最小実装です。Claude Codeのソースコードそのものではなく、同じ設計思想を理解するために著者が独立に書き起こしたものであり、MITライセンスで自由に改変・利用していただけます。
動作環境
- Node.js v20.18.1以上
- TypeScript 5.x
- Anthropic SDK(Anthropic公式API / Azure Anthropicの両方に対応)
ディレクトリ構成
examples/mini-agent/
├── index.ts # エントリポイント(機能フラグ・SDK接続)
├── client.ts # Anthropic APIクライアント設定(公式 / Azure両対応)
├── loop.ts # 状態マシン型ストリーミングループ
├── streamingExecutor.ts# 並列ツール実行 + 順序保証
├── compact.ts # 多段コンテキスト圧縮
├── promptCache.ts # キャッシュブレイク検出
├── hooks.ts # ライフサイクルフック
├── session.ts # Write-Ahead JSONL永続化
└── tools/ # Read / Grep / Glob / Bash
全体の流れ
6つのテーマが実行時にどう連携するかを、まず図で確認しておきます。
目次
- 状態マシン型ストリーミングループ — ReActとの本質的差分
-
順序保証付き並列ツール実行 —
OrderedDrainの設計 - 多段コンテキスト圧縮 — 安価な圧縮から順に試す
- プロンプトキャッシュブレイク検出 — 静的/動的分離と異常検知
- ライフサイクルフック — 拡張ポイントとしての契約
- Write-Ahead JSONL永続化 — クラッシュしても会話を失わない設計
1. 状態マシン型ストリーミングループ
概要
ReActの素朴な実装は「LLMの応答を待ち切ってからツールを起動し、結果を追加して再度呼び出す」という同期ループです。これだと、応答が長いほどツール実行の開始が遅れ、ユーザー体感のレイテンシが肥大化してしまいます。
Claude Code流のループは、LLMがトークンを流している最中に tool_use ブロックを検出した瞬間、ツール実行を開始します。応答末尾に到達する頃には、先頭のツールはすでに完了していることも珍しくありません。
実装のキーアイデア
| 要素 | 設計判断 |
|---|---|
| ループ本体 |
AsyncGenerator として書き、外側は for await で消費 |
| ストリーミング中のツール |
executor.schedule(block) で即時登録、drainReady() で完了分を回収 |
| ストリーム終了後の残り |
drainAll() でブロックして全件を順序保証付きで回収 |
| 継続判定 |
tool_use があればfollow-up、無ければ completed でreturn |
コード解説: runLoop 本体
loop.ts の核心は、3つのフェーズから成るメインループです。
// loop.ts
export async function* runLoop(
params: LoopParams,
): AsyncGenerator<LoopEvent, StopReason> {
let state: State = { messages: params.initialMessages, turnCount: 1 }
while (true) {
yield { type: 'turn_start', turnCount: state.turnCount }
const executor = new StreamingExecutor()
let needsFollowUp = false
const assistantBlocks: ContentBlock[] = []
const toolResults: ToolResult[] = []
let textBuf = ''
const flushText = () => {
if (textBuf.length > 0) {
assistantBlocks.push({ type: 'text', text: textBuf })
textBuf = ''
}
}
ポイントは textBuf + flushText() です。LLMのストリーミングはテキスト断片をたくさん吐き出しますが、APIにfollow-upメッセージとして返す際は連続したテキストを1つのtextブロックにまとめる必要があります。空textブロックが混ざると messages: text content blocks must be non-empty という400エラーが返ってくるため、「ツール呼び出し直前」「ストリーム終端」で明示的にflushします。
// Phase 1: ストリーミング
for await (const chunk of params.callModel(state.messages)) {
if (chunk.type === 'text') {
if (chunk.text.length === 0) continue
yield { type: 'text_chunk', text: chunk.text }
textBuf += chunk.text
} else if (chunk.type === 'tool_use') {
flushText()
yield { type: 'tool_use', block: chunk.block }
assistantBlocks.push({
type: 'tool_use',
id: chunk.block.id,
name: chunk.block.name,
input: chunk.block.input ?? {},
})
executor.schedule(chunk.block)
needsFollowUp = true
}
// ストリーミング中の完了ツール結果を回収
for await (const result of executor.drainReady()) {
yield { type: 'tool_result', result }
await params.onToolResult?.(result)
toolResults.push(result)
}
}
executor.schedule() は内部でread-onlyなら並列・writeなら直列に振り分けます(詳細は§2で解説します)。drainReady() は「先頭の未完了ツールがあれば何もせず帰る」ジェネレータで、ノンブロッキングに使えます。ここで回収し忘れると、後段のPhase 2だけで全件回収する必要があり、ユーザー体感のレイテンシが増えてしまいます。
flushText()
// Phase 2: 残りのツール結果
for await (const result of executor.drainAll()) {
yield { type: 'tool_result', result }
params.onToolResult?.(result)
toolResults.push(result)
}
// Phase 3: 継続判定
if (!needsFollowUp) {
return 'completed'
}
state.messages.push({ role: 'assistant', content: assistantBlocks })
state.messages.push({
role: 'user',
content: toolResults.map(r => ({
type: 'tool_result' as const,
tool_use_id: r.tool_use_id,
content: r.content,
is_error: r.is_error,
})),
})
state.turnCount++
if (state.turnCount > params.maxTurns) return 'max_turns_reached'
}
}
Phase 2の drainAll() は「未完了を含め登録順にyieldする」ジェネレータで、await task.promise でブロックします。これでリクエスト順=yield順が保証され、APIへのfollow-upメッセージで tool_use と tool_result のID対応が崩れません。
落とし穴1: toolResults へのpushをPhase 1で忘れる
実装中に実際に踏んだバグが、これです。以下のようにPhase 1のdrainでpushしていないと、並列10件のうち「Phase 2時点で残っている1〜2件だけ」しかfollow-upに載らず、Anthropic APIが以下のエラーを返します。
messages.N: `tool_use` ids were found without `tool_result` blocks
immediately after: toolu_XXX, toolu_YYY, ...
drainReady() でストリーミング中に拾った結果も toolResults に積んでおかないと、継続メッセージ組み立てで欠落してしまいます。
落とし穴2: content_block_start 時点での input は空
index.ts 側でSDKのストリームイベントを処理する際、つい content_block_start で tool_use.input を取り出したくなりますが、この時点では常に {} です。実際の入力は input_json_delta イベントで断片的に届き、content_block_stop で完成します。
// index.ts — 正しい組み立て
const toolDrafts = new Map<number, { id: string; name: string; jsonBuf: string }>()
for await (const event of stream as any) {
if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
toolDrafts.set(event.index, { id: event.content_block.id, name: event.content_block.name, jsonBuf: '' })
} else if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta') {
const draft = toolDrafts.get(event.index)
if (draft) draft.jsonBuf += event.delta.partial_json ?? ''
} else if (event.type === 'content_block_stop') {
const draft = toolDrafts.get(event.index)
if (draft) {
const input = draft.jsonBuf.length > 0 ? JSON.parse(draft.jsonBuf) : {}
yield { type: 'tool_use' as const, block: { id: draft.id, name: draft.name, input } }
}
}
}
空入力でツールを起動すると、ファイルパス無しのGlobやReadが連鎖失敗し、ループが maxTurns まで回り続けてしまいます。ストリームイベントの扱いには十分気をつけてみてください。
動作確認
npm run mini-agent -- --dry-run
期待される出力は、以下のようなものです。
--- Turn 1 ---
[tool_use] Read
[tool_result] OK
--- Turn 2 ---
Done.
2. 順序保証付き並列ツール実行
概要
並列実行自体は Promise.all で誰でも書けますが、Anthropic APIには「各 tool_use_id に対応する tool_result を、直後のuserメッセージの先頭から順に並べる」という制約があります(公式ドキュメント)。ID自体で対応は取れるため順序が絶対ではない場面もありますが、欠落があればAPIが400エラーを返します。OrderedDrain は、対応関係を取り違えず全件を揃えるための最小ユーティリティです。
実装のキーアイデア
| 要素 | 設計判断 |
|---|---|
| タスクキュー | 登録順に { id, promise, settled } をpush |
settled 追跡 |
promise.then / promise.catch で両方とも settled = true に |
drainReady |
先頭から settled のものだけyield、未settledで停止 |
drainAll |
先頭を await(未完了ならブロック)してyield |
| 並列/直列 | read-onlyは drain.add に直接追加、writeは直前チェーンに then で連結 |
コード解説: OrderedDrain
// streamingExecutor.ts
export class OrderedDrain<T> {
private tasks: Array<{ id: string; promise: Promise<T>; settled: boolean; value?: T }> = []
private yielded = 0
add(id: string, promise: Promise<T>): void {
const entry = { id, promise, settled: false, value: undefined as T | undefined }
promise.then(
v => { entry.settled = true; entry.value = v },
() => { entry.settled = true }, // エラーもsettled扱い
)
this.tasks.push(entry)
}
async *drainReady(): AsyncGenerator<T> {
while (this.yielded < this.tasks.length) {
const task = this.tasks[this.yielded]
if (!task.settled) return
yield await task.promise
this.yielded++
}
}
async *drainAll(): AsyncGenerator<T> {
while (this.yielded < this.tasks.length) {
yield await this.tasks[this.yielded].promise
this.yielded++
}
}
}
this.yielded はグローバルに進行するインデックスで、drainReady と drainAll の両方で共有される点が重要です。Phase 1でストリーミング中にドレインした分は this.yielded が進んでいるので、Phase 2の drainAll はその続きから処理します。これで同じタスクを2回yieldする事故を防ぎつつ、順序を保つ設計になっています。
コード解説: StreamingExecutor
export class StreamingExecutor {
private drain = new OrderedDrain<ToolResult>()
private pendingWriteChain: Promise<void> = Promise.resolve()
private siblingAborted = false
constructor(private registry: ToolRegistry = tools) {}
schedule(block: ToolUseBlock): void {
const descriptor = this.registry[block.name]
if (!descriptor) {
this.drain.add(block.id, Promise.resolve({
tool_use_id: block.id,
content: `Unknown tool: ${block.name}`,
is_error: true,
}))
return
}
const isReadOnly = descriptor.safety === 'read-only'
if (isReadOnly && !this.pendingWrites()) {
// 並列
this.drain.add(block.id, this.runOne(block, descriptor))
} else {
// 直列チェーン
const chained = this.pendingWriteChain.then(() =>
this.runOne(block, descriptor),
)
this.pendingWriteChain = chained.then(() => {})
this.drain.add(block.id, chained)
}
}
pendingWriteChain は**「直前のwriteが完了するまで待たせる」ためのチェーン**です。read-onlyであっても、直前のツールがwriteなら後続のreadは直列化する、という設計になります(本サンプルでは pendingWrites() が false 固定なので簡略化されていますが、本来はwrite実行中フラグを参照します)。
private async runOne(
block: ToolUseBlock,
descriptor: ToolRegistry[string],
): Promise<ToolResult> {
if (this.siblingAborted) {
return {
tool_use_id: block.id,
content: 'Sibling tool aborted',
is_error: true,
}
}
try {
const out = await descriptor.run(block.input)
return {
tool_use_id: block.id,
content: typeof out === 'string' ? out : JSON.stringify(out),
is_error: false,
}
} catch (err) {
if (descriptor.safety === 'write') {
this.siblingAborted = true // 兄弟キャンセル
}
return {
tool_use_id: block.id,
content: String(err),
is_error: true,
}
}
}
}
writeツール失敗時に siblingAborted を立てるのは、連鎖するBashが中途半端な状態を残す事故を減らすためです。たとえば「DBマイグレーションを流して、次にアプリを再起動する」という連番で前者が失敗したら、後者はスキップしたほうが安全だと判断できます。
落とし穴: read-onlyとwriteの判定はツール側の責務
tools/index.ts で全ツールに safety: 'read-only' | 'write' を明示しています。GrepやGlobはread-onlyですが、Edit や Bash はwriteです。Bash は極端に言えば任意のコマンドを実行できるので、一律write扱いにして直列化するのが無難だと思います。
動作確認
npm run bench
並列版と直列版を両方走らせて速度差を測定します(期待値は約3倍)。
3. 多段コンテキスト圧縮
概要
コンテキスト溢れ(413 Prompt too long)を単一の手法で解決しようとすると、どうしても破壊的な圧縮が必要になってしまいます。代わりに、安価な手法から段階的に試すパイプラインを組むと、情報の損失を最小化できます。
実装のキーアイデア
| 段階 | 手法 | コスト | 消える情報 |
|---|---|---|---|
| 1 | microCompact |
regex + 時間比較 | 60分以上前のread-onlyツール結果 |
| 2 | contextCollapse |
メッセージ前半を固定文字列に置換 | 古い会話の大半 |
| 3 | autoCompact |
LLMによる要約 | 全履歴(サマリーのみ残る) |
コード解説: microCompact
// compact.ts
export const CLEARABLE_TOOLS = new Set(['Read', 'Grep', 'Glob', 'WebFetch'])
export function microCompact(
messages: Message[],
opts: CompactOptions = {},
): Message[] {
const now = opts.now ?? Date.now()
const threshold = 60 * 60 * 1000
return messages.map(msg => {
if (typeof msg.content === 'string') return msg
const mapped = msg.content.map((block: any) => {
if (block.type !== 'tool_result') return block
if (typeof block.timestamp !== 'number') return block
if (now - block.timestamp <= threshold) return block
if (block.toolName && !CLEARABLE_TOOLS.has(block.toolName)) return block
return { ...block, content: '[Old tool result content cleared]' }
})
return { ...msg, content: mapped }
})
}
ポイントはツールの名前で対象を絞ることです。Bash の結果は副作用の有無を判断する材料になる可能性があるので、触りません。Read / Grep などは「同じ内容を再度取りに行けば復元できる」ため、消しても安全と判断できます。
コード解説: CompactPolicy.apply
export class CompactPolicy {
constructor(
private threshold: CompactThreshold = DEFAULT_THRESHOLD,
private summarize?: (m: Message[]) => Promise<string>,
) {}
async apply(messages: Message[]): Promise<Message[]> {
const tokens0 = estimateTokens(messages)
if (tokens0 < this.threshold.microCompact) return messages
messages = microCompact(messages)
const tokens1 = estimateTokens(messages)
if (tokens1 < this.threshold.contextCollapse) return messages
messages = await contextCollapse(messages)
const tokens2 = estimateTokens(messages)
if (tokens2 < this.threshold.autoCompact) return messages
return await autoCompact(messages, { summarize: this.summarize })
}
}
閾値の既定値は50K → 70K → 90Kトークンです。各段で効果測定してから次に進む設計になっているため、microCompact だけで足りればそこで止まります。
落とし穴: トークン見積もりの精度
estimateTokens は JSON.stringify(messages).length / 4 で近似しています。実際のトークン数とは数%ずれますが、圧縮判断には十分です。正確性が必要な場合は tiktoken 等のトークナイザーを併用してみてください。
動作確認
node --test --require ts-node/register test/microCompact.test.ts
60分の境界値テストを含む3ケースが通ります。
4. プロンプトキャッシュブレイク検出
概要
Anthropic APIのプロンプトキャッシュは、明示的に cache_control: { type: 'ephemeral' } を付けたブロックとそれより前のプレフィックスを対象にキャッシュします。公式ドキュメントによれば、cache readは通常入力トークンの 0.1倍(90%削減)、cache writeは5分TTLで1.25倍・1時間TTLで2倍です。問題は、キャッシュが効いているつもりで効いていなかった場合、気付きにくいことです。本機構は cache_read_input_tokens の変化を監視して異常を検知します。
実装のキーアイデア
| 要素 | 設計判断 |
|---|---|
| 境界 | システムプロンプトを DYNAMIC_BOUNDARY で静的/動的に分割 |
| 変更検知 | 静的部分とtoolsのSHA-256を保存、前回と比較 |
| ブレイク判定 |
cache_read_input_tokens が2000以上かつ5%以上ドロップ |
| 原因診断 | ハッシュ差分から system_prompt_changed / tools_changed / unknown
|
コード解説: splitSystemPrompt
// promptCache.ts
export const DYNAMIC_BOUNDARY = '__SYSTEM_PROMPT_DYNAMIC_BOUNDARY__'
export function splitSystemPrompt(prompt: string): {
static: string
dynamic: string
} {
const idx = prompt.indexOf(DYNAMIC_BOUNDARY)
if (idx === -1) return { static: prompt, dynamic: '' }
return {
static: prompt.slice(0, idx),
dynamic: prompt.slice(idx + DYNAMIC_BOUNDARY.length),
}
}
プロンプトを組み立てる側で、以下のようにマーカーを挿入しておきます。
const system = `
You are a helpful agent.
Tool guidelines:
- Prefer Read over Bash cat.
- ...
${DYNAMIC_BOUNDARY}
Current time: ${new Date().toISOString()}
User workspace: ${cwd}
`
静的部分に cache_control: { type: 'ephemeral' } を付けて送信すれば、動的部分がどう変わっても静的側のキャッシュは効き続けます。
コード解説: CacheBreakDetector
export class CacheBreakDetector {
private lastState?: CacheState
private pending?: { systemHash: string; toolsHash: string }
recordRequest(system: string, tools: unknown[]): void {
this.pending = {
systemHash: sha256(splitSystemPrompt(system).static),
toolsHash: sha256(JSON.stringify(tools)),
}
}
checkResponse(usage: {
cache_read_input_tokens: number
}): CacheBreakReport | null {
if (!this.pending) return null
const next: CacheState = {
systemHash: this.pending.systemHash,
toolsHash: this.pending.toolsHash,
cacheReadTokens: usage.cache_read_input_tokens,
}
let report: CacheBreakReport | null = null
if (this.lastState) {
const drop = this.lastState.cacheReadTokens - usage.cache_read_input_tokens
const ratio =
this.lastState.cacheReadTokens > 0
? drop / this.lastState.cacheReadTokens
: 0
if (drop > 2000 && ratio > 0.05) {
const causes: string[] = []
if (this.lastState.systemHash !== next.systemHash)
causes.push('system_prompt_changed')
if (this.lastState.toolsHash !== next.toolsHash)
causes.push('tools_changed')
if (causes.length === 0) causes.push('unknown')
report = { dropped: drop, causes }
}
}
this.lastState = next
this.pending = undefined
return report
}
}
なぜ絶対値と比率の両方を使うか: 小規模リクエストで数百トークン差が出るだけなら正常な揺らぎです。逆に巨大キャッシュの1%が2万トークン分のブレイクなら、絶対値だけでは気付きません。両方をANDで見ることで、誤検知と見逃しを抑える設計にしています。
落とし穴: cache_control の位置と動的要素の扱い
Anthropic APIのキャッシュは「cache_control を付けたブロックの末尾まで」をキャッシュ境界にします。したがって、静的プロンプトの末尾に cache_control: { type: 'ephemeral' } を付けたブロックを置けば、その後ろに現在時刻やリクエストIDを追加しても静的側のキャッシュは壊れません。
一方で、よくある失敗は以下のパターンです。
- 変化する値そのものを含むブロックに
cache_controlを付けてしまう(例: 毎ターン書き換わるTODOリストの末尾にcache breakpointを置く) - 静的と動的を同じブロックに混ぜる(境界が切れない)
対策としては、動的部分は「毎回微変するが、短時間では同一」になる設計にしておくと、動的側にもキャッシュを効かせられる場合があります(例: タイムスタンプを「分単位で丸める」など)。
「現在時刻をミリ秒で入れる」「リクエストIDを入れる」といったパターンは、静的側の cache_control より前に置かれるとキャッシュを壊します。動的要素は必ずキャッシュ境界より後ろに置いてください。
動作確認
node --test --require ts-node/register test/cacheBreak.test.ts
閾値の境界(drop=2000, ratio=0.05)を跨ぐテストが4件通ります。
5. ライフサイクルフック
概要
ハーネス本体に「ロギング」「ユーザー承認」「入力検証」を直接書き込むと、コードが肥大化し再利用性も落ちてしまいます。フックパターンはイベント名に対してハンドラを登録するだけの最小契約で、拡張を外部化する仕組みです。
実装のキーアイデア
| 要素 | 設計判断 |
|---|---|
| イベント |
PreToolUse / PostToolUse / Stop / SessionStart / UserPromptSubmit の5種 |
| ハンドラ署名 |
(input) => Promise<HookOutput> の非同期関数 |
| 出力 |
decision: 'block' | 'approve'、updatedInput、additionalContext など |
| 早期終了 | 1つでも block を返したら残りを呼ばず即return |
| Command型 | 外部プロセス spawn でJSON入出力(シェルスクリプトでも書ける) |
コード解説: HookManager
// hooks.ts
export class HookManager {
private handlers = new Map<HookEvent, HookHandler[]>()
on(event: HookEvent, handler: HookHandler): void {
const list = this.handlers.get(event) ?? []
list.push(handler)
this.handlers.set(event, list)
}
async fire(event: HookEvent, input: unknown): Promise<HookOutput> {
const handlers = this.handlers.get(event) ?? []
let merged: HookOutput = {}
for (const h of handlers) {
const out = await h(input)
if (out.decision === 'block') return out
merged = { ...merged, ...out }
}
return merged
}
}
fire は直列実行になっています。非同期だからといって Promise.all にしてしまうと、先行ハンドラの updatedInput を後続が参照できません。ハンドラは意味的に依存関係を持ちうるので、順序のある実行が必須です。
merged は Object.assign 相当のマージですが、プリミティブ値(systemMessage など)は後勝ちになります。必要に応じて「配列化してマージ」にカスタマイズしてみてください。
コード解説: Command型フック
import { spawn } from 'child_process'
export function commandHook(
command: string,
args: string[] = [],
): HookHandler {
return async (input) => {
const stdin = JSON.stringify(input)
return new Promise<HookOutput>((resolve) => {
try {
const child = spawn(command, args, { timeout: 5000 })
let stdout = ''
child.stdout.on('data', (chunk: Buffer) => {
stdout += chunk.toString('utf8')
})
child.on('error', () => resolve({}))
child.on('close', () => {
try {
resolve(JSON.parse(stdout || '{}') as HookOutput)
} catch {
resolve({})
}
})
child.stdin.write(stdin)
child.stdin.end()
} catch {
resolve({})
}
})
}
}
stdinにJSON投入、stdoutでJSON受信という契約にすることで、ハンドラを任意の言語で書けます。Pythonでもシェルスクリプトでも可能です。Claude Codeの .claude/settings.json で定義するフックもこの形式に近い契約で動きます(Claude Code本体のデフォルトタイムアウトは60秒・コマンドごとに変更可、本サンプルは最小実装として5秒で固定しています)。
使用例: Bashで rm -rf をブロック
const manager = new HookManager()
manager.on('PreToolUse', async (input: any) => {
if (input.name === 'Bash' && /rm\s+-rf\s+\//.test(input.input?.command ?? '')) {
return { decision: 'block', systemMessage: 'rm -rf / is blocked by policy' }
}
return {}
})
const result = await manager.fire('PreToolUse', {
name: 'Bash',
input: { command: 'rm -rf /' },
})
// => { decision: 'block', systemMessage: '...' }
落とし穴: ハンドラが矛盾する出力を返したとき
後勝ちマージのため、登録順が重要です。より厳しい判定をしたいハンドラは先に登録すべきで、逆にデフォルトを埋めるハンドラは最後に置きます。テストでは「登録順と最終結果」を検証するのが効果的です。
動作確認
node --test --require ts-node/register test/hooks.test.ts
6. Write-Ahead JSONL永続化
概要
エージェントが長時間走ると、プロセスクラッシュ・ネットワーク断・ホスト再起動のいずれかで落ちる可能性があります。会話履歴が消えるとユーザー体験上の損失が大きいため、LLM APIを呼ぶ前にディスクへフラッシュしておくのが王道です。
実装のキーアイデア
| 要素 | 設計判断 |
|---|---|
| フォーマット | JSON Lines(1エントリ1行) |
| 書き込み順序 | API呼び出し前に await appendFile
|
| タイムスタンプ | エントリに timestamp: Date.now() を付与 |
| 復元 |
resumeSession(path) で行ごとに JSON.parse
|
| パス規約 | sessions/<sessionId>.jsonl |
コード解説: SessionWriter
// session.ts
export class SessionWriter {
constructor(private filePath: string) {
fs.mkdirSync(path.dirname(filePath), { recursive: true })
}
async append(entry: SessionEntry): Promise<void> {
const withTs = { timestamp: Date.now(), ...entry }
// Write-Ahead: API呼び出し前にOSへの書き込みを完了させる
// 電源断レベルの永続化が必要なら { flush: true } を追加する
await fs.promises.appendFile(
this.filePath,
JSON.stringify(withTs) + '\n',
{ flush: true },
)
}
}
fs.promises.appendFile は内部で open(O_APPEND) → write → close を実行します。O_APPEND フラグによって追記位置がファイル末尾に自動で決まるため、同一プロセス内での逐次追記では行が安全に積まれていきます。
appendFile はOSのページキャッシュまで書き込みを完了させますが、Node.jsの仕様では flush オプションがデフォルト false です。プロセスクラッシュ程度であれば await で十分ですが、電源断レベルの耐性が必要なら { flush: true } を指定して明示的に fsync を走らせます。また、複数プロセスから同一ファイルに並行追記するケースでは、ファイルシステムやカーネルの実装に依存するため、ロックファイルや一意なファイル分割で競合を避けるのが確実です。
メインループ側では、以下の順で使います。
// 擬似コード
await sessionWriter.append({ role: 'user', content: prompt })
const response = await client.messages.stream({ messages: [...] })
// ↑ ここで落ちても、user messageは既にディスクにある
for await (const event of response) { ... }
コード解説: resumeSession
export async function resumeSession(
sessionPath: string,
): Promise<SessionEntry[]> {
const content = await fs.promises.readFile(sessionPath, 'utf-8')
const trimmed = content.trim()
if (!trimmed) return []
return trimmed.split('\n').map((line) => JSON.parse(line) as SessionEntry)
}
シンプルに見えますが、空ファイルや末尾改行だけのファイルが trim().split('\n') で [''] になるのを防ぐため、trimmed の空判定を入れています。この1行が無いと JSON.parse('') が例外で落ちてしまいます。
使用例: エントリポイントでの統合
// index.ts
const session = config.enableSession
? new SessionWriter(defaultSessionPath(String(Date.now())))
: undefined
if (session) {
await session.append({ role: 'user', content: prompt })
}
for await (const event of runLoop({
initialMessages,
maxTurns: config.maxTurns,
callModel,
onToolResult: async (r) => {
await session?.append({ role: 'tool', content: r })
},
})) {
reportEvent(event)
}
onToolResult コールバックで毎ツール結果を都度追記することで、ツール実行時に落ちても直前までの履歴は残るようになります。
ここでひとつ落とし穴があります。runLoop 側が params.onToolResult?.(result) を単に同期呼び出ししてawaitしていないと、session.append() のPromiseが未完了のまま次の処理に進んでしまい、Write-Ahead保証が崩れます。runLoop を自作する際は、コールバックを await params.onToolResult?.(result) の形で待ち受けるように組んでください。
落とし穴1: 壊れたJSONLの復元
プロセスが append 中に落ちると、途中で切れた行が残る可能性があります。プロダクションでは、以下のいずれかを検討してみてください。
- 行単位の
try { JSON.parse } catch { 切り捨て }でスキップ - 末尾行だけ破損しやすいので「最後の1行のみ検証」して不正なら除外
- 書き込みを
tmp → renameのatomic swapにする(追記には向かないので、初回書き込み限定)
本サンプルはシンプルさ優先で上記を省略しています。
落とし穴2: timestamp のスプレッド順
const withTs = { timestamp: Date.now(), ...entry }
この順序は意図的です。呼び出し側が entry.timestamp を指定していれば上書きされる(テストでは実時刻を固定したいので重要)、指定なければデフォルトが使われるという挙動になります。逆順 { ...entry, timestamp: Date.now() } だと呼び出し側が固定できません。
動作確認
node --test --require ts-node/register test/session.test.ts
ラウンドトリップ(書いた順で読める)と空ファイル処理の2テストが通ります。
まとめ
6つのテーマと対応ファイルを一覧にまとめます。
| # | テーマ | 対応ファイル | 要点 |
|---|---|---|---|
| 1 | ストリーミングループ | loop.ts |
AsyncGenerator + flushText + Phase 3段階 |
| 2 | 並列実行 | streamingExecutor.ts |
OrderedDrain の yielded 単一管理 |
| 3 | 多段圧縮 | compact.ts |
閾値3段 × 効果測定で早期return |
| 4 | キャッシュ検出 | promptCache.ts |
ハッシュ比較 + 絶対値/比率AND判定 |
| 5 | フック | hooks.ts |
直列実行・早期block・Command型契約 |
| 6 | 永続化 | session.ts |
Write-Ahead + JSONL + タイムスタンプ |
実行方法
# 実APIで動かす(ANTHROPIC_API_KEY もしくは AZURE_ANTHROPIC_* を設定)
npm run mini-agent -- "examples/mini-agent配下の.tsファイルを一覧して"
# 実APIなしで挙動確認
npm run mini-agent -- --dry-run
# 並列ベンチ
npm run bench
# ユニットテスト
npm test
次の一歩
実装として発展させるなら、以下が良い題材になると思います。
-
リアクティブ圧縮:
runLoop内で413エラーをキャッチしてCompactPolicy.apply()をリトライ前に挟む -
キャッシュコントロール適用:
splitSystemPromptの静的側にcache_control: { type: 'ephemeral' }を付けて、実APIでcache_read_input_tokensを観察する - フック合成: PreToolUseで入力を正規化、PostToolUseでテレメトリ送信、Stopでサマリー生成する3段パイプライン
- セッションフォーク: サブエージェント用に独立JSONLを作り、親に要約だけ返す設計
6つの要素はそれぞれ独立した機能フラグでON/OFFできるので、ご自身のエージェントに必要な部分だけ取り出して組み合わせてみてください。
関連記事
- Claude Code流のハーネスパターン8選を「動くTypeScript」で理解する — 概念編
- Claude Code流出から学ぶハーネスパターン10選 — ハーネス設計の全体像