元記事
変更点
- Chat.completionの生成パラメータでstreamを有効化
- Responseに含めるbodyをReadableStreamに変更
import OpenAI from "@openai/openai";
const endpoint = Deno.env.get("AZURE_OPENAI_ENDPOINT") ?? "";
const deployment_name = Deno.env.get("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4.1-mini";
const api_key = Deno.env.get("AZURE_OPENAI_API_KEY") ?? "";
const client = new OpenAI({
baseURL: endpoint,
apiKey: api_key,
});
- async function chatWithGPT(): Promise<string> {
+ async function chatWithGPT(): Promise<AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk>> {
const userMessage = "おでんのレシピを教えてください。";
const stream = await client.chat.completions.create({
messages: [
{ role: "system", content: "レシピ作成アシスタント" },
{ role: "user", content: userMessage },
],
model: deployment_name,
+ stream: true,
});
- return completion.choices[0].message.content || "";
+ return stream;
}
function handler(): Response {
+ const encoder = new TextEncoder();
+ const body = new ReadableStream({
+ async start(controller) {
+ const stream = await chatWithGPT();
+ for await (const chunk of stream) {
+ const content = chunk.choices[0]?.delta?.content ?? "";
+ if (content) {
+ controller.enqueue(encoder.encode(content));
+ }
+ }
+ controller.close();
+ },
+ });
- const response = new Response(await chatWithGPT(), {
- headers: { "Content-Type": "text/plain; charset=utf-8" },
- });
+ return new Response(body, {
+ headers: {
+ // SSE (Server-Sent Events) 用のヘッダー
+ "Content-Type": "text/event-stream; charset=utf-8",
+ // ブラウザキャッシュを無効化
+ "Cache-Control": "no-cache",
+ // プロキシサーバーによるレスポンスのバッファリングを無効化
+ // 今はプロキシサーバーは使用していないが、将来的に使用する可能性があるため追加
+ "X-Accel-Buffering": "no",
+ },
+ });
}
Deno.serve(handler);
実行結果
Streamにしたので、ポロポロと出力されるのだが、画像では表現できず、残念。

chatWithGPT関数の詳細まとめ
この関数がなぜこの定義で正しく動作するのか。
async function chatWithGPT(): Promise<AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk>> {
const userMessage = "おでんのレシピを教えてください。";
const stream = await client.chat.completions.create({
messages: [
{ role: "system", content: "レシピ作成アシスタント" },
{ role: "user", content: userMessage },
],
model: deployment_name,
stream: true,
});
return stream;
}
関数定義での戻り値の型
async function chatWithGPT(): Promise<AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk>> {
AsyncIterable<>
非同期イテレータ。
for await...of文と組み合わせることで、Iteratableなコレクションからデータを非同期で取り出す。
OpenAI.Chat.Completions.ChatCompletionChunk
チャットコンプリーションのストリーミングレスポンスの断片データ(チャンク)を表す型。
createメソッドが返すのは、この型のAsyncIterableであり、複数のチャンクが逐次的に追加される。
具体的には、ChatCompletionChunkは以下のような構造を持っており、チャンクデータはChatCompletionChunk.choices[0].delta.contentにある。
{
id: string; // チャットコンプリーションの一意の識別子
object: string; // オブジェクトの種類(例: "chat.completion.chunk")
created: number; // チャットコンプリーションが生成されたタイムスタンプ
model: string; // 使用されたモデルの名前
choices: Array<{
delta: {
content?: string; // チャットコンプリーションのレスポンスの一部(テキストなど)
tool_calls?: Array<{ /* ツール呼び出しの情報 */ }>; // ツール呼び出しの情報(ツールを使用する場合)
};
index: number; // 選択肢のインデックス
finish_reason?: string; // チャットコンプリーションが終了した理由(例: "stop")
}>;
}
stream: falseの場合、戻り値の型は OpenAI.Chat.Completions.ChatCompletionになる。
チャットコンプリーショのレスポンスは一度にすべて送られてくるため、データ構造が異なる。
コンテンツはChatCompletion.choices[0].message.contentにある。
{
id: string; // チャットコンプリーションの一意の識別子
object: string; // オブジェクトの種類(例: "chat.completion")
created: number; // チャットコンプリーションが生成されたタイムスタンプ
model: string; // 使用されたモデルの名前
choices: Array<{
message: {
role: string;
content: string
}; // チャットコンプリーションのレスポンス全体(テキストなど)
index: number; // 選択肢のインデックス
finish_reason?: string; // チャットコンプリーションが終了した理由(例: "stop")
}>;
}
ReadableStream
どうしてもこの処理に馴染めないので何度も繰り返し自分の理解を書き留める。
const encoder = new TextEncoder();
const body = new ReadableStream({
async start(controller) {
const stream = await chatWithGPT();
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content ?? "";
if (content) {
controller.enqueue(encoder.encode(content));
}
}
controller.close();
},
});
HTTPレスポンスボディとしてのReadableStream
GPTが生成したコンテンツをResponseのBodyとして指定し、ブラウザに返す必要があるが、bodyに指定できるのはBodyInitという型であり、これには以下の型のUnion型として定義されている。
stringBlobBufferSourceFormDataURLSearchParamsReadableStream<Uint8Array<ArrayBufferLike>>Iterable<Uint8Array<ArrayBufferLike>>AsyncIterable<Uint8Array<ArrayBufferLike>>
stream: falseの場合、bodyに指定するのはstringのデータ。
stream: trueの場合、ストリームを返す必要があるので、ここではbodyをReadableStream<Uint8Array>として生成している。
ReadableStreamのstartメソッド
ReadableStreamを生成すると、一度だけこのメソッドが実行される。
このメソッドでは、ストリームデータを生成し、キューイングするための処理を実装する。
ReadableStreamのstartメソッドの引数controller
ストリームデータをキューイングするためのパイプ役。
生成したストリームデータをcontroller.queueメソッドでキューイングする。
GPTへプロンプト実行指示chatWithGPT
startメソッド実行時にGPTへプロンプトを指示し、結果をストリームで返すためのレスポンスを取得しておく。これがAyncIterable<OpenAI.Chat.Completions.ChatCompletionsChunk>。
そして非同期ループで逐次処理for await ... of
startメソッド自体はReadableStreamが生成されたタイミングでしか実行されない。
GPTが返してくれたstreamを非同期ループで回しつつ、チャンクを受信したらキューイングするという処理をここで実装する。
startメソッドが処理を返した後も、GPTからのストリームが終了するまで非同期ルールはストリーム受信とキューイングを非同期で繰り返し実行する。
キューイング時のデータ変換 TextEncoder
GPTが返すチャンクはstringであり、ReadableStream<Uint8Array>とするためにstringをUint8Array型に変換する必要がある。
これをやっているのがTextEncoderのencodeメソッド。
ストリームのデータ終了を表す controller.closeメソッド
GPTからのストリームが終了したらcontrollerのキューイングも終了することでストリーミング終了を表すデータがブラウザに送信され、通信が終了する。
Server-side Event (SSE)
GPTからのストリーミングは、SSEという仕組みで動作している。
ブラウザからサーバーへリクエストを送信した後、SSEによって、サーバーからブラウザへ特定の終了条件になるまでデータを継続的に送り続ける仕組み。
SSEの特徴
- サーバーからブラウザへの1方向通信のため軽量
- WebSocketなどと比較してプロトコルが単純で、HTTP/HTTPS上で実現できる
- 接続が途中で切れてもブラウザが自動再接続を試行する
SSEのレスポンスヘッダー
以下のヘッダーが必ず含まれる。
- Content-Type: text/event-stream
- Cache-Control: no-cache
- Connection: keep-alive
SSEのbodyデータ
以下のデータ形式で送信される。
data: お
data: で
data: ん
data: のレシピ...
SSEの通信終了トリガー
controller.closeメソッド実行が通信終了トリガーになる。
、、、これでようやく次のステップに進める。