シリーズ記事
はじめに
「CLI ツールをプログラムから呼び出したい」これは開発者なら誰もが一度は思うことがあるはずです。
Codex CLI は強力ですが、シェルスクリプトから呼び出すだけでは限界があります。CI/CD パイプラインに統合したり、複雑な条件分岐を実装したり、結果を構造化データとして扱いたい場合、プログラマティックな API が必要になります。
Codex TypeScript SDK は、まさにこのニーズに応えます。
本記事では、型安全で柔軟な SDK を使った API 連携と、実践的な Node.js アプリケーションへの統合方法を詳解します。
⚠️ 注意: SDK は現在開発中(バージョン 0.0.0-dev
)のため、API が変更される可能性があります。
1. なぜ SDK が必要なのか?
1.1 CLI 直接呼び出しの限界
シェルスクリプトでの呼び出し:
#!/bin/bash
# ❌ エラーハンドリングが難しい
codex exec "テストを実行" > output.txt
if [ $? -ne 0 ]; then
echo "Failed"
exit 1
fi
# ❌ 出力のパースが面倒
result=$(cat output.txt | grep "Test passed" | wc -l)
問題点:
- エラーハンドリングが粗雑
- 出力のパースが必要
- 型安全性がゼロ
- ストリーミング対応が困難
- テストが書きにくい
1.2 SDK がもたらす価値
TypeScript SDK を使用:
import { Codex } from "@openai/codex-sdk";
// ✅ 型安全
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY,
});
// ✅ エラーハンドリング
try {
const thread = codex.startThread();
const result = await thread.run("テストを実行");
// ✅ 構造化されたデータ
console.log("Response:", result.finalResponse);
console.log("Items:", result.items);
} catch (error) {
// ✅ 型付きエラー
console.error("Failed:", error.message);
}
メリット:
✅ 型安全性: TypeScript の恩恵を完全に享受
✅ エラーハンドリング: try/catch で標準的な処理
✅ ストリーミング: リアルタイムイベント受信
✅ テスタビリティ: モックしやすい設計
✅ IDE サポート: 自動補完とドキュメント
2. SDK アーキテクチャ
2.1 レイヤー構造
2.2 主要クラスとファイル
ファイル | 主要な型/クラス | 役割 |
---|---|---|
codex.ts |
Codex |
SDK エントリポイント、スレッド作成 |
thread.ts |
Thread |
会話スレッド管理、run/runStreamed |
exec.ts |
CodexExec |
CLI プロセス実行ラッパー |
events.ts |
ThreadEvent |
イベント型定義(union type) |
items.ts |
ThreadItem |
アイテム型定義(union type) |
turnOptions.ts |
TurnOptions |
ターン実行オプション |
codexOptions.ts |
CodexOptions |
SDK 初期化オプション |
3. 基本的な使い方
3.1 シンプルなクエリ実行
import { Codex } from "@openai/codex-sdk";
async function basicExample() {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
const result = await thread.run("README.md を要約して");
console.log("=== Final Response ===");
console.log(result.finalResponse);
console.log("\n=== Items ===");
result.items.forEach((item, index) => {
console.log(`${index + 1}. ${item.item_type}`);
if (item.item_type === "assistant_message") {
console.log(` Text: ${item.text.substring(0, 100)}...`);
}
});
}
basicExample();
3.2 ストリーミング実行
リアルタイムでイベントを受信:
async function streamingExample() {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
const { events } = await thread.runStreamed("テストを実行して");
console.log("=== Streaming Events ===\n");
for await (const event of events) {
switch (event.type) {
case "thread.started":
console.log("🚀 Thread started");
break;
case "turn.started":
console.log("💭 Turn started");
break;
case "item.started":
console.log(`📝 Item started: ${event.item.item_type}`);
break;
case "item.updated":
// ストリーミング中のテキスト更新
if (event.item.item_type === "assistant_message") {
process.stdout.write("."); // プログレス表示
}
break;
case "item.completed":
console.log(`\n✅ Item completed: ${event.item.item_type}`);
if (event.item.item_type === "assistant_message") {
console.log(` ${event.item.text}`);
} else if (event.item.item_type === "command_execution") {
console.log(` Command: ${event.item.command}`);
console.log(` Exit code: ${event.item.exit_code}`);
}
break;
case "turn.completed":
console.log("\n🎉 Turn completed!");
break;
case "turn.failed":
console.error(`❌ Turn failed: ${event.error.message}`);
break;
case "thread.error":
console.error(`⚠️ Thread error: ${event.error.message}`);
break;
}
}
}
streamingExample();
出力例:
=== Streaming Events ===
🚀 Thread started
💭 Turn started
📝 Item started: assistant_message
.........
✅ Item completed: assistant_message
まず、テストを実行します。
📝 Item started: command_execution
✅ Item completed: command_execution
Command: npm test
Exit code: 0
📝 Item started: assistant_message
....
✅ Item completed: assistant_message
全てのテストに合格しました。
🎉 Turn completed!
3.3 オプション指定
async function optionsExample() {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
// 承認モードを有効化
const result = await thread.run(
"データベースをリセットして",
{
approvalMode: "on-request", // 危険な操作で確認を求める
}
);
// カスタムワーキングディレクトリ
const result2 = await thread.run(
"package.json を更新",
{
workingDirectory: "/path/to/project",
}
);
// サンドボックスポリシー指定
const result3 = await thread.run(
"コードレビューして",
{
sandboxPolicy: "read-only", // 読み取り専用
}
);
}
4. 高度な使用例
4.1 CI/CD パイプライン統合
GitHub Actions でのテスト自動化:
// scripts/ci-test-runner.ts
import { Codex } from "@openai/codex-sdk";
import * as core from "@actions/core";
async function runCITests() {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
try {
console.log("🔍 Running automated tests...");
const { events } = await thread.runStreamed(
"全てのテストを実行して、失敗したテストがあれば詳細を報告してください",
{
workingDirectory: process.env.GITHUB_WORKSPACE,
sandboxPolicy: "workspace-write",
}
);
let testsPassed = false;
let failureDetails = "";
for await (const event of events) {
if (event.type === "item.completed") {
if (event.item.item_type === "command_execution") {
console.log(`📋 Command: ${event.item.command}`);
console.log(` Exit code: ${event.item.exit_code}`);
if (event.item.exit_code !== 0) {
testsPassed = false;
failureDetails += `\nFailed: ${event.item.command}\n`;
failureDetails += event.item.stdout || "";
}
}
if (event.item.item_type === "assistant_message") {
console.log(`💬 ${event.item.text}`);
if (event.item.text.includes("全てのテストに合格")) {
testsPassed = true;
}
}
}
}
if (testsPassed) {
console.log("✅ All tests passed!");
core.setOutput("test-result", "success");
} else {
console.error("❌ Tests failed!");
core.setFailed(failureDetails);
}
} catch (error) {
console.error("Error running tests:", error);
core.setFailed((error as Error).message);
}
}
runCITests();
GitHub Actions ワークフロー:
name: Automated Testing with Codex
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '20'
- name: Install dependencies
run: npm install
- name: Install Codex SDK
run: npm install @openai/codex-sdk
- name: Run AI-powered tests
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: npx tsx scripts/ci-test-runner.ts
4.2 インタラクティブなコードレビューボット
// scripts/pr-reviewer.ts
import { Codex } from "@openai/codex-sdk";
import { Octokit } from "@octokit/rest";
async function reviewPullRequest(
owner: string,
repo: string,
prNumber: number
) {
const octokit = new Octokit({
auth: process.env.GITHUB_TOKEN,
});
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
// PR の差分を取得
const { data: files } = await octokit.pulls.listFiles({
owner,
repo,
pull_number: prNumber,
});
const thread = codex.startThread();
console.log(`🔍 Reviewing PR #${prNumber}...`);
const { events } = await thread.runStreamed(
`以下のファイルの変更をレビューして、セキュリティ問題、
バグ、コードスタイルの改善点を指摘してください:
${files.map(f => `- ${f.filename} (+${f.additions} -${f.deletions})`).join('\n')}
各ファイルの diff を確認して、具体的な問題点と改善提案を
Markdown 形式でまとめてください。`,
{
sandboxPolicy: "read-only",
}
);
let reviewComments: string[] = [];
for await (const event of events) {
if (event.type === "item.completed") {
if (event.item.item_type === "assistant_message") {
reviewComments.push(event.item.text);
}
}
}
// GitHub にレビューコメントを投稿
const reviewBody = reviewComments.join("\n\n");
await octokit.pulls.createReview({
owner,
repo,
pull_number: prNumber,
body: `## AI Code Review\n\n${reviewBody}`,
event: "COMMENT",
});
console.log("✅ Review posted!");
}
// 実行
const [owner, repo, prNumber] = process.argv.slice(2);
reviewPullRequest(owner, repo, parseInt(prNumber));
4.3 プロジェクト分析ダッシュボード
// scripts/project-analyzer.ts
import { Codex } from "@openai/codex-sdk";
import express from "express";
const app = express();
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
// プロジェクト分析 API
app.get("/api/analyze", async (req, res) => {
const projectPath = req.query.path as string;
if (!projectPath) {
return res.status(400).json({ error: "path parameter required" });
}
const thread = codex.startThread();
const { events } = await thread.runStreamed(
`このプロジェクトを分析して、以下の情報を JSON 形式で返してください:
1. 技術スタック(使用言語、フレームワーク)
2. プロジェクト構造(主要ディレクトリの説明)
3. 依存関係の数と主要な依存関係
4. コードメトリクス(ファイル数、総行数)
5. 潜在的な問題点や改善提案`,
{
workingDirectory: projectPath,
sandboxPolicy: "read-only",
}
);
let analysisResult = "";
for await (const event of events) {
if (event.type === "item.completed") {
if (event.item.item_type === "assistant_message") {
analysisResult += event.item.text;
}
}
}
// JSON を抽出(Markdown コードブロック内を想定)
const jsonMatch = analysisResult.match(/```json\n([\s\S]*?)\n```/);
if (jsonMatch) {
const analysis = JSON.parse(jsonMatch[1]);
res.json(analysis);
} else {
res.json({ raw: analysisResult });
}
});
// WebSocket でリアルタイムストリーミング
import { WebSocketServer } from "ws";
import { createServer } from "http";
const server = createServer(app);
const wss = new WebSocketServer({ server });
wss.on("connection", (ws) => {
console.log("Client connected");
ws.on("message", async (message) => {
const { query, options } = JSON.parse(message.toString());
const thread = codex.startThread();
const { events } = await thread.runStreamed(query, options);
for await (const event of events) {
ws.send(JSON.stringify(event));
}
});
});
server.listen(3000, () => {
console.log("Dashboard running on http://localhost:3000");
});
フロントエンド(React):
// components/ProjectAnalyzer.tsx
import { useState, useEffect } from "react";
export function ProjectAnalyzer() {
const [analysis, setAnalysis] = useState(null);
const [events, setEvents] = useState<any[]>([]);
useEffect(() => {
const ws = new WebSocket("ws://localhost:3000");
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
setEvents(prev => [...prev, data]);
if (data.type === "turn.completed") {
// 分析完了
}
};
ws.onopen = () => {
ws.send(JSON.stringify({
query: "このプロジェクトを分析",
options: {
workingDirectory: process.cwd(),
sandboxPolicy: "read-only",
},
}));
};
return () => ws.close();
}, []);
return (
<div>
<h1>Project Analysis</h1>
<div className="events">
{events.map((event, i) => (
<div key={i} className={`event ${event.type}`}>
{event.type === "item.updated" &&
event.item.item_type === "assistant_message" && (
<p>{event.item.text}</p>
)}
</div>
))}
</div>
{analysis && (
<div className="analysis">
<h2>Analysis Result</h2>
<pre>{JSON.stringify(analysis, null, 2)}</pre>
</div>
)}
</div>
);
}
5. エラーハンドリングとリトライ
5.1 標準的なエラーハンドリング
import { Codex } from "@openai/codex-sdk";
async function robustExecution() {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
try {
const result = await thread.run("テストを実行");
console.log("Success:", result.finalResponse);
} catch (error: any) {
// エラータイプ別の処理
if (error.code === "ENOENT") {
console.error("Codex CLI not found. Please install it.");
} else if (error.message.includes("API key")) {
console.error("Invalid API key. Check OPENAI_API_KEY.");
} else if (error.message.includes("rate limit")) {
console.error("Rate limited. Retry after some time.");
} else {
console.error("Unknown error:", error.message);
}
}
}
5.2 自動リトライ機能
async function executeWithRetry(
query: string,
maxRetries = 3,
retryDelay = 1000
) {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
console.log(`Attempt ${attempt}/${maxRetries}...`);
const result = await thread.run(query);
console.log("✅ Success!");
return result;
} catch (error: any) {
console.error(`❌ Attempt ${attempt} failed:`, error.message);
if (attempt === maxRetries) {
throw new Error(`Failed after ${maxRetries} attempts: ${error.message}`);
}
// 指数バックオフ
const delay = retryDelay * Math.pow(2, attempt - 1);
console.log(`⏳ Retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// 使用例
executeWithRetry("テストを実行", 3, 1000);
5.3 タイムアウト処理
async function executeWithTimeout(
query: string,
timeoutMs = 60000 // 60秒
) {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
// タイムアウト Promise
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error("Timeout")), timeoutMs);
});
// 実行 Promise
const executionPromise = thread.run(query);
try {
const result = await Promise.race([
executionPromise,
timeoutPromise,
]);
return result;
} catch (error: any) {
if (error.message === "Timeout") {
console.error(`⏱️ Execution timed out after ${timeoutMs}ms`);
// プロセスをキルする必要がある場合の処理
}
throw error;
}
}
6. テストとモック
6.1 Jest でのユニットテスト
// __tests__/codex-integration.test.ts
import { Codex } from "@openai/codex-sdk";
describe("Codex Integration", () => {
let codex: Codex;
beforeAll(() => {
codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
});
test("should execute simple query", async () => {
const thread = codex.startThread();
const result = await thread.run("echo 'Hello, World!'");
expect(result.finalResponse).toBeDefined();
expect(result.items.length).toBeGreaterThan(0);
});
test("should handle errors gracefully", async () => {
const thread = codex.startThread();
await expect(async () => {
await thread.run("invalid command that will fail");
}).rejects.toThrow();
});
test("should support streaming", async () => {
const thread = codex.startThread();
const { events } = await thread.runStreamed("echo 'test'");
const eventTypes: string[] = [];
for await (const event of events) {
eventTypes.push(event.type);
}
expect(eventTypes).toContain("thread.started");
expect(eventTypes).toContain("turn.completed");
});
});
6.2 モックを使ったテスト
// __mocks__/@openai/codex-sdk.ts
export class MockThread {
async run(query: string) {
return {
finalResponse: "Mock response",
items: [
{
item_type: "assistant_message",
text: "Mock response",
},
],
};
}
async *runStreamed(query: string) {
yield { type: "thread.started" };
yield {
type: "item.completed",
item: {
item_type: "assistant_message",
text: "Mock response",
},
};
yield { type: "turn.completed" };
}
}
export class Codex {
startThread() {
return new MockThread();
}
}
// __tests__/app.test.ts
jest.mock("@openai/codex-sdk");
import { Codex } from "@openai/codex-sdk";
import { analyzeProject } from "../src/analyzer";
test("should analyze project", async () => {
const result = await analyzeProject("/path/to/project");
expect(result).toBe("Mock response");
});
7. ベストプラクティス
7.1 API キー管理
// ✅ 環境変数から読み取る
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
// ❌ コードにハードコードしない
const codex = new Codex({
apiKey: "sk-...", // 絶対にやらないこと!
});
.env ファイル:
# .env
OPENAI_API_KEY=sk-...
# .gitignore に追加
.env
dotenv を使用:
import "dotenv/config";
import { Codex } from "@openai/codex-sdk";
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
7.2 リソース管理
// ✅ スレッドを再利用
const codex = new Codex({ apiKey: process.env.OPENAI_API_KEY! });
const thread = codex.startThread();
await thread.run("クエリ 1");
await thread.run("クエリ 2"); // 同じスレッドで継続
// ❌ 毎回新しいインスタンスを作成しない
async function badExample(query: string) {
const codex = new Codex({ apiKey: process.env.OPENAI_API_KEY! });
const thread = codex.startThread();
return thread.run(query);
}
7.3 ログとモニタリング
import winston from "winston";
const logger = winston.createLogger({
level: "info",
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: "codex.log" }),
new winston.transports.Console(),
],
});
async function monitoredExecution(query: string) {
const codex = new Codex({
apiKey: process.env.OPENAI_API_KEY!,
});
const thread = codex.startThread();
logger.info("Starting query", { query });
const startTime = Date.now();
try {
const { events } = await thread.runStreamed(query);
for await (const event of events) {
logger.debug("Event received", { type: event.type });
if (event.type === "item.completed") {
logger.info("Item completed", {
itemType: event.item.item_type,
});
}
}
const duration = Date.now() - startTime;
logger.info("Query completed", { duration });
} catch (error: any) {
logger.error("Query failed", {
error: error.message,
duration: Date.now() - startTime,
});
throw error;
}
}
8. まとめ
8.1 SDK の主要機能
✅ 型安全な API: TypeScript による完全な型サポート
✅ 同期/非同期実行: run()
と runStreamed()
✅ 柔軟なオプション: サンドボックス、承認モード、作業ディレクトリ
✅ イベント駆動: リアルタイムストリーミングイベント
✅ エラーハンドリング: try/catch による標準的な処理
8.2 実用的なユースケース
ユースケース | 推奨API | ポイント |
---|---|---|
CI/CD統合 | run() |
エラーコードで判定 |
インタラクティブUI | runStreamed() |
リアルタイム更新 |
コードレビュー |
run() + read-only |
読み取り専用で安全 |
分析ダッシュボード |
runStreamed() + WebSocket |
ライブフィード |
8.3 次のステップ
- SDK のソースコードを読んで内部動作を理解
- 独自の統合アプリケーションを構築
- コミュニティにフィードバックを提供