UE C++でSSE形式のレスポンス受信の実装例が転がっていなかったので作成します。
Google Geminiでの実装例になりますが、本質的なところは他のREST APIでも同じだと思います。
環境
UE5.3
Windows 11
1. SSEとは
詳しいことは他の記事で沢山書かれている為省略しますが、HTTPリクエストをサーバに送信した時にクライアントに対してデータを複数回に分けて送信するような通信のことです。
昔からある技術のようですが、対話AIでの会話文生成等でよく使われるようになったようです。
2. SSE受信の実装コード
-
必要モジュール
実装するモジュールのBuil.csに以下の依存モジュールを追加する必要があります。
Json系はレスポンス受信時のデータ解析に使用します。- HTTP
- Json
- JsonUtilities
典型例としては以下のようになると思います。
// Fill out your copyright notice in the Description page of Project Settings. using UnrealBuildTool; public class UE5_sandbox : ModuleRules { public UE5_sandbox(ReadOnlyTargetRules Target) : base(Target) { PCHUsage = PCHUsageMode.UseExplicitOrSharedPCHs; PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "HTTP", "Json", "JsonUtilities" }); } }
-
実装コード
テスト用にActorを継承したMyActorクラスを作成しました。-
ヘッダ
ApiKey
にはGemini AI Studioで取得したAPIキーを指定してください。但し本番環境では、別ファイルに切り出す等セキュリティ保護を行うことを推奨します。MyActor.h// Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #include "GameFramework/Actor.h" #include "HttpModule.h" #include "MyActor.generated.h" /// <summary> /// プロンプトに使用する役割 /// </summary> UENUM() enum ERole : uint8 { MODEL = 0, USER, ASSISTANT }; UCLASS(Placeable) class AMyActor : public AActor { GENERATED_BODY() public: // Sets default values for this actor's properties AMyActor(); protected: // Called when the game starts or when spawned virtual void BeginPlay() override; public: bool Setup(); /// <summary> /// メッセージを送信し、応答を取得する /// </summary> void Chat(); /// <summary> /// 会話履歴に追加する /// </summary> /// <param name="Role">役割</param> /// <param name="Message">メッセージ内容</param> void AddHistory(const ERole& InRole, const FString& Message); private: /// <summary> /// HTTPリクエスト結果受信 /// </summary> /// <param name="Request"></param> /// <param name="Response"></param> /// <param name="bConnectedSuccessfully"></param> void OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully); /// <summary> /// HTTPリクエストをSSE形式での受信処理 /// </summary> /// <param name="Ptr">データ本体</param> /// <param name="Length">データ長</param> bool OnMiddleResponseReceived(void* Ptr, int64 Length); /// <summary> /// SSEフォーマットの行をパースする /// </summary> bool ParseSSELine(const FString& Line, FString& OutText); private: /// <summary> /// APIキー /// </summary> /// TODO: セキュリティ上の理由から、APIキーはコードにハードコーディングせず外部ファイルから取得するよう変更してください。 const FString ApiKey = TEXT("xxxxxxxxxxxxxxxxxxxx"); /// <summary> /// API URL /// </summary> const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent"); /// <summary> /// プロンプトファイルのパス /// </summary> const FString PromptFilePath = FPaths::ProjectContentDir() / TEXT("Prompt.md"); /// <summary> /// システムプロンプト /// </summary> TSharedPtr<FJsonObject> SystemInstruction; /// <summary> /// 途中結果収集用バッファ /// </summary> FString ResponseBuffer; /// <summary> /// SSE用Mutex /// </summary> FCriticalSection SseMutex; /// <summary> /// 会話履歴 /// </summary> TArray<TSharedPtr<FJsonObject>> History; };
-
ソース
MyActor.cpp// Fill out your copyright notice in the Description page of Project Settings. #include "MyActor.h" #include "Interfaces/IHttpRequest.h" #include "Interfaces/IHttpResponse.h" #include "Json.h" // Sets default values AMyActor::AMyActor() { // Set this actor to call Tick() every frame. You can turn this off to improve performance if you don't need it. PrimaryActorTick.bCanEverTick = false; } // Called when the game starts or when spawned void AMyActor::BeginPlay() { Super::BeginPlay(); // 初期化処理 if (Setup()) { // チャット開始 実際にはユーザーからの入力を待つなどの処理が必要 AddHistory(ERole::USER, TEXT("自己紹介をしてください。")); Chat(); } } bool AMyActor::Setup() { checkf(!ApiKey.IsEmpty(), TEXT("API key is empty.")); // システムプロンプトの初期化 SystemInstruction = MakeShared<FJsonObject>(); TArray<TSharedPtr<FJsonValue>> Parts; FString Prompt; bool bFileLoaded = FFileHelper::LoadFileToString(Prompt, *PromptFilePath); if (!bFileLoaded) { UE_LOG(LogTemp, Error, TEXT("Failed to load system prompt: %s"), *PromptFilePath); } auto TextObject = MakeShared<FJsonObject>(); TextObject->SetStringField(TEXT("text"), Prompt); Parts.Add(MakeShared<FJsonValueObject>(TextObject)); SystemInstruction->SetArrayField(TEXT("parts"), Parts); return true; } void AMyActor::Chat() { // リクエストデータの作成 auto RequestObj = MakeShared<FJsonObject>(); TArray<TSharedPtr<FJsonValue>> ContentsArray; for (const auto& HistoryItem : History) { ContentsArray.Add(MakeShared<FJsonValueObject>(HistoryItem)); } RequestObj->SetArrayField(TEXT("contents"), ContentsArray); if (SystemInstruction.IsValid()) { RequestObj->SetObjectField(TEXT("system_instruction"), SystemInstruction); } FString RequestBody; auto Writer = TJsonWriterFactory<>::Create(&RequestBody); FJsonSerializer::Serialize(RequestObj, Writer); // HTTP リクエストの作成と送信 auto Request = FHttpModule::Get().CreateRequest(); Request->SetURL(ApiUrl + TEXT("?alt=sse&key=") + ApiKey); Request->SetVerb(TEXT("POST")); Request->SetHeader(TEXT("Content-Type"), TEXT("application/json")); Request->SetContentAsString(RequestBody); ResponseBuffer.Empty(); // レスポンスの全データ受信時 Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived); // SSE形式での受信処理 FHttpRequestStreamDelegate StreamDelegate; StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived); Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate); // リクエスト送信 Request->ProcessRequest(); } void AMyActor::AddHistory(const ERole& InRole, const FString& Message) { // メッセージを履歴に追加 auto MessageJson = MakeShared<FJsonObject>(); FString RoleString = StaticEnum<ERole>()->GetNameByValue((int64)InRole).ToString(); MessageJson->SetStringField(TEXT("role"), RoleString); TArray<TSharedPtr<FJsonValue>> Parts; auto TextObject = MakeShared<FJsonObject>(); TextObject->SetStringField(TEXT("text"), Message); Parts.Add(MakeShared<FJsonValueObject>(TextObject)); MessageJson->SetArrayField(TEXT("parts"), Parts); History.Add(MessageJson); } void AMyActor::OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully) { if (!Response.IsValid() || Response->GetResponseCode() != EHttpResponseCodes::Type::Ok) { ResponseBuffer.Empty(); return; } FString Content = Response->GetContentAsString(); TArray<FString> Lines; Content.ParseIntoArrayLines(Lines); if (!ResponseBuffer.IsEmpty()) { // 履歴に追加 AddHistory(ERole::MODEL, ResponseBuffer); UE_LOG(LogTemp, Log, TEXT("Final response: %s"), *ResponseBuffer); } else { UE_LOG(LogTemp, Warning, TEXT("Final Response is empty.")); } ResponseBuffer.Empty(); } bool AMyActor::OnMiddleResponseReceived(void* Ptr, int64 Length) { FScopeLock Lock(&SseMutex); FString DataStr; FString Content; if (Ptr != nullptr && Length > 0) { // UTF-8としてデータを解釈 const uint8* Data = static_cast<uint8*>(Ptr); DataStr = FString(UTF8_TO_TCHAR(Data)); } bool bResult = ParseSSELine(DataStr, Content); if (bResult) { ResponseBuffer += Content; } UE_LOG(LogTemp, Log, TEXT("Received SSE line: %s"), *DataStr); return true; } bool AMyActor::ParseSSELine(const FString& Line, FString& OutText) { // SSE形式のデータは "data: " で始まる static const FString HeaderPrefix = TEXT("data: "); FString Data = Line.Replace(*HeaderPrefix, TEXT("")); TSharedPtr<FJsonObject> JsonObject; TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data); if (FJsonSerializer::Deserialize(Reader, JsonObject)) { const TArray<TSharedPtr<FJsonValue>>* Candidates; if (JsonObject->TryGetArrayField(TEXT("candidates"), Candidates)) { // 最初の候補を取得 auto Candidate = (*Candidates)[0]->AsObject(); auto Content = Candidate->GetObjectField(TEXT("content")); auto Parts = Content->GetArrayField(TEXT("parts")); OutText = Parts[0]->AsObject()->GetStringField(TEXT("text")); UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: \n\t%s\n\t\t->%s"), *Line, *OutText); return true; } } UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line); return false; }
- プロンプト
MyActor.hでPromptFilePath
にて指定しているパスにファイルを作成します。
適宜書き換えれば応答が変わります。Content/Prompt.md20文字程度の短文で返すようにしてください。
-
Output Logの出力結果
レベル上に作成したMyActorを配置してエディタ実行すると以下のような出力が出るようになっていればOKです。
LogTemp: Parsing SSE line:
data: {"candidates": [{"content": {"parts": [{"text": "AI"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 19,"totalTokenCount": 19,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 19}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
->AI
LogTemp: Received SSE line: data: {"candidates": [{"content": {"parts": [{"text": "AI"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 19,"totalTokenCount": 19,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 19}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
LogTemp: Current response buffer: AI
LogTemp: Parsing SSE line:
data: {"candidates": [{"content": {"parts": [{"text": "アシスタントです。\n"}],"role": "model"},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 17,"candidatesTokenCount": 7,"totalTokenCount": 24,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 17}],"candidatesTokensDetails": [{"modality": "TEXT","tokenCount": 7}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
->アシスタントです。
LogTemp: Received SSE line: data: {"candidates": [{"content": {"parts": [{"text": "アシスタントです。\n"}],"role": "model"},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 17,"candidatesTokenCount": 7,"totalTokenCount": 24,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 17}],"candidatesTokensDetails": [{"modality": "TEXT","tokenCount": 7}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
LogTemp: Current response buffer: AIアシスタントです。
LogTemp: Final response: AIアシスタントです。
3. 解説
SSEに関わるところのみです。
-
SSEレスポンスの指定
ApiUrl
でSSE形式でレスポンスを返すように指定しています。MyActor.h/// <summary> /// API URL /// </summary> const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent");
Gemini AI for Developers #ストリーミング レスポンス の
REST
のサンプルを元にしています。 -
途中結果受信処理
MyActor.cpp
の以下の処理です。この記事の肝です。MyActor.cpp// SSE形式での受信処理 FHttpRequestStreamDelegate StreamDelegate; StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived); Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);
IHttpRequest::SetResponseBodyReceiveStreamDelegate
を使用しているだけですが、これ自体の使用例が中々調べても出てこなかった為、エンジンコードを探りました。なお、途中結果受信デリゲートを追加しても最終結果は通常通りうけとれるようです。
MyActor.cpp// レスポンスの全データ受信時 Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);
- 文字列変換処理
バイト列がそのまま引数に入ってくるだけなので文字列に直してあげてパースをすればOKでした。MyActor.cppif (Ptr != nullptr && Length > 0) { // UTF-8としてデータを解釈 const uint8* Data = static_cast<uint8*>(Ptr); DataStr = FString(UTF8_TO_TCHAR(Data)); }
- パース処理
このフォーマットの仕様が探した限り見つからなかった為、途中結果をログに出力しつつ、パース処理を作成しました。OpenAIのAPIだと最後にDONE
がつくはずですが、Geminiではつかないようです。
Gemini以外のSSE形式データの受信処理を作成する場合は以下のコードを一旦コメントアウトした状態で作成することをお勧めします。MyActor.cppbool AMyActor::ParseSSELine(const FString& Line, FString& OutText) { // SSE形式のデータは "data: " で始まる static const FString HeaderPrefix = TEXT("data: "); FString Data = Line.Replace(*HeaderPrefix, TEXT("")); TSharedPtr<FJsonObject> JsonObject; TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data); if (FJsonSerializer::Deserialize(Reader, JsonObject)) { const TArray<TSharedPtr<FJsonValue>>* Candidates; if (JsonObject->TryGetArrayField(TEXT("candidates"), Candidates)) { // 最初の候補を取得 auto Candidate = (*Candidates)[0]->AsObject(); auto Content = Candidate->GetObjectField(TEXT("content")); auto Parts = Content->GetArrayField(TEXT("parts")); OutText = Parts[0]->AsObject()->GetStringField(TEXT("text")); UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: \n\t%s\n\t\t->%s"), *Line, *OutText); return true; } } UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line); return false; }
- 文字列変換処理