0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

UE5でSSE(Server-Sent Events)受信の実装

Posted at

UE C++でSSE形式のレスポンス受信の実装例が転がっていなかったので作成します。
Google Geminiでの実装例になりますが、本質的なところは他のREST APIでも同じだと思います。

環境

UE5.3
Windows 11

1. SSEとは

詳しいことは他の記事で沢山書かれている為省略しますが、HTTPリクエストをサーバに送信した時にクライアントに対してデータを複数回に分けて送信するような通信のことです。
昔からある技術のようですが、対話AIでの会話文生成等でよく使われるようになったようです。

2. SSE受信の実装コード

  • 必要モジュール
    実装するモジュールのBuil.csに以下の依存モジュールを追加する必要があります。
    Json系はレスポンス受信時のデータ解析に使用します。

    • HTTP
    • Json
    • JsonUtilities

    典型例としては以下のようになると思います。

     // Fill out your copyright notice in the Description page of Project Settings.
    
    using UnrealBuildTool;
    
    public class UE5_sandbox : ModuleRules
    {
        public UE5_sandbox(ReadOnlyTargetRules Target) : base(Target)
        {
            PCHUsage = PCHUsageMode.UseExplicitOrSharedPCHs;
    
            PublicDependencyModuleNames.AddRange(new string[] {
                    "Core",
                    "CoreUObject",
                    "Engine",
                    "HTTP",
                    "Json",
                    "JsonUtilities"
                });
        }
    }
    
  • 実装コード
    テスト用にActorを継承したMyActorクラスを作成しました。

    • ヘッダ
      ApiKeyにはGemini AI Studioで取得したAPIキーを指定してください。但し本番環境では、別ファイルに切り出す等セキュリティ保護を行うことを推奨します。

      MyActor.h
      // Fill out your copyright notice in the Description page of Project Settings.
      
      #pragma once
      
      #include "CoreMinimal.h"
      #include "GameFramework/Actor.h"
      #include "HttpModule.h"
      #include "MyActor.generated.h"
      
      /// <summary>
      /// プロンプトに使用する役割
      /// </summary>
      UENUM()
      enum ERole : uint8
      {
          MODEL = 0,
          USER,
          ASSISTANT
      };
      
      UCLASS(Placeable)
      class AMyActor : public AActor
      {
          GENERATED_BODY()
      
      public:
          // Sets default values for this actor's properties
          AMyActor();
      
      protected:
          // Called when the game starts or when spawned
          virtual void BeginPlay() override;
      
      public:
          bool Setup();
      
          /// <summary>
          /// メッセージを送信し、応答を取得する
          /// </summary>
          void Chat();
      
          /// <summary>
          /// 会話履歴に追加する
          /// </summary>
          /// <param name="Role">役割</param>
          /// <param name="Message">メッセージ内容</param>
          void AddHistory(const ERole& InRole, const FString& Message);
      
      private:
          /// <summary>
          /// HTTPリクエスト結果受信
          /// </summary>
          /// <param name="Request"></param>
          /// <param name="Response"></param>
          /// <param name="bConnectedSuccessfully"></param>
          void OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully);
      
          /// <summary>
          /// HTTPリクエストをSSE形式での受信処理
          /// </summary>
          /// <param name="Ptr">データ本体</param>
          /// <param name="Length">データ長</param>
          bool OnMiddleResponseReceived(void* Ptr, int64 Length);
      
          /// <summary>
          /// SSEフォーマットの行をパースする
          /// </summary>
          bool ParseSSELine(const FString& Line, FString& OutText);
      
      private:
          /// <summary>
          /// APIキー
          /// </summary>
          /// TODO: セキュリティ上の理由から、APIキーはコードにハードコーディングせず外部ファイルから取得するよう変更してください。
          const FString ApiKey = TEXT("xxxxxxxxxxxxxxxxxxxx");
      
          /// <summary>
          /// API URL
          /// </summary>
          const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent");
      
          /// <summary>
          /// プロンプトファイルのパス
          /// </summary>
          const FString PromptFilePath = FPaths::ProjectContentDir() / TEXT("Prompt.md");
      
          /// <summary>
          /// システムプロンプト
          /// </summary>
          TSharedPtr<FJsonObject> SystemInstruction;
      
          /// <summary>
          /// 途中結果収集用バッファ
          /// </summary>
          FString ResponseBuffer;
      
          /// <summary>
          /// SSE用Mutex
          /// </summary>
          FCriticalSection SseMutex;
      
          /// <summary>
          /// 会話履歴
          /// </summary>
          TArray<TSharedPtr<FJsonObject>> History;
      };
      
      
    • ソース

      MyActor.cpp
      // Fill out your copyright notice in the Description page of Project Settings.
      
      #include "MyActor.h"
      #include "Interfaces/IHttpRequest.h"
      #include "Interfaces/IHttpResponse.h"
      #include "Json.h"
      
      // Sets default values
      AMyActor::AMyActor()
      {
          // Set this actor to call Tick() every frame.  You can turn this off to improve performance if you don't need it.
          PrimaryActorTick.bCanEverTick = false;
      }
      
      // Called when the game starts or when spawned
      void AMyActor::BeginPlay()
      {
          Super::BeginPlay();
          // 初期化処理
          if (Setup())
          {
              // チャット開始 実際にはユーザーからの入力を待つなどの処理が必要
              AddHistory(ERole::USER, TEXT("自己紹介をしてください。"));
              Chat();
          }
      }
      
      bool AMyActor::Setup()
      {
          checkf(!ApiKey.IsEmpty(), TEXT("API key is empty."));
      
          // システムプロンプトの初期化
          SystemInstruction = MakeShared<FJsonObject>();
          TArray<TSharedPtr<FJsonValue>> Parts;
      
          FString Prompt;
          bool    bFileLoaded = FFileHelper::LoadFileToString(Prompt, *PromptFilePath);
          if (!bFileLoaded)
          {
              UE_LOG(LogTemp, Error, TEXT("Failed to load system prompt: %s"), *PromptFilePath);
          }
      
          auto TextObject = MakeShared<FJsonObject>();
          TextObject->SetStringField(TEXT("text"), Prompt);
      
          Parts.Add(MakeShared<FJsonValueObject>(TextObject));
          SystemInstruction->SetArrayField(TEXT("parts"), Parts);
          return true;
      }
      
      void AMyActor::Chat()
      {
          // リクエストデータの作成
          auto RequestObj = MakeShared<FJsonObject>();
      
          TArray<TSharedPtr<FJsonValue>> ContentsArray;
          for (const auto& HistoryItem : History)
          {
              ContentsArray.Add(MakeShared<FJsonValueObject>(HistoryItem));
          }
          RequestObj->SetArrayField(TEXT("contents"), ContentsArray);
      
          if (SystemInstruction.IsValid())
          {
              RequestObj->SetObjectField(TEXT("system_instruction"), SystemInstruction);
          }
      
          FString RequestBody;
          auto    Writer = TJsonWriterFactory<>::Create(&RequestBody);
          FJsonSerializer::Serialize(RequestObj, Writer);
      
          // HTTP リクエストの作成と送信
          auto Request = FHttpModule::Get().CreateRequest();
          Request->SetURL(ApiUrl + TEXT("?alt=sse&key=") + ApiKey);
          Request->SetVerb(TEXT("POST"));
          Request->SetHeader(TEXT("Content-Type"), TEXT("application/json"));
          Request->SetContentAsString(RequestBody);
      
          ResponseBuffer.Empty();
      
          // レスポンスの全データ受信時
          Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);
      
          // SSE形式での受信処理
          FHttpRequestStreamDelegate StreamDelegate;
          StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived);
          Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);
      
          // リクエスト送信
          Request->ProcessRequest();
      }
      
      void AMyActor::AddHistory(const ERole& InRole, const FString& Message)
      {
          // メッセージを履歴に追加
          auto    MessageJson = MakeShared<FJsonObject>();
          FString RoleString = StaticEnum<ERole>()->GetNameByValue((int64)InRole).ToString();
          MessageJson->SetStringField(TEXT("role"), RoleString);
      
          TArray<TSharedPtr<FJsonValue>> Parts;
          auto                           TextObject = MakeShared<FJsonObject>();
          TextObject->SetStringField(TEXT("text"), Message);
          Parts.Add(MakeShared<FJsonValueObject>(TextObject));
      
          MessageJson->SetArrayField(TEXT("parts"), Parts);
          History.Add(MessageJson);
      }
      
      void AMyActor::OnCompletedResponseReceived(FHttpRequestPtr Request, FHttpResponsePtr Response, bool bConnectedSuccessfully)
      {
          if (!Response.IsValid() || Response->GetResponseCode() != EHttpResponseCodes::Type::Ok)
          {
              ResponseBuffer.Empty();
              return;
          }
      
          FString         Content = Response->GetContentAsString();
          TArray<FString> Lines;
          Content.ParseIntoArrayLines(Lines);
          if (!ResponseBuffer.IsEmpty())
          {
              // 履歴に追加
              AddHistory(ERole::MODEL, ResponseBuffer);
      
              UE_LOG(LogTemp, Log, TEXT("Final response: %s"), *ResponseBuffer);
          }
          else
          {
              UE_LOG(LogTemp, Warning, TEXT("Final Response is empty."));
          }
          ResponseBuffer.Empty();
      }
      
      bool AMyActor::OnMiddleResponseReceived(void* Ptr, int64 Length)
      {
          FScopeLock Lock(&SseMutex);
      
          FString DataStr;
          FString Content;
          if (Ptr != nullptr && Length > 0)
          {
              // UTF-8としてデータを解釈
              const uint8* Data = static_cast<uint8*>(Ptr);
              DataStr = FString(UTF8_TO_TCHAR(Data));
          }
          bool bResult = ParseSSELine(DataStr, Content);
          if (bResult)
          {
              ResponseBuffer += Content;
          }
      
          UE_LOG(LogTemp, Log, TEXT("Received SSE line: %s"), *DataStr);
          return true;
      }
      
      bool AMyActor::ParseSSELine(const FString& Line, FString& OutText)
      {
          // SSE形式のデータは "data: " で始まる
          static const FString HeaderPrefix = TEXT("data: ");
      
          FString Data = Line.Replace(*HeaderPrefix, TEXT(""));
      
          TSharedPtr<FJsonObject>   JsonObject;
          TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data);
          if (FJsonSerializer::Deserialize(Reader, JsonObject))
          {
              const TArray<TSharedPtr<FJsonValue>>* Candidates;
              if (JsonObject->TryGetArrayField(TEXT("candidates"), Candidates))
              {
                  // 最初の候補を取得
                  auto Candidate = (*Candidates)[0]->AsObject();
                  auto Content = Candidate->GetObjectField(TEXT("content"));
                  auto Parts = Content->GetArrayField(TEXT("parts"));
                  OutText = Parts[0]->AsObject()->GetStringField(TEXT("text"));
                  UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: \n\t%s\n\t\t->%s"), *Line, *OutText);
                  return true;
              }
          }
          UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line);
          return false;
      }
      
      
    • プロンプト
      MyActor.hでPromptFilePathにて指定しているパスにファイルを作成します。
      適宜書き換えれば応答が変わります。
      Content/Prompt.md
      20文字程度の短文で返すようにしてください。
      

Output Logの出力結果

レベル上に作成したMyActorを配置してエディタ実行すると以下のような出力が出るようになっていればOKです。

LogTemp: Parsing SSE line: 
    data: {"candidates": [{"content": {"parts": [{"text": "AI"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 19,"totalTokenCount": 19,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 19}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
        ->AI
LogTemp: Received SSE line: data: {"candidates": [{"content": {"parts": [{"text": "AI"}],"role": "model"}}],"usageMetadata": {"promptTokenCount": 19,"totalTokenCount": 19,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 19}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
LogTemp: Current response buffer: AI
LogTemp: Parsing SSE line: 
    data: {"candidates": [{"content": {"parts": [{"text": "アシスタントです。\n"}],"role": "model"},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 17,"candidatesTokenCount": 7,"totalTokenCount": 24,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 17}],"candidatesTokensDetails": [{"modality": "TEXT","tokenCount": 7}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
        ->アシスタントです。
LogTemp: Received SSE line: data: {"candidates": [{"content": {"parts": [{"text": "アシスタントです。\n"}],"role": "model"},"finishReason": "STOP"}],"usageMetadata": {"promptTokenCount": 17,"candidatesTokenCount": 7,"totalTokenCount": 24,"promptTokensDetails": [{"modality": "TEXT","tokenCount": 17}],"candidatesTokensDetails": [{"modality": "TEXT","tokenCount": 7}]},"modelVersion": "gemini-2.0-flash","responseId": "czChaJ3qKMmagbUPt96I0QI"}
LogTemp: Current response buffer: AIアシスタントです。
LogTemp: Final response: AIアシスタントです。

3. 解説

SSEに関わるところのみです。

  • SSEレスポンスの指定
    ApiUrlでSSE形式でレスポンスを返すように指定しています。

    MyActor.h
        /// <summary>
        /// API URL
        /// </summary>
        const FString ApiUrl = TEXT("https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent");
    

    Gemini AI for Developers #ストリーミング レスポンスRESTのサンプルを元にしています。

  • 途中結果受信処理
    MyActor.cppの以下の処理です。この記事の肝です。

    MyActor.cpp
    // SSE形式での受信処理
    FHttpRequestStreamDelegate StreamDelegate;
    StreamDelegate.BindUObject(this, &AMyActor::OnMiddleResponseReceived);
    Request->SetResponseBodyReceiveStreamDelegate(StreamDelegate);
    

    IHttpRequest::SetResponseBodyReceiveStreamDelegate
    を使用しているだけですが、これ自体の使用例が中々調べても出てこなかった為、エンジンコードを探りました。

    なお、途中結果受信デリゲートを追加しても最終結果は通常通りうけとれるようです。

    MyActor.cpp
    // レスポンスの全データ受信時
    Request->OnProcessRequestComplete().BindUObject(this, &AMyActor::OnCompletedResponseReceived);
    
    • 文字列変換処理
      バイト列がそのまま引数に入ってくるだけなので文字列に直してあげてパースをすればOKでした。
      MyActor.cpp
          if (Ptr != nullptr && Length > 0)
          {
              // UTF-8としてデータを解釈
              const uint8* Data = static_cast<uint8*>(Ptr);
              DataStr = FString(UTF8_TO_TCHAR(Data));
          }
      
    • パース処理
      このフォーマットの仕様が探した限り見つからなかった為、途中結果をログに出力しつつ、パース処理を作成しました。OpenAIのAPIだと最後にDONEがつくはずですが、Geminiではつかないようです。
      Gemini以外のSSE形式データの受信処理を作成する場合は以下のコードを一旦コメントアウトした状態で作成することをお勧めします。
      MyActor.cpp
      bool AMyActor::ParseSSELine(const FString& Line, FString& OutText)
      {
          // SSE形式のデータは "data: " で始まる
          static const FString HeaderPrefix = TEXT("data: ");
      
          FString Data = Line.Replace(*HeaderPrefix, TEXT(""));
      
          TSharedPtr<FJsonObject>   JsonObject;
          TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Data);
          if (FJsonSerializer::Deserialize(Reader, JsonObject))
          {
              const TArray<TSharedPtr<FJsonValue>>* Candidates;
              if (JsonObject->TryGetArrayField(TEXT("candidates"), Candidates))
              {
                  // 最初の候補を取得
                  auto Candidate = (*Candidates)[0]->AsObject();
                  auto Content = Candidate->GetObjectField(TEXT("content"));
                  auto Parts = Content->GetArrayField(TEXT("parts"));
                  OutText = Parts[0]->AsObject()->GetStringField(TEXT("text"));
                  UE_LOG(LogTemp, Log, TEXT("Parsing SSE line: \n\t%s\n\t\t->%s"), *Line, *OutText);
                  return true;
              }
          }
          UE_LOG(LogTemp, Error, TEXT("Failed to parse SSE line: %s"), *Line);
          return false;
      }
      
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?