はじめに
今日は、LLM(大規模言語モデル)の応答をユーザーにリアルタイムで表示するストリーミングレスポンスの実装方法を学びます。単一の大きな応答を待つのではなく、生成されたテキストを少しずつ画面に表示することで、ユーザー体験を劇的に向上させます。
1. ストリーミングレスポンスの仕組み
従来のHTTP通信は「リクエスト-レスポンス」という一往復のモデルです。クライアントがリクエストを送信し、サーバーがすべての処理を終えてから完全な応答を返します。
しかし、LLMの応答は生成に時間がかかるため、このモデルではユーザーは何も表示されない画面を待つことになります。
ストリーミングレスポンスは、サーバーが生成したデータを逐次的に(トークン単位で)送信し続ける仕組みです。これにより、クライアントはサーバーからの最初の小さなデータを受け取った直後から、画面を更新し始めることができます。
LLMのストリーミングは通常、Server-Sent Events (SSE) という技術を使用し、text/event-stream形式でデータを送信します。
2. FlowとRetrofit2によるストリーミング実装
Kotlin CoroutinesのFlowは、ストリーミングデータを扱うのに最適なコンポーネントです。Retrofit2と組み合わせることで、複雑な通信処理をシンプルに記述できます。
ステップ1: データモデルの定義
まず、ストリーミングで受け取るデータ構造を定義します:
data class ChatRequest(
val model: String,
val messages: List<Message>,
val stream: Boolean = true // ストリーミングを有効にする
)
data class ChatStreamChunk(
val id: String,
val `object`: String,
val created: Long,
val model: String,
val choices: List<Choice>
)
data class Choice(
val index: Int,
val delta: Delta?,
val finish_reason: String?
)
data class Delta(
val content: String?
)
data class Message(
val id: String = UUID.randomUUID().toString(),
val role: String,
val content: String,
val isStreaming: Boolean = false
)
ステップ2: APIサービスインターフェースの定義
Retrofit2のインターフェースでは、応答タイプをResponseBodyとして定義し、@Streamingアノテーションを使用します:
interface LlmApiService {
@Streaming
@POST("v1/chat/completions")
@Headers("Accept: text/event-stream")
suspend fun streamChatCompletion(@Body requestBody: ChatRequest): ResponseBody
}
ステップ3: Repositoryでのストリーム処理
Repository層で、SSE形式のストリームをパースし、アプリケーションのデータモデルに変換するロジックを実装します:
class LlmRepository(private val apiService: LlmApiService) {
private val gson = Gson()
fun streamChatCompletion(request: ChatRequest): Flow<String> = flow {
try {
val responseBody = apiService.streamChatCompletion(request)
responseBody.byteStream().bufferedReader().use { reader ->
var line: String?
while (reader.readLine().also { line = it } != null) {
line?.let { currentLine ->
when {
currentLine.startsWith("data: ") -> {
val data = currentLine.substring(6)
// "[DONE]"は終了シグナル
if (data.trim() == "[DONE]") {
return@flow
}
try {
val chunk = gson.fromJson(data, ChatStreamChunk::class.java)
val content = chunk.choices.firstOrNull()?.delta?.content
if (!content.isNullOrEmpty()) {
emit(content)
}
} catch (e: JsonSyntaxException) {
// JSONパースエラーをログに記録して続行
println("JSON parse error: ${e.message}")
}
}
currentLine.isEmpty() -> {
// 空行は無視
}
else -> {
// その他の行(コメントなど)も無視
}
}
}
}
}
} catch (e: Exception) {
throw IOException("Stream processing failed: ${e.message}", e)
}
}.flowOn(Dispatchers.IO)
}
ステップ4: ViewModelでのデータの収集
ViewModelでは、Repositoryから返されたFlowを収集し、UIの状態を管理します:
class ChatViewModel(private val repository: LlmRepository) : ViewModel() {
private val _messages = MutableStateFlow<List<Message>>(emptyList())
val messages: StateFlow<List<Message>> = _messages.asStateFlow()
private val _isLoading = MutableStateFlow(false)
val isLoading: StateFlow<Boolean> = _isLoading.asStateFlow()
private val _error = MutableStateFlow<String?>(null)
val error: StateFlow<String?> = _error.asStateFlow()
fun sendMessage(userMessage: String) {
viewModelScope.launch {
try {
_isLoading.value = true
_error.value = null
// ユーザーメッセージを追加
val userMsg = Message(role = "user", content = userMessage)
_messages.value = _messages.value + userMsg
// アシスタントメッセージのプレースホルダーを追加
val assistantMsg = Message(
role = "assistant",
content = "",
isStreaming = true
)
_messages.value = _messages.value + assistantMsg
// ストリーミング開始
val request = ChatRequest(
model = "gpt-3.5-turbo",
messages = _messages.value.map {
Message(role = it.role, content = it.content)
}
)
repository.streamChatCompletion(request)
.catch { exception ->
_error.value = "エラーが発生しました: ${exception.message}"
}
.collect { chunk ->
// 最後のメッセージにチャンクを追加
_messages.update { currentMessages ->
val updatedMessages = currentMessages.toMutableList()
val lastIndex = updatedMessages.lastIndex
val lastMessage = updatedMessages[lastIndex]
updatedMessages[lastIndex] = lastMessage.copy(
content = lastMessage.content + chunk
)
updatedMessages.toList()
}
}
// ストリーミング完了後の処理
_messages.update { currentMessages ->
val updatedMessages = currentMessages.toMutableList()
val lastIndex = updatedMessages.lastIndex
val lastMessage = updatedMessages[lastIndex]
updatedMessages[lastIndex] = lastMessage.copy(isStreaming = false)
updatedMessages.toList()
}
} catch (e: Exception) {
_error.value = "メッセージ送信に失敗しました: ${e.message}"
} finally {
_isLoading.value = false
}
}
}
}
ステップ5: Viewでの表示更新
UI(Activity/Fragment)では、StateFlowを監視し、データが更新されるたびに画面を再描画します:
class ChatFragment : Fragment() {
private var _binding: FragmentChatBinding? = null
private val binding get() = _binding!!
private lateinit var chatAdapter: ChatAdapter
private lateinit var viewModel: ChatViewModel
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
setupRecyclerView()
observeViewModel()
}
private fun setupRecyclerView() {
chatAdapter = ChatAdapter()
binding.recyclerView.apply {
adapter = chatAdapter
layoutManager = LinearLayoutManager(requireContext())
}
}
private fun observeViewModel() {
viewLifecycleOwner.lifecycleScope.launch {
viewModel.messages.collect { messages ->
chatAdapter.submitList(messages) {
// リストを最下部にスクロール
if (messages.isNotEmpty()) {
binding.recyclerView.smoothScrollToPosition(messages.size - 1)
}
}
}
}
viewLifecycleOwner.lifecycleScope.launch {
viewModel.isLoading.collect { isLoading ->
binding.sendButton.isEnabled = !isLoading
binding.progressBar.isVisible = isLoading
}
}
viewLifecycleOwner.lifecycleScope.launch {
viewModel.error.collect { error ->
error?.let {
// エラーメッセージを表示(SnackbarやToastなど)
Snackbar.make(binding.root, it, Snackbar.LENGTH_LONG).show()
}
}
}
}
}
3. エラーハンドリングとパフォーマンスの考慮
ネットワークエラーの処理
fun streamChatCompletion(request: ChatRequest): Flow<String> = flow {
// ... 実装
}.retry(3) { exception ->
when (exception) {
is IOException -> {
delay(1000) // 1秒待機してリトライ
true
}
else -> false
}
}.catch { exception ->
emit("") // エラー時は空文字を送信
throw exception
}
メモリ使用量の最適化
長時間のストリーミングでメモリリークを防ぐため、適切なライフサイクル管理が重要です:
override fun onDestroyView() {
super.onDestroyView()
// ViewModelのジョブをキャンセル
viewModel.cancelStreaming()
_binding = null
}
4. まとめ
- ストリーミングレスポンスは、LLMの応答をリアルタイムに表示し、ユーザーの待機時間を短縮します
- Server-Sent Events (SSE) 形式のデータを適切にパースすることが重要です
-
Kotlin Coroutinesの
Flowは、ストリームデータを扱うための強力なツールです -
Repository層で
flowビルダーを使い、生のストリームをアプリのデータモデルに変換します -
ViewModel層で
collectメソッドを使い、データを収集してUIの状態を管理します - エラーハンドリングとライフサイクル管理を適切に実装することで、安定したアプリケーションを構築できます
これにより、ユーザーはAIが考えながら応答を生成する様子をリアルタイムで見ることができ、まるで人間と会話しているかのような体験を得られます。次回は、このアプリを公開に向けてデバッグとテストする方法を学びます。