0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

UE5でSSE(Server-Sent Events)受信の実装

Last updated at Posted at 2025-08-17

UE C++でSSE形式のレスポンス受信の見つからなかったので作成します。
Google Geminiでの実装例になりますが、本質的なところは他のREST APIでも同じだと思います。

環境

UE5.3
Windows 11

1. SSEとは

詳しいことは他の記事で多数書かれている為省略しますが、HTTPリクエストをサーバに送信した時、クライアントに対してデータを複数回に分けて送信するような通信のことです。
昔からある技術のようですが、対話AIでの会話文生成等でよく使われるようになったようです。

2. SSE受信の実装コード

2.1 Build.cs

実装するモジュールのBuild.csに以下の依存モジュールを追加する必要があります。
Json系はレスポンス受信時のデータ解析に使用します。

  • HTTP
  • Json
  • JsonUtilities

典型例は以下のようになります。

using UnrealBuildTool;

public class UE5_sandbox : ModuleRules
{
    public UE5_sandbox(ReadOnlyTargetRules Target) : base(Target)
    {
        PCHUsage = PCHUsageMode.UseExplicitOrSharedPCHs;

        PublicDependencyModuleNames.AddRange(new string[] {
                "Core",
                "CoreUObject",
                "Engine",
                "HTTP",
                "Json",
                "JsonUtilities"
            });
    }
}

2.2 ヘッダファイル

テスト用にActorを継承したMyActorクラスを作成しました。
ApiKeyにはGemini AI Studioで取得したAPIキーを指定してください。但し本番環境では、別ファイルに切り出す等セキュリティ保護を行うことを推奨します。

MyActor.h
#pragma once

#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#include "HttpModule.h"
#include "MyActor.generated.h"

/// <summary>
/// プロンプトに使用する役割
/// </summary>
UENUM()
enum class ERole : uint8
{
    MODEL = 0,
    USER,
    ASSISTANT
};

UCLASS(Placeable)
class AMyActor : public AActor
{
    GENERATED_BODY()

public:
    AMyActor();

protected:
    virtual void BeginPlay() override;

public:
    bool Setup();

    /// <summary>
    /// メッセージを送信し、応答を取得する
    /// </summary>
    void Chat();

    /// <summary>
    /// 会話履歴に追加する
    /// </summary>
    /// <param name="Role">役割</param>
    /// <param name="Message">メッセージ内容</param>
    void AddHistory(const ERole& InRole, const FString& Message);

private:
    /// <summary>
    /// HTTPリクエスト結果受信
    /// </summary>
    /// <param name="Request"></param>
    /// <param name="Response"></param>
    /// <param name="bConnectedSuccessfully"></param>
    void OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully);

    /// <summary>
    /// HTTPリクエストをSSE形式での受信処理
    /// </summary>
    /// <param name="Ptr">データ本体</param>
    /// <param name="Length">データ長</param>
    bool OnMiddleResponseReceived(void* Ptr, int64 Length);

    /// <summary>
    /// SSEフォーマットの行をパースする
    /// </summary>
    bool ParseSSELine(const FString& Line, FString& OutText);

private:
    /// <summary>
    /// APIキー
    /// </summary>
    /// TODO: セキュリティ上の理由から、APIキーはコードにハードコーディングせず外部ファイルから取得するよう変更してください。
    const FString ApiKey = TEXT("xxxxxxxxxxxxxxxxxxxx");

    /// <summary>
    /// API URL
    /// </summary>
    const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent?alt=sse");

    /// <summary>
    /// プロンプトファイルのパス
    /// </summary>
    const FString PromptFilePath = FPaths::ProjectContentDir() / TEXT("Prompt.md");

    /// <summary>
    /// システムプロンプト
    /// </summary>
    TSharedPtr<FJsonObject> SystemInstruction;

    /// <summary>
    /// 途中結果収集用バッファ
    /// </summary>
    FString ResponseBuffer;

    /// <summary>
    /// SSE用Mutex
    /// </summary>
    FCriticalSection SseMutex;

    /// <summary>
    /// 会話履歴
    /// </summary>
    TArray<TSharedPtr<FJsonObject>> History;
};

2.3 ソースファイル

MyActor.cpp
#include "MyActor.h"
#include "Interfaces/IHttpRequest.h"
#include "Interfaces/IHttpResponse.h"
#include "Json.h"

AMyActor::AMyActor()
{
    PrimaryActorTick.bCanEverTick = false;
}

void AMyActor::BeginPlay()
{
    Super::BeginPlay();
    // 初期化処理
    if (Setup())
    {
        // チャット開始 実際にはユーザーからの入力を待つなどの処理が必要
        AddHistory(ERole::USER, TEXT("自己紹介をしてください。"));
        Chat();
    }
}

bool AMyActor::Setup()
{
    checkf(!ApiKey.IsEmpty(), TEXT("API key is empty."));

    // システムプロンプトの初期化
    SystemInstruction = MakeShared<FJsonObject>();
    TArray<TSharedPtr<FJsonValue>> Parts;

    FString Prompt;
    bool    bFileLoaded = FFileHelper::LoadFileToString(Prompt, *PromptFilePath);
    if (!bFileLoaded)
    {
        UE_LOG(LogTemp, Error, TEXT("Failed to load system prompt: %s"), *PromptFilePath);
    }

    auto TextObject = MakeShared<FJsonObject>();
    TextObject->SetStringField(TEXT("text"), Prompt);

    Parts.Add(MakeShared<FJsonValueObject>(TextObject));
    SystemInstruction->SetArrayField(TEXT("parts"), Parts);

    // 途中結果バッファを初期化
    ResponseBuffer.Empty();
    return true;
}

void AMyActor::Chat()
{
    // リクエストデータの作成
    auto RequestObj = MakeShared<FJsonObject>();

    // 過去の会話履歴を設定
    TArray<TSharedPtr<FJsonValue>> ContentsArray;
    for (const auto& HistoryItem : History)
    {
        ContentsArray.Add(MakeShared<FJsonValueObject>(HistoryItem));
    }
    RequestObj->SetArrayField(TEXT("contents"), ContentsArray);

    if (SystemInstruction.IsValid())
    {
        RequestObj->SetObjectField(TEXT("system_instruction"), SystemInstruction);
    }

    // JSON文字列にシリアライズ
    FString RequestBody;
    auto    Writer = TJsonWriterFactory<>::Create(&RequestBody);
    FJsonSerializer::Serialize(RequestObj, Writer);

    // HTTP リクエストの作成と送信
    auto Request = FHttpModule::Get().CreateRequest();
    Request->SetURL(ApiUrl + TEXT("&key=") + ApiKey);
    Request->SetVerb(TEXT("POST"));
    Request->SetHeader(TEXT("Content-Type"), TEXT("application/json"));
    Request->SetContentAsString(RequestBody);

    // レスポンスの全データ受信時
    Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);

    // SSE形式での受信時
    FHttpRequestStreamDelegate StreamDelegate;
    StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived);
    Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);

    // リクエスト送信
    Request->ProcessRequest();
}

void AMyActor::AddHistory(const ERole& InRole, const FString& Message)
{
    // メッセージを履歴に追加
    auto    MessageJson = MakeShared<FJsonObject>();
    FString FullEnumName = StaticEnum<ERole>()->GetNameByValue((int64)InRole).ToString();
    FString RoleString = FullEnumName.RightChop(FullEnumName.Find(TEXT("::")) + 2);
    MessageJson->SetStringField(TEXT("role"), RoleString);

    TArray<TSharedPtr<FJsonValue>> Parts;
    auto                           TextObject = MakeShared<FJsonObject>();
    TextObject->SetStringField(TEXT("text"), Message);
    Parts.Add(MakeShared<FJsonValueObject>(TextObject));

    MessageJson->SetArrayField(TEXT("parts"), Parts);
    History.Add(MessageJson);
}

void AMyActor::OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully)
{
    if (!Response.IsValid() || Response->GetResponseCode() != EHttpResponseCodes::Type::Ok)
    {
        ResponseBuffer.Empty();
        return;
    }

    // 最終的なレスポンスは何も出力されない
    FString Content = Response->GetContentAsString();
    UE_LOG(LogTemp, Log, TEXT("OnCompletedResponseReceived Content: %s"), *Content);

    // 途中結果バッファを履歴に追加
    AddHistory(ERole::MODEL, ResponseBuffer);
    UE_LOG(LogTemp, Log, TEXT("OnMiddleResponseReceived Total Content: %s"), *ResponseBuffer);

    ResponseBuffer.Empty();
}

bool AMyActor::OnMiddleResponseReceived(void* Ptr, int64 Length)
{
    FScopeLock Lock(&SseMutex);

    FString DataStr;
    FString Content;
    if (Ptr != nullptr && Length > 0)
    {
        // 文字列に変換
        const uint8* Data = static_cast<uint8*>(Ptr);
        DataStr = FString(Length, UTF8_TO_TCHAR(reinterpret_cast<const char*>(Data)));
    }
    bool bResult = ParseSSELine(DataStr, Content);
    if (bResult)
    {
        ResponseBuffer += Content;
    }
    UE_LOG(LogTemp, Log, TEXT("Current response buffer: %s"), *ResponseBuffer);
    return true;
}

bool AMyActor::ParseSSELine(const FString& Line, FString& OutText)
{
    // SSE形式のデータは "data: " で始まる為これを除去してからJSONとしてパース
    static const FString HeaderPrefix = TEXT("data: ");

    FString Data = Line.Replace(*HeaderPrefix, TEXT(""));

    TSharedPtr<FJsonObject>   JsonObject;
    TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data);

    bool bParsed = FJsonSerializer::Deserialize(Reader, JsonObject);
    if (!bParsed)
    {
        UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line);
        return false;
    }

    const TArray<TSharedPtr<FJsonValue>>* Candidates;
    if (!JsonObject->TryGetArrayField(TEXT("candidates"), Candidates))
    {
        UE_LOG(LogTemp, Error, TEXT("No candidates field in SSE line: %s"), *Line);
        return false;
    }

    // 最初の候補を取得
    auto Candidate = (*Candidates)[0]->AsObject();
    auto Content = Candidate->GetObjectField(TEXT("content"));
    auto Parts = Content->GetArrayField(TEXT("parts"));
    OutText = Parts[0]->AsObject()->GetStringField(TEXT("text"));
    UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: %s\n\t->%s"), *Line, *OutText);
    return true;
}

2.4 プロンプト

MyActor.hPromptFilePathにて指定しているパスにファイルを作成します。
適宜書き換えれば応答が変わります。

Content/Prompt.md
20文字程度の短文で返すようにしてください。

3. Output Logの出力結果

レベル上に作成したMyActorを配置してエディタ実行すると以下のような出力が出るようになっていればOKです。

LogTemp: Parsing SSE line: data: {"candidates": [{"content": {"parts": [{"text": "AI"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 19,"totalTokenCount": 19,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 19}]},"modelVersion": "gemini-2.0-flash","responseId": "Da6zaPbACpj8n9kPruzVsAI"}
    ->AI
LogTemp: Current response buffer: AI
LogTemp: Parsing SSE line: data: {"candidates": [{"content": {"parts": [{"text": "アシスタントです。\n"}],"role": "model"},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 17,"candidatesTokenCount": 7,"totalTokenCount": 24,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 17}],"candidatesTokensDetails": [{"modality": "TEXT","tokenCount": 7}]},"modelVersion": "gemini-2.0-flash","responseId": "Da6zaPbACpj8n9kPruzVsAI"}
    ->アシスタントです。
LogTemp: Current response buffer: AIアシスタントです。
LogTemp: OnCompletedResponseReceived Content:
LogTemp: OnMiddleResponseReceived Total Content: AIアシスタントです。

4. 解説

SSEに関わるところのみ解説します。
なお、SetupAddHistoryChat過去の会話履歴を設定の処理については、Gemini APIサンプルを参照してください。

4.1 SSEレスポンスの指定

ApiUrlにてSSE形式でレスポンスを返すように指定しています。

MyActor.h
/// <summary>
/// API URL
/// </summary>
const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent?alt=sse");

ストリーミング レスポンス を元にしています。

4.2 途中結果受信処理

MyActor.cppの以下の処理です。この記事の肝です。

MyActor.cpp
// SSE形式での受信処理
FHttpRequestStreamDelegate StreamDelegate;
StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived);
Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);

IHttpRequest::SetResponseBodyReceiveStreamDelegate
を使用しているだけですが、これ自体の使用例が中々調べても出てこなかった為、エンジンコードを探りました。

なお、途中結果受信デリゲートを追加すると最終結果は空になるようです。
そのため、SSE受信処理の終了判定としてのみ使用しています。

MyActor.cpp
// レスポンスの全データ受信時
Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);
ログ出力
LogTemp: OnCompletedResponseReceived Content:

受け取った後は文字列に変換してJsonデータをパースする処理が必要になります。

4.2.1 文字列変換処理

バイト列がそのまま引数に入ってくるだけなので文字列に変換してパースすることで対応できます。(★部分)

MyActor.cpp
bool AMyActor::OnMiddleResponseReceived(void* Ptr, int64 Length)
{
    FScopeLock Lock(&SseMutex);

    FString DataStr;
    FString Content;
    if (Ptr != nullptr && Length > 0)
    {
        // 文字列に変換 ★
        const uint8* Data = static_cast<uint8*>(Ptr);
        DataStr = FString(Length, UTF8_TO_TCHAR(reinterpret_cast<const char*>(Data)));
    }
    bool bResult = ParseSSELine(DataStr, Content);
    if (bResult)
    {
        ResponseBuffer += Content;
    }
    UE_LOG(LogTemp, Log, TEXT("Current response buffer: %s"), *ResponseBuffer);
    return true;
}

4.2.2 パース処理

このフォーマットの仕様が探した限り見つからなかった為、途中結果をログに出力しつつ、パース処理を作成しました。OpenAIのAPIだと最後に[DONE]がつくはずですが、Geminiではつかないようです。
Gemini以外のSSE形式データの受信処理を作成する場合は以下のコードを一旦コメントアウトした状態で作成することをお勧めします。

MyActor.cpp
bool AMyActor::ParseSSELine(const FString& Line, FString& OutText)
{
    // SSE形式のデータは "data: " で始まる為これを除去してからJSONとしてパース
    static const FString HeaderPrefix = TEXT("data: ");

    FString Data = Line.Replace(*HeaderPrefix, TEXT(""));

    TSharedPtr<FJsonObject>   JsonObject;
    TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data);

    bool bParsed = FJsonSerializer::Deserialize(Reader, JsonObject);
    if (!bParsed)
    {
        UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line);
        return false;
    }

    const TArray<TSharedPtr<FJsonValue>>* Candidates;
    if (!JsonObject->TryGetArrayField(TEXT("candidates"), Candidates))
    {
        UE_LOG(LogTemp, Error, TEXT("No candidates field in SSE line: %s"), *Line);
        return false;
    }

    // 最初の候補を取得
    auto Candidate = (*Candidates)[0]->AsObject();
    auto Content = Candidate->GetObjectField(TEXT("content"));
    auto Parts = Content->GetArrayField(TEXT("parts"));
    OutText = Parts[0]->AsObject()->GetStringField(TEXT("text"));
    UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: %s\n\t->%s"), *Line, *OutText);
    return true;
}
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?