2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Goで、Amazon Bedrockを呼び出す(InvokeModelWithResponseStream API)

Posted at

はじめに

この記事は、TIS アドベンドカレンダー2025 の2日目の記事です。

これから、3回にわたって以下のような、AIチャットで見かけるリアルタイムレスポンスの実装方法の解説をします。

  1. Go言語でAmazon BedrockのInvokeMoelWithResponseStreamを使ったサーバーサイドの実装
  2. Go言語で、Ginフレームワークを使ったServer-Sent-Eventsの応答の実装
  3. Reactで、Server-Sent-Eventsを使ったリアルタイムレスポンスの表示の実装

どんなことをしたいか?

以下の画像のように、AIを使ったチャットアプリケーションで、リアルタイムにレスポンスを表示したい場面がありました。

sse-demo.gif

要素技術的には、

  • リアルタイムレスポンスを返すAPIとしては、Amazon BedrockのInvokeModelWithResponseStreamを使い、
  • Server-Sent-Eventsという技術を利用すればできそうでしたのでサンプルを作りました

今回はサーバー側のAmazon Bedrockを呼び出す部分の紹介です。

Amazon Bedrockとは?

Amazonが提供している様々なAIの大規模言語モデル(LLM)を使えるマネージドサービスです。

APIは大きく分けて、抽象度の低いInvokeModel系と抽象度の高いConverse系の2系統があります。
InvokeModelは対象のLLMの生のAPIに近い仕様で使えるAPI で、Converseはモデル名などもパラメータ化されており、いろいろなLLMで統一して使えるAPIです。
今回紹介するのは、Anthropic社のClaude Sonnet 4.5をInvokeModelWithResponseStreamで使った例です。

InvokeModel

プロンプトを投入してLLMの応答を一回で取得するAPI。基本的なAPIで1回のリクエストで1回のレスポンスをもらいます。

InvokeModelWithResponseStream

プロンプトを投入してLLMのレスポンスを少しずつ小出しでもらうようにしたAPIです。ユーザー体験をよくするために使います。

InvokeModelWithResponseStream のサンプルコード

以下のページで紹介されていますが、Claude Sonnet 2.0のときのサンプルです。大筋はClaude Sonnet 4.5でも正しいものの、レスポンスのJSONのフォーマットが新しいモデルでは更新されていたため、そこは調査が必要でした。

ストリームは、次のコードのように、BedrockのSDKのInvokeModelWithResponseStreamが返す output 変数に格納されており、for event := range output.GetStream().Events() でループを回すと、少しずつ小出しにレスポンスが表示されます。

func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, channel chan ResponseData) error {
	for event := range output.GetStream().Events() {
		switch v := event.(type) {
		case *types.ResponseStreamMemberChunk:
            // ここで標準出力する
            fmt.Println("data", string(v.Value.Bytes))
            // 以下のような値が出力される
            // data {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
            // data {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
		case *types.UnknownUnionMember:
			fmt.Println("unknown tag:", v.Tag)
		default:
			fmt.Println("union is nil or unknown type")
		}
	}
	return nil
}

上記のコードで標準出力される、v.Value.Bytes の形式はeventのタイプに応じて中身が変わるものでした。

改行を入れた出力例
data {
  "type":"message_start",
  "message":{
    "model":"claude-sonnet-4-20250514","id":"msg_bdrk_xxxxx",
    "type":"message",
    "role":"assistant",
    "content":[],
    "stop_reason":null,
    "stop_sequence":null,
    "usage":{
      "input_tokens":17,
      "cache_creation_input_tokens":0,
      "cache_read_input_tokens":0,
      "output_tokens":7
    }
  }
} 
data {
  "type":"content_block_start",
  "index":0,
  "content_block":{
    "type":"text",
    "text":""
  }
}
data {
  "type":"content_block_delta",
  "index":0,
  "delta":{
    "type":"text_delta",
    "text":"こんにちは!お"
  }
}
data {
  "type":"content_block_delta",
  "index":0,
  "delta":{
    "type":"text_delta",
    "text":"元気ですか?何"
  }
}
data {
  "type":"content_block_delta",
  "index":0,
  "delta":{
    "type":"text_delta",
    "text":"かお手伝いできること"
  }
}
data {
  "type":"content_block_delta",
  "index":0,
  "delta":{
    "type":"text_delta",
    "text":"があれば、"
  }
}
data {
  "type":"content_block_delta",
  "index":0,
  "delta":{
    "type":"text_delta",
    "text":"お気軽にお声か"
  }
}
data {
  "type":"content_block_delta",
  "index":0,
  "delta":{
    "type":"text_delta",
    "text":"けください。"
  }
}
data {
  "type":"content_block_stop",
  "index":0
}
data {
  "type":"message_delta",
  "delta":{
    "stop_reason":"end_turn",
    "stop_sequence":null
  },
  "usage":{
    "output_tokens":41
  }
}
data {
  "type":"message_stop",
  "amazon-bedrock-invocationMetrics":{
    "inputTokenCount":17,
    "outputTokenCount":41,
    "invocationLatency":1713,
    "firstByteLatency":1481
  }
}  

InvokeModelWithResponseStreamで帰ってくるClaude Sonnet 4.5のレスポンス情報はどこ?

InvokeModelWithResponseStreamはClaude Sonnet 4.5のレスポンスをそのまま返しており、調べたところ、以下のページにリファレンスが書かれていました。

主要な箇所を抜粋すると、以下のようになります。

event: message_start
data: {"type": "message_start", "message": {"id": "msg_1nZdL29xx5MUA1yADyHTEsnR8uuvGzszyY", "type": "message", "role": "assistant", "content": [], "model": "claude-sonnet-4-5-20250929", "stop_reason": null, "stop_sequence": null, "usage": {"input_tokens": 25, "output_tokens": 1}}}

event: content_block_start
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}

event: ping
data: {"type": "ping"}

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "!"}}

event: content_block_stop
data: {"type": "content_block_stop", "index": 0}

event: message_delta
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence":null}, "usage": {"output_tokens": 15}}

event: message_stop
data: {"type": "message_stop"}

全てのdataの種類に対応させると以下のコードになりました。(この変換はAIに実装させました)

claude_sonnet_45_types.go
package bedrock

import (
	"encoding/json"
)

type TypeOnly struct {
	Type string `json:"type"`
}

func GetType(data []byte) (string, error) {
	var t TypeOnly
	err := json.Unmarshal(data, &t)
	return t.Type, err
}

type MessageStartData struct {
	Type    string        `json:"type"`
	Message MessageDetail `json:"message"`
}

type MessageDetail struct {
	ID           string        `json:"id"`
	Type         string        `json:"type"`
	Role         string        `json:"role"`
	Model        string        `json:"model"`
	StopSequence *string       `json:"stop_sequence"`
	Usage        UsageDetail   `json:"usage"`
	Content      []interface{} `json:"content"` // 内容に応じて型変える
	StopReason   *string       `json:"stop_reason"`
}

type UsageDetail struct {
	InputTokens  int `json:"input_tokens"`
	OutputTokens int `json:"output_tokens"`
}

type ContentBlockStartData struct {
	Type         string            `json:"type"`
	Index        int               `json:"index"`
	ContentBlock ContentBlockStart `json:"content_block"`
}

type ContentBlockStart struct {
	Type  string                 `json:"type"`
	Text  string                 `json:"text,omitempty"` // text型 or tool_use型
	ID    string                 `json:"id,omitempty"`
	Name  string                 `json:"name,omitempty"`
	Input map[string]interface{} `json:"input,omitempty"`
}

type ContentBlockDeltaData struct {
	Type  string            `json:"type"`
	Index int               `json:"index"`
	Delta ContentBlockDelta `json:"delta"`
}

type ContentBlockDelta struct {
	Type        string `json:"type"`
	Text        string `json:"text,omitempty"`         // text_delta
	PartialJSON string `json:"partial_json,omitempty"` // input_json_delta
}

type ContentBlockStopData struct {
	Type  string `json:"type"`
	Index int    `json:"index"`
}

type MessageDeltaData struct {
	Type  string             `json:"type"`
	Delta MessageDeltaDetail `json:"delta"`
	Usage UsageDetail        `json:"usage"`
}

type MessageDeltaDetail struct {
	StopReason   string  `json:"stop_reason,omitempty"`
	StopSequence *string `json:"stop_sequence"`
}

type MessageStopData struct {
	Type string `json:"type"`
}

type PingData struct {
	Type string `json:"type"`
}

func ParseEventJson(data []byte) (any, error) {
	t, err := GetType(data)
	if err != nil {
		return nil, err
	}
	switch t {
	case "message_start":
		var ms MessageStartData
		err = json.Unmarshal(data, &ms)
		return ms, err
	case "content_block_start":
		var cbs ContentBlockStartData
		err = json.Unmarshal(data, &cbs)
		return cbs, err
	case "content_block_delta":
		var cbd ContentBlockDeltaData
		err = json.Unmarshal(data, &cbd)
		return cbd, err
	case "content_block_stop":
		var cbst ContentBlockStopData
		err = json.Unmarshal(data, &cbst)
		return cbst, err
	case "message_delta":
		var md MessageDeltaData
		err = json.Unmarshal(data, &md)
		return md, err
	case "message_stop":
		var msd MessageStopData
		err = json.Unmarshal(data, &msd)
		return msd, err
	case "ping":
		var pd PingData
		err = json.Unmarshal(data, &pd)
		return pd, err
	default:
		return nil, nil
	}
}

ParseEventJson 関数で、上記のように、typeごとに変わるJSONをパースして、それぞれに対応する構造体へマッピングしています。

さて、構造体へのマッピングができたので、全体の実装を見てみましょう。

InvokeModelWithResponseStreamを呼び出すコード

※.envファイルで環境変数を設定しておきます。

.env
AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=ap-northeast-1
AWS_CLAUDE_MODEL_ID=jp.anthropic.claude-sonnet-4-5-20250929-v1:0
API_SERVER_CORS_ALLOW_ORIGINS=http://localhost:3000

(1) BedrockのClientを作ります。

  func New() (*BedrockClient, error) {
    // AWS認証情報とリージョンを自動読み込み
    ctx := context.Background()
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
		return nil, fmt.Errorf("AWS設定の読み込みに失敗しました。 %v", err)
    }
    client := bedrockruntime.NewFromConfig(cfg)
    return &BedrockClient{
  	  Ctx:          ctx,
  	  Client:       client,
  	  SystemPrompt: "", // 初期値は空"",
    }, nil
  }

(2) bedrockruntime.ClientInvokeModelWithResponseStreamを実行します。
ほぼAWSのサンプルコードと同じです。

	// Anthropic Claude requires you to enclose the prompt as follows:
	prefix := "Human: "
	postfix := "\n\nAssistant:"
	prompt = prefix + prompt + postfix

	body := map[string]interface{}{
		"anthropic_version": "bedrock-2023-05-31",
		"system":            c.SystemPrompt,
		"messages": []map[string]string{
			{
				"role":    "user",
				"content": prompt,
			},
		},
		"max_tokens":  8192,
		"temperature": 0.3,
	}
	bodyBytes, err := json.Marshal(body)
	if err != nil {
		channel <- ResponseData{Message: fmt.Sprintf("JSONのエンコードに失敗しました。 %v", err), Metadata: "error"}
		return
	}

    
    contentType := "application/json"
	output, err := c.Client.InvokeModelWithResponseStream(ctx, &bedrockruntime.InvokeModelWithResponseStreamInput{
		Body:        bodyBytes,
		ModelId:     aws.String(modelId),
		ContentType: aws.String(contentType),
	})

(3) InvokeModelWithResponseStreamが返すoutputを、processStreamingOutput関数で逐次処理していきます。

ここで、先のコードにあった ParseEventJson 関数でtypeごとに変わるJSONをパースします。

ContentBlockDeltaDataというデータの型と、MessageStopDataというデータの型以外は、リアルタイムレスポンスでは不要なメッセージ型でした。
前者は、逐次レスポンスの差分メッセージ(data.Delta.Text)を含んでおり、後者は、APIが終わったことを通知するためのメッセージでした。

	err = processStreamingOutput(output, channel)
	if err != nil {
		channel <- ResponseData{Message: fmt.Sprintf("ストリーミング出力の処理に失敗しました。 %v", err), Metadata: "error"}
	}
// 中略

func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, channel chan ResponseData) error {
	for event := range output.GetStream().Events() {
		switch v := event.(type) {
		case *types.ResponseStreamMemberChunk:
			data, err := ParseEventJson(v.Value.Bytes)
			if err != nil {
				return err
			}
			switch data := data.(type) {
			case ContentBlockDeltaData:
				// fmt.Println(data.Delta.Text)
				channel <- ResponseData{Message: data.Delta.Text}
			case MessageStopData:
				channel <- ResponseData{Message: "", Metadata: "end"}
			}
		case *types.UnknownUnionMember:
			fmt.Println("unknown tag:", v.Tag)
		default:
			fmt.Println("union is nil or unknown type")
		}
	}
	return nil
}
コードの全量はここを展開
bedrock_client.go
package bedrock

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
	"github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types"
)

type ResponseData struct {
	Message  string `json:"message"`
	Metadata string `json:"metadata,omitempty"`
}

func (c *BedrockClient) InvokeModelWithResponseStream(ctx context.Context, prompt string, channel chan ResponseData) {

	modelId := os.Getenv("AWS_CLAUDE_MODEL_ID")
	if modelId == "" {
		channel <- ResponseData{Message: ".envファイルで、 AWS_CLAUDE_MODEL_ID を指定してください", Metadata: "error"}
		return
	}

	// Anthropic Claude requires you to enclose the prompt as follows:
	prefix := "Human: "
	postfix := "\n\nAssistant:"
	prompt = prefix + prompt + postfix

	body := map[string]interface{}{
		"anthropic_version": "bedrock-2023-05-31",
		"system":            c.SystemPrompt,
		"messages": []map[string]string{
			{
				"role":    "user",
				"content": prompt,
			},
		},
		"max_tokens":  8192,
		"temperature": 0.3,
	}
	bodyBytes, err := json.Marshal(body)
	if err != nil {
		channel <- ResponseData{Message: fmt.Sprintf("JSONのエンコードに失敗しました。 %v", err), Metadata: "error"}
		return
	}
	contentType := "application/json"
	output, err := c.Client.InvokeModelWithResponseStream(ctx, &bedrockruntime.InvokeModelWithResponseStreamInput{
		Body:        bodyBytes,
		ModelId:     aws.String(modelId),
		ContentType: aws.String(contentType),
	})

	if err != nil {
		errMsg := err.Error()
		channel <- ResponseData{Message: errMsg, Metadata: "error"}
		return
	}

	err = processStreamingOutput(output, channel)
	if err != nil {
		channel <- ResponseData{Message: fmt.Sprintf("ストリーミング出力の処理に失敗しました。 %v", err), Metadata: "error"}
	}
}

func processStreamingOutput(output *bedrockruntime.InvokeModelWithResponseStreamOutput, channel chan ResponseData) error {
	for event := range output.GetStream().Events() {
		switch v := event.(type) {
		case *types.ResponseStreamMemberChunk:
			data, err := ParseEventJson(v.Value.Bytes)
			if err != nil {
				return err
			}
			switch data := data.(type) {
			case ContentBlockDeltaData:
				fmt.Println(data.Delta.Text)
				channel <- ResponseData{Message: data.Delta.Text}
			case MessageStopData:
				channel <- ResponseData{Message: "", Metadata: "end"}
			}
		case *types.UnknownUnionMember:
			fmt.Println("unknown tag:", v.Tag)
		default:
			fmt.Println("union is nil or unknown type")
		}
	}
	return nil
}

type BedrockClient struct {
	Ctx          context.Context
	Client       *bedrockruntime.Client
	SystemPrompt string // システムプロンプトの内容
}

func New() (*BedrockClient, error) {
	// AWS認証情報とリージョンを自動読み込み
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, fmt.Errorf("AWS設定の読み込みに失敗しました。 %v", err)
	}
	client := bedrockruntime.NewFromConfig(cfg)
	return &BedrockClient{
		Ctx:          ctx,
		Client:       client,
		SystemPrompt: "", // 初期値は空"",
	}, nil
}

さいごに

Amazon BedrockのInvokeModelWithResponseStreamの呼び出しと、Claude Sonnet 4.5固有のJSONレスポンスのパース部分を紹介しました。

明日は、メイン関数のほうで、Ginを使ってWeb APIサーバーとして作り、チャットのAPIはGoルーチンを起動して小出しのレスポンスを、チャンネルに渡すことでServer-Sent-Eventsとして返されるよう実装します。
そのあたりは、2回目の投稿をお待ちください。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?