UE C++でSSE形式のレスポンス受信の見つからなかったので作成します。
Google Geminiでの実装例になりますが、本質的なところは他のREST APIでも同じだと思います。
環境
UE5.3
Windows 11
1. SSEとは
詳しいことは他の記事で多数書かれている為省略しますが、HTTPリクエストをサーバに送信した時、クライアントに対してデータを複数回に分けて送信するような通信のことです。
昔からある技術のようですが、対話AIでの会話文生成等でよく使われるようになったようです。
2. SSE受信の実装コード
2.1 Build.cs
実装するモジュールのBuild.csに以下の依存モジュールを追加する必要があります。
Json系はレスポンス受信時のデータ解析に使用します。
- HTTP
- Json
- JsonUtilities
典型例は以下のようになります。
using UnrealBuildTool;
public class UE5_sandbox : ModuleRules
{
public UE5_sandbox(ReadOnlyTargetRules Target) : base(Target)
{
PCHUsage = PCHUsageMode.UseExplicitOrSharedPCHs;
PublicDependencyModuleNames.AddRange(new string[] {
"Core",
"CoreUObject",
"Engine",
"HTTP",
"Json",
"JsonUtilities"
});
}
}
2.2 ヘッダファイル
テスト用にActorを継承したMyActorクラスを作成しました。
ApiKey
にはGemini AI Studioで取得したAPIキーを指定してください。但し本番環境では、別ファイルに切り出す等セキュリティ保護を行うことを推奨します。
#pragma once
#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "HttpModule.h"
#include "MyActor.generated.h"
/// <summary>
/// プロンプトに使用する役割
/// </summary>
UENUM()
enum class ERole : uint8
{
MODEL = 0,
USER,
ASSISTANT
};
UCLASS(Placeable)
class AMyActor : public AActor
{
GENERATED_BODY()
public:
AMyActor();
protected:
virtual void BeginPlay() override;
public:
bool Setup();
/// <summary>
/// メッセージを送信し、応答を取得する
/// </summary>
void Chat();
/// <summary>
/// 会話履歴に追加する
/// </summary>
/// <param name="Role">役割</param>
/// <param name="Message">メッセージ内容</param>
void AddHistory(const ERole& InRole, const FString& Message);
private:
/// <summary>
/// HTTPリクエスト結果受信
/// </summary>
/// <param name="Request"></param>
/// <param name="Response"></param>
/// <param name="bConnectedSuccessfully"></param>
void OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully);
/// <summary>
/// HTTPリクエストをSSE形式での受信処理
/// </summary>
/// <param name="Ptr">データ本体</param>
/// <param name="Length">データ長</param>
bool OnMiddleResponseReceived(void* Ptr, int64 Length);
/// <summary>
/// SSEフォーマットの行をパースする
/// </summary>
bool ParseSSELine(const FString& Line, FString& OutText);
private:
/// <summary>
/// APIキー
/// </summary>
/// TODO: セキュリティ上の理由から、APIキーはコードにハードコーディングせず外部ファイルから取得するよう変更してください。
const FString ApiKey = TEXT("xxxxxxxxxxxxxxxxxxxx");
/// <summary>
/// API URL
/// </summary>
const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent?alt=sse");
/// <summary>
/// プロンプトファイルのパス
/// </summary>
const FString PromptFilePath = FPaths::ProjectContentDir() / TEXT("Prompt.md");
/// <summary>
/// システムプロンプト
/// </summary>
TSharedPtr<FJsonObject> SystemInstruction;
/// <summary>
/// 途中結果収集用バッファ
/// </summary>
FString ResponseBuffer;
/// <summary>
/// SSE用Mutex
/// </summary>
FCriticalSection SseMutex;
/// <summary>
/// 会話履歴
/// </summary>
TArray<TSharedPtr<FJsonObject>> History;
};
2.3 ソースファイル
#include "MyActor.h"
#include "Interfaces/IHttpRequest.h"
#include "Interfaces/IHttpResponse.h"
#include "Json.h"
AMyActor::AMyActor()
{
PrimaryActorTick.bCanEverTick = false;
}
void AMyActor::BeginPlay()
{
Super::BeginPlay();
// 初期化処理
if (Setup())
{
// チャット開始 実際にはユーザーからの入力を待つなどの処理が必要
AddHistory(ERole::USER, TEXT("自己紹介をしてください。"));
Chat();
}
}
bool AMyActor::Setup()
{
checkf(!ApiKey.IsEmpty(), TEXT("API key is empty."));
// システムプロンプトの初期化
SystemInstruction = MakeShared<FJsonObject>();
TArray<TSharedPtr<FJsonValue>> Parts;
FString Prompt;
bool bFileLoaded = FFileHelper::LoadFileToString(Prompt, *PromptFilePath);
if (!bFileLoaded)
{
UE_LOG(LogTemp, Error, TEXT("Failed to load system prompt: %s"), *PromptFilePath);
}
auto TextObject = MakeShared<FJsonObject>();
TextObject->SetStringField(TEXT("text"), Prompt);
Parts.Add(MakeShared<FJsonValueObject>(TextObject));
SystemInstruction->SetArrayField(TEXT("parts"), Parts);
// 途中結果バッファを初期化
ResponseBuffer.Empty();
return true;
}
void AMyActor::Chat()
{
// リクエストデータの作成
auto RequestObj = MakeShared<FJsonObject>();
// 過去の会話履歴を設定
TArray<TSharedPtr<FJsonValue>> ContentsArray;
for (const auto& HistoryItem : History)
{
ContentsArray.Add(MakeShared<FJsonValueObject>(HistoryItem));
}
RequestObj->SetArrayField(TEXT("contents"), ContentsArray);
if (SystemInstruction.IsValid())
{
RequestObj->SetObjectField(TEXT("system_instruction"), SystemInstruction);
}
// JSON文字列にシリアライズ
FString RequestBody;
auto Writer = TJsonWriterFactory<>::Create(&RequestBody);
FJsonSerializer::Serialize(RequestObj, Writer);
// HTTP リクエストの作成と送信
auto Request = FHttpModule::Get().CreateRequest();
Request->SetURL(ApiUrl + TEXT("&key=") + ApiKey);
Request->SetVerb(TEXT("POST"));
Request->SetHeader(TEXT("Content-Type"), TEXT("application/json"));
Request->SetContentAsString(RequestBody);
// レスポンスの全データ受信時
Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);
// SSE形式での受信時
FHttpRequestStreamDelegate StreamDelegate;
StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived);
Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);
// リクエスト送信
Request->ProcessRequest();
}
void AMyActor::AddHistory(const ERole& InRole, const FString& Message)
{
// メッセージを履歴に追加
auto MessageJson = MakeShared<FJsonObject>();
FString FullEnumName = StaticEnum<ERole>()->GetNameByValue((int64)InRole).ToString();
FString RoleString = FullEnumName.RightChop(FullEnumName.Find(TEXT("::")) + 2);
MessageJson->SetStringField(TEXT("role"), RoleString);
TArray<TSharedPtr<FJsonValue>> Parts;
auto TextObject = MakeShared<FJsonObject>();
TextObject->SetStringField(TEXT("text"), Message);
Parts.Add(MakeShared<FJsonValueObject>(TextObject));
MessageJson->SetArrayField(TEXT("parts"), Parts);
History.Add(MessageJson);
}
void AMyActor::OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully)
{
if (!Response.IsValid() || Response->GetResponseCode() != EHttpResponseCodes::Type::Ok)
{
ResponseBuffer.Empty();
return;
}
// 最終的なレスポンスは何も出力されない
FString Content = Response->GetContentAsString();
UE_LOG(LogTemp, Log, TEXT("OnCompletedResponseReceived Content: %s"), *Content);
// 途中結果バッファを履歴に追加
AddHistory(ERole::MODEL, ResponseBuffer);
UE_LOG(LogTemp, Log, TEXT("OnMiddleResponseReceived Total Content: %s"), *ResponseBuffer);
ResponseBuffer.Empty();
}
bool AMyActor::OnMiddleResponseReceived(void* Ptr, int64 Length)
{
FScopeLock Lock(&SseMutex);
FString DataStr;
FString Content;
if (Ptr != nullptr && Length > 0)
{
// 文字列に変換
const uint8* Data = static_cast<uint8*>(Ptr);
DataStr = FString(Length, UTF8_TO_TCHAR(reinterpret_cast<const char*>(Data)));
}
bool bResult = ParseSSELine(DataStr, Content);
if (bResult)
{
ResponseBuffer += Content;
}
UE_LOG(LogTemp, Log, TEXT("Current response buffer: %s"), *ResponseBuffer);
return true;
}
bool AMyActor::ParseSSELine(const FString& Line, FString& OutText)
{
// SSE形式のデータは "data: " で始まる為これを除去してからJSONとしてパース
static const FString HeaderPrefix = TEXT("data: ");
FString Data = Line.Replace(*HeaderPrefix, TEXT(""));
TSharedPtr<FJsonObject> JsonObject;
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data);
bool bParsed = FJsonSerializer::Deserialize(Reader, JsonObject);
if (!bParsed)
{
UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line);
return false;
}
const TArray<TSharedPtr<FJsonValue>>* Candidates;
if (!JsonObject->TryGetArrayField(TEXT("candidates"), Candidates))
{
UE_LOG(LogTemp, Error, TEXT("No candidates field in SSE line: %s"), *Line);
return false;
}
// 最初の候補を取得
auto Candidate = (*Candidates)[0]->AsObject();
auto Content = Candidate->GetObjectField(TEXT("content"));
auto Parts = Content->GetArrayField(TEXT("parts"));
OutText = Parts[0]->AsObject()->GetStringField(TEXT("text"));
UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: %s\n\t->%s"), *Line, *OutText);
return true;
}
2.4 プロンプト
MyActor.h
のPromptFilePath
にて指定しているパスにファイルを作成します。
適宜書き換えれば応答が変わります。
20文字程度の短文で返すようにしてください。
3. Output Logの出力結果
レベル上に作成したMyActorを配置してエディタ実行すると以下のような出力が出るようになっていればOKです。
LogTemp: Parsing SSE line: data: {"candidates": [{"content": {"parts": [{"text": "AI"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 19,"totalTokenCount": 19,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 19}]},"modelVersion": "gemini-2.0-flash","responseId": "Da6zaPbACpj8n9kPruzVsAI"}
->AI
LogTemp: Current response buffer: AI
LogTemp: Parsing SSE line: data: {"candidates": [{"content": {"parts": [{"text": "アシスタントです。\n"}],"role": "model"},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 17,"candidatesTokenCount": 7,"totalTokenCount": 24,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 17}],"candidatesTokensDetails": [{"modality": "TEXT","tokenCount": 7}]},"modelVersion": "gemini-2.0-flash","responseId": "Da6zaPbACpj8n9kPruzVsAI"}
->アシスタントです。
LogTemp: Current response buffer: AIアシスタントです。
LogTemp: OnCompletedResponseReceived Content:
LogTemp: OnMiddleResponseReceived Total Content: AIアシスタントです。
4. 解説
SSEに関わるところのみ解説します。
なお、Setup
やAddHistory
、Chat
の過去の会話履歴を設定
の処理については、Gemini APIサンプルを参照してください。
4.1 SSEレスポンスの指定
ApiUrl
にてSSE形式でレスポンスを返すように指定しています。
/// <summary>
/// API URL
/// </summary>
const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent?alt=sse");
ストリーミング レスポンス を元にしています。
4.2 途中結果受信処理
MyActor.cpp
の以下の処理です。この記事の肝です。
// SSE形式での受信処理
FHttpRequestStreamDelegate StreamDelegate;
StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived);
Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);
IHttpRequest::SetResponseBodyReceiveStreamDelegate
を使用しているだけですが、これ自体の使用例が中々調べても出てこなかった為、エンジンコードを探りました。
なお、途中結果受信デリゲートを追加すると最終結果は空になるようです。
そのため、SSE受信処理の終了判定としてのみ使用しています。
// レスポンスの全データ受信時
Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);
LogTemp: OnCompletedResponseReceived Content:
受け取った後は文字列に変換してJsonデータをパースする処理が必要になります。
4.2.1 文字列変換処理
バイト列がそのまま引数に入ってくるだけなので文字列に変換してパースすることで対応できます。(★部分)
bool AMyActor::OnMiddleResponseReceived(void* Ptr, int64 Length)
{
FScopeLock Lock(&SseMutex);
FString DataStr;
FString Content;
if (Ptr != nullptr && Length > 0)
{
// 文字列に変換 ★
const uint8* Data = static_cast<uint8*>(Ptr);
DataStr = FString(Length, UTF8_TO_TCHAR(reinterpret_cast<const char*>(Data)));
}
bool bResult = ParseSSELine(DataStr, Content);
if (bResult)
{
ResponseBuffer += Content;
}
UE_LOG(LogTemp, Log, TEXT("Current response buffer: %s"), *ResponseBuffer);
return true;
}
4.2.2 パース処理
このフォーマットの仕様が探した限り見つからなかった為、途中結果をログに出力しつつ、パース処理を作成しました。OpenAIのAPIだと最後に[DONE]
がつくはずですが、Geminiではつかないようです。
Gemini以外のSSE形式データの受信処理を作成する場合は以下のコードを一旦コメントアウトした状態で作成することをお勧めします。
bool AMyActor::ParseSSELine(const FString& Line, FString& OutText)
{
// SSE形式のデータは "data: " で始まる為これを除去してからJSONとしてパース
static const FString HeaderPrefix = TEXT("data: ");
FString Data = Line.Replace(*HeaderPrefix, TEXT(""));
TSharedPtr<FJsonObject> JsonObject;
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data);
bool bParsed = FJsonSerializer::Deserialize(Reader, JsonObject);
if (!bParsed)
{
UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line);
return false;
}
const TArray<TSharedPtr<FJsonValue>>* Candidates;
if (!JsonObject->TryGetArrayField(TEXT("candidates"), Candidates))
{
UE_LOG(LogTemp, Error, TEXT("No candidates field in SSE line: %s"), *Line);
return false;
}
// 最初の候補を取得
auto Candidate = (*Candidates)[0]->AsObject();
auto Content = Candidate->GetObjectField(TEXT("content"));
auto Parts = Content->GetArrayField(TEXT("parts"));
OutText = Parts[0]->AsObject()->GetStringField(TEXT("text"));
UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: %s\n\t->%s"), *Line, *OutText);
return true;
}