はじめに
この記事は、TIS アドベンドカレンダー2025 の2日目の記事です。
これから、3回にわたって以下のような、AIチャットで見かけるリアルタイムレスポンスの実装方法の解説をします。
- Go言語でAmazon BedrockのInvokeMoelWithResponseStreamを使ったサーバーサイドの実装
- Go言語で、Ginフレームワークを使ったServer-Sent-Eventsの応答の実装
- Reactで、Server-Sent-Eventsを使ったリアルタイムレスポンスの表示の実装
どんなことをしたいか?
以下の画像のように、AIを使ったチャットアプリケーションで、リアルタイムにレスポンスを表示したい場面がありました。
要素技術的には、
- リアルタイムレスポンスを返すAPIとしては、Amazon BedrockのInvokeModelWithResponseStreamを使い、
- Server-Sent-Eventsという技術を利用すればできそうでしたのでサンプルを作りました
今回はサーバー側のAmazon Bedrockを呼び出す部分の紹介です。
Amazon Bedrockとは?
Amazonが提供している様々なAIの大規模言語モデル(LLM)を使えるマネージドサービスです。
APIは大きく分けて、抽象度の低いInvokeModel系と抽象度の高いConverse系の2系統があります。
InvokeModelは対象のLLMの生のAPIに近い仕様で使えるAPI で、Converseはモデル名などもパラメータ化されており、いろいろなLLMで統一して使えるAPIです。
今回紹介するのは、Anthropic社のClaude Sonnet 4.5をInvokeModelWithResponseStreamで使った例です。
InvokeModel
プロンプトを投入してLLMの応答を一回で取得するAPI。基本的なAPIで1回のリクエストで1回のレスポンスをもらいます。
InvokeModelWithResponseStream
プロンプトを投入してLLMのレスポンスを少しずつ小出しでもらうようにしたAPIです。ユーザー体験をよくするために使います。
InvokeModelWithResponseStream のサンプルコード
以下のページで紹介されていますが、Claude Sonnet 2.0のときのサンプルです。大筋はClaude Sonnet 4.5でも正しいものの、レスポンスのJSONのフォーマットが新しいモデルでは更新されていたため、そこは調査が必要でした。
ストリームは、次のコードのように、BedrockのSDKのInvokeModelWithResponseStreamが返す output 変数に格納されており、for event := range output.GetStream().Events() でループを回すと、少しずつ小出しにレスポンスが表示されます。
func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, channel chan ResponseData) error {
for event := range output.GetStream().Events() {
switch v := event.(type) {
case *types.ResponseStreamMemberChunk:
// ここで標準出力する
fmt.Println("data", string(v.Value.Bytes))
// 以下のような値が出力される
// data {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
// data {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
case *types.UnknownUnionMember:
fmt.Println("unknown tag:", v.Tag)
default:
fmt.Println("union is nil or unknown type")
}
}
return nil
}
上記のコードで標準出力される、v.Value.Bytes の形式はeventのタイプに応じて中身が変わるものでした。
data {
"type":"message_start",
"message":{
"model":"claude-sonnet-4-20250514","id":"msg_bdrk_xxxxx",
"type":"message",
"role":"assistant",
"content":[],
"stop_reason":null,
"stop_sequence":null,
"usage":{
"input_tokens":17,
"cache_creation_input_tokens":0,
"cache_read_input_tokens":0,
"output_tokens":7
}
}
}
data {
"type":"content_block_start",
"index":0,
"content_block":{
"type":"text",
"text":""
}
}
data {
"type":"content_block_delta",
"index":0,
"delta":{
"type":"text_delta",
"text":"こんにちは!お"
}
}
data {
"type":"content_block_delta",
"index":0,
"delta":{
"type":"text_delta",
"text":"元気ですか?何"
}
}
data {
"type":"content_block_delta",
"index":0,
"delta":{
"type":"text_delta",
"text":"かお手伝いできること"
}
}
data {
"type":"content_block_delta",
"index":0,
"delta":{
"type":"text_delta",
"text":"があれば、"
}
}
data {
"type":"content_block_delta",
"index":0,
"delta":{
"type":"text_delta",
"text":"お気軽にお声か"
}
}
data {
"type":"content_block_delta",
"index":0,
"delta":{
"type":"text_delta",
"text":"けください。"
}
}
data {
"type":"content_block_stop",
"index":0
}
data {
"type":"message_delta",
"delta":{
"stop_reason":"end_turn",
"stop_sequence":null
},
"usage":{
"output_tokens":41
}
}
data {
"type":"message_stop",
"amazon-bedrock-invocationMetrics":{
"inputTokenCount":17,
"outputTokenCount":41,
"invocationLatency":1713,
"firstByteLatency":1481
}
}
InvokeModelWithResponseStreamで帰ってくるClaude Sonnet 4.5のレスポンス情報はどこ?
InvokeModelWithResponseStreamはClaude Sonnet 4.5のレスポンスをそのまま返しており、調べたところ、以下のページにリファレンスが書かれていました。
主要な箇所を抜粋すると、以下のようになります。
event: message_start
data: {"type": "message_start", "message": {"id": "msg_1nZdL29xx5MUA1yADyHTEsnR8uuvGzszyY", "type": "message", "role": "assistant", "content": [], "model": "claude-sonnet-4-5-20250929", "stop_reason": null, "stop_sequence": null, "usage": {"input_tokens": 25, "output_tokens": 1}}}
event: content_block_start
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
event: ping
data: {"type": "ping"}
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "!"}}
event: content_block_stop
data: {"type": "content_block_stop", "index": 0}
event: message_delta
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence":null}, "usage": {"output_tokens": 15}}
event: message_stop
data: {"type": "message_stop"}
全てのdataの種類に対応させると以下のコードになりました。(この変換はAIに実装させました)
package bedrock
import (
"encoding/json"
)
type TypeOnly struct {
Type string `json:"type"`
}
func GetType(data []byte) (string, error) {
var t TypeOnly
err := json.Unmarshal(data, &t)
return t.Type, err
}
type MessageStartData struct {
Type string `json:"type"`
Message MessageDetail `json:"message"`
}
type MessageDetail struct {
ID string `json:"id"`
Type string `json:"type"`
Role string `json:"role"`
Model string `json:"model"`
StopSequence *string `json:"stop_sequence"`
Usage UsageDetail `json:"usage"`
Content []interface{} `json:"content"` // 内容に応じて型変える
StopReason *string `json:"stop_reason"`
}
type UsageDetail struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
}
type ContentBlockStartData struct {
Type string `json:"type"`
Index int `json:"index"`
ContentBlock ContentBlockStart `json:"content_block"`
}
type ContentBlockStart struct {
Type string `json:"type"`
Text string `json:"text,omitempty"` // text型 or tool_use型
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Input map[string]interface{} `json:"input,omitempty"`
}
type ContentBlockDeltaData struct {
Type string `json:"type"`
Index int `json:"index"`
Delta ContentBlockDelta `json:"delta"`
}
type ContentBlockDelta struct {
Type string `json:"type"`
Text string `json:"text,omitempty"` // text_delta
PartialJSON string `json:"partial_json,omitempty"` // input_json_delta
}
type ContentBlockStopData struct {
Type string `json:"type"`
Index int `json:"index"`
}
type MessageDeltaData struct {
Type string `json:"type"`
Delta MessageDeltaDetail `json:"delta"`
Usage UsageDetail `json:"usage"`
}
type MessageDeltaDetail struct {
StopReason string `json:"stop_reason,omitempty"`
StopSequence *string `json:"stop_sequence"`
}
type MessageStopData struct {
Type string `json:"type"`
}
type PingData struct {
Type string `json:"type"`
}
func ParseEventJson(data []byte) (any, error) {
t, err := GetType(data)
if err != nil {
return nil, err
}
switch t {
case "message_start":
var ms MessageStartData
err = json.Unmarshal(data, &ms)
return ms, err
case "content_block_start":
var cbs ContentBlockStartData
err = json.Unmarshal(data, &cbs)
return cbs, err
case "content_block_delta":
var cbd ContentBlockDeltaData
err = json.Unmarshal(data, &cbd)
return cbd, err
case "content_block_stop":
var cbst ContentBlockStopData
err = json.Unmarshal(data, &cbst)
return cbst, err
case "message_delta":
var md MessageDeltaData
err = json.Unmarshal(data, &md)
return md, err
case "message_stop":
var msd MessageStopData
err = json.Unmarshal(data, &msd)
return msd, err
case "ping":
var pd PingData
err = json.Unmarshal(data, &pd)
return pd, err
default:
return nil, nil
}
}
ParseEventJson 関数で、上記のように、typeごとに変わるJSONをパースして、それぞれに対応する構造体へマッピングしています。
さて、構造体へのマッピングができたので、全体の実装を見てみましょう。
InvokeModelWithResponseStreamを呼び出すコード
※.envファイルで環境変数を設定しておきます。
AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=ap-northeast-1
AWS_CLAUDE_MODEL_ID=jp.anthropic.claude-sonnet-4-5-20250929-v1:0
API_SERVER_CORS_ALLOW_ORIGINS=http://localhost:3000
(1) BedrockのClientを作ります。
func New() (*BedrockClient, error) {
// AWS認証情報とリージョンを自動読み込み
ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("AWS設定の読み込みに失敗しました。 %v", err)
}
client := bedrockruntime.NewFromConfig(cfg)
return &BedrockClient{
Ctx: ctx,
Client: client,
SystemPrompt: "", // 初期値は空"",
}, nil
}
(2) bedrockruntime.ClientのInvokeModelWithResponseStreamを実行します。
ほぼAWSのサンプルコードと同じです。
// Anthropic Claude requires you to enclose the prompt as follows:
prefix := "Human: "
postfix := "\n\nAssistant:"
prompt = prefix + prompt + postfix
body := map[string]interface{}{
"anthropic_version": "bedrock-2023-05-31",
"system": c.SystemPrompt,
"messages": []map[string]string{
{
"role": "user",
"content": prompt,
},
},
"max_tokens": 8192,
"temperature": 0.3,
}
bodyBytes, err := json.Marshal(body)
if err != nil {
channel <- ResponseData{Message: fmt.Sprintf("JSONのエンコードに失敗しました。 %v", err), Metadata: "error"}
return
}
contentType := "application/json"
output, err := c.Client.InvokeModelWithResponseStream(ctx, &bedrockruntime.InvokeModelWithResponseStreamInput{
Body: bodyBytes,
ModelId: aws.String(modelId),
ContentType: aws.String(contentType),
})
(3) InvokeModelWithResponseStreamが返すoutputを、processStreamingOutput関数で逐次処理していきます。
ここで、先のコードにあった ParseEventJson 関数でtypeごとに変わるJSONをパースします。
ContentBlockDeltaDataというデータの型と、MessageStopDataというデータの型以外は、リアルタイムレスポンスでは不要なメッセージ型でした。
前者は、逐次レスポンスの差分メッセージ(data.Delta.Text)を含んでおり、後者は、APIが終わったことを通知するためのメッセージでした。
err = processStreamingOutput(output, channel)
if err != nil {
channel <- ResponseData{Message: fmt.Sprintf("ストリーミング出力の処理に失敗しました。 %v", err), Metadata: "error"}
}
// 中略
func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, channel chan ResponseData) error {
for event := range output.GetStream().Events() {
switch v := event.(type) {
case *types.ResponseStreamMemberChunk:
data, err := ParseEventJson(v.Value.Bytes)
if err != nil {
return err
}
switch data := data.(type) {
case ContentBlockDeltaData:
// fmt.Println(data.Delta.Text)
channel <- ResponseData{Message: data.Delta.Text}
case MessageStopData:
channel <- ResponseData{Message: "", Metadata: "end"}
}
case *types.UnknownUnionMember:
fmt.Println("unknown tag:", v.Tag)
default:
fmt.Println("union is nil or unknown type")
}
}
return nil
}
コードの全量はここを展開
package bedrock
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
"github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types"
)
type ResponseData struct {
Message string `json:"message"`
Metadata string `json:"metadata,omitempty"`
}
func (c *BedrockClient) InvokeModelWithResponseStream(ctx context.Context, prompt string, channel chan ResponseData) {
modelId := os.Getenv("AWS_CLAUDE_MODEL_ID")
if modelId == "" {
channel <- ResponseData{Message: ".envファイルで、 AWS_CLAUDE_MODEL_ID を指定してください", Metadata: "error"}
return
}
// Anthropic Claude requires you to enclose the prompt as follows:
prefix := "Human: "
postfix := "\n\nAssistant:"
prompt = prefix + prompt + postfix
body := map[string]interface{}{
"anthropic_version": "bedrock-2023-05-31",
"system": c.SystemPrompt,
"messages": []map[string]string{
{
"role": "user",
"content": prompt,
},
},
"max_tokens": 8192,
"temperature": 0.3,
}
bodyBytes, err := json.Marshal(body)
if err != nil {
channel <- ResponseData{Message: fmt.Sprintf("JSONのエンコードに失敗しました。 %v", err), Metadata: "error"}
return
}
contentType := "application/json"
output, err := c.Client.InvokeModelWithResponseStream(ctx, &bedrockruntime.InvokeModelWithResponseStreamInput{
Body: bodyBytes,
ModelId: aws.String(modelId),
ContentType: aws.String(contentType),
})
if err != nil {
errMsg := err.Error()
channel <- ResponseData{Message: errMsg, Metadata: "error"}
return
}
err = processStreamingOutput(output, channel)
if err != nil {
channel <- ResponseData{Message: fmt.Sprintf("ストリーミング出力の処理に失敗しました。 %v", err), Metadata: "error"}
}
}
func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, channel chan ResponseData) error {
for event := range output.GetStream().Events() {
switch v := event.(type) {
case *types.ResponseStreamMemberChunk:
data, err := ParseEventJson(v.Value.Bytes)
if err != nil {
return err
}
switch data := data.(type) {
case ContentBlockDeltaData:
fmt.Println(data.Delta.Text)
channel <- ResponseData{Message: data.Delta.Text}
case MessageStopData:
channel <- ResponseData{Message: "", Metadata: "end"}
}
case *types.UnknownUnionMember:
fmt.Println("unknown tag:", v.Tag)
default:
fmt.Println("union is nil or unknown type")
}
}
return nil
}
type BedrockClient struct {
Ctx context.Context
Client *bedrockruntime.Client
SystemPrompt string // システムプロンプトの内容
}
func New() (*BedrockClient, error) {
// AWS認証情報とリージョンを自動読み込み
ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("AWS設定の読み込みに失敗しました。 %v", err)
}
client := bedrockruntime.NewFromConfig(cfg)
return &BedrockClient{
Ctx: ctx,
Client: client,
SystemPrompt: "", // 初期値は空"",
}, nil
}
さいごに
Amazon BedrockのInvokeModelWithResponseStreamの呼び出しと、Claude Sonnet 4.5固有のJSONレスポンスのパース部分を紹介しました。
明日は、メイン関数のほうで、Ginを使ってWeb APIサーバーとして作り、チャットのAPIはGoルーチンを起動して小出しのレスポンスを、チャンネルに渡すことでServer-Sent-Eventsとして返されるよう実装します。
そのあたりは、2回目の投稿をお待ちください。
