1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Android開発30日間マスターシリーズ - Day26: ストリーミングレスポンス対応 - リアルタイムなLLM応答の表示実装

Posted at

はじめに

今日は、LLM(大規模言語モデル)の応答をユーザーにリアルタイムで表示するストリーミングレスポンスの実装方法を学びます。単一の大きな応答を待つのではなく、生成されたテキストを少しずつ画面に表示することで、ユーザー体験を劇的に向上させます。


1. ストリーミングレスポンスの仕組み

従来のHTTP通信は「リクエスト-レスポンス」という一往復のモデルです。クライアントがリクエストを送信し、サーバーがすべての処理を終えてから完全な応答を返します。

しかし、LLMの応答は生成に時間がかかるため、このモデルではユーザーは何も表示されない画面を待つことになります。

ストリーミングレスポンスは、サーバーが生成したデータを逐次的に(トークン単位で)送信し続ける仕組みです。これにより、クライアントはサーバーからの最初の小さなデータを受け取った直後から、画面を更新し始めることができます。

LLMのストリーミングは通常、Server-Sent Events (SSE) という技術を使用し、text/event-stream形式でデータを送信します。


2. FlowとRetrofit2によるストリーミング実装

Kotlin CoroutinesFlowは、ストリーミングデータを扱うのに最適なコンポーネントです。Retrofit2と組み合わせることで、複雑な通信処理をシンプルに記述できます。

ステップ1: データモデルの定義

まず、ストリーミングで受け取るデータ構造を定義します:

data class ChatRequest(
    val model: String,
    val messages: List<Message>,
    val stream: Boolean = true // ストリーミングを有効にする
)

data class ChatStreamChunk(
    val id: String,
    val `object`: String,
    val created: Long,
    val model: String,
    val choices: List<Choice>
)

data class Choice(
    val index: Int,
    val delta: Delta?,
    val finish_reason: String?
)

data class Delta(
    val content: String?
)

data class Message(
    val id: String = UUID.randomUUID().toString(),
    val role: String,
    val content: String,
    val isStreaming: Boolean = false
)

ステップ2: APIサービスインターフェースの定義

Retrofit2のインターフェースでは、応答タイプをResponseBodyとして定義し、@Streamingアノテーションを使用します:

interface LlmApiService {
    @Streaming
    @POST("v1/chat/completions")
    @Headers("Accept: text/event-stream")
    suspend fun streamChatCompletion(@Body requestBody: ChatRequest): ResponseBody
}

ステップ3: Repositoryでのストリーム処理

Repository層で、SSE形式のストリームをパースし、アプリケーションのデータモデルに変換するロジックを実装します:

class LlmRepository(private val apiService: LlmApiService) {
    private val gson = Gson()
    
    fun streamChatCompletion(request: ChatRequest): Flow<String> = flow {
        try {
            val responseBody = apiService.streamChatCompletion(request)
            
            responseBody.byteStream().bufferedReader().use { reader ->
                var line: String?
                while (reader.readLine().also { line = it } != null) {
                    line?.let { currentLine ->
                        when {
                            currentLine.startsWith("data: ") -> {
                                val data = currentLine.substring(6)
                                
                                // "[DONE]"は終了シグナル
                                if (data.trim() == "[DONE]") {
                                    return@flow
                                }
                                
                                try {
                                    val chunk = gson.fromJson(data, ChatStreamChunk::class.java)
                                    val content = chunk.choices.firstOrNull()?.delta?.content
                                    if (!content.isNullOrEmpty()) {
                                        emit(content)
                                    }
                                } catch (e: JsonSyntaxException) {
                                    // JSONパースエラーをログに記録して続行
                                    println("JSON parse error: ${e.message}")
                                }
                            }
                            currentLine.isEmpty() -> {
                                // 空行は無視
                            }
                            else -> {
                                // その他の行(コメントなど)も無視
                            }
                        }
                    }
                }
            }
        } catch (e: Exception) {
            throw IOException("Stream processing failed: ${e.message}", e)
        }
    }.flowOn(Dispatchers.IO)
}

ステップ4: ViewModelでのデータの収集

ViewModelでは、Repositoryから返されたFlowを収集し、UIの状態を管理します:

class ChatViewModel(private val repository: LlmRepository) : ViewModel() {
    private val _messages = MutableStateFlow<List<Message>>(emptyList())
    val messages: StateFlow<List<Message>> = _messages.asStateFlow()
    
    private val _isLoading = MutableStateFlow(false)
    val isLoading: StateFlow<Boolean> = _isLoading.asStateFlow()
    
    private val _error = MutableStateFlow<String?>(null)
    val error: StateFlow<String?> = _error.asStateFlow()

    fun sendMessage(userMessage: String) {
        viewModelScope.launch {
            try {
                _isLoading.value = true
                _error.value = null
                
                // ユーザーメッセージを追加
                val userMsg = Message(role = "user", content = userMessage)
                _messages.value = _messages.value + userMsg
                
                // アシスタントメッセージのプレースホルダーを追加
                val assistantMsg = Message(
                    role = "assistant", 
                    content = "", 
                    isStreaming = true
                )
                _messages.value = _messages.value + assistantMsg
                
                // ストリーミング開始
                val request = ChatRequest(
                    model = "gpt-3.5-turbo",
                    messages = _messages.value.map { 
                        Message(role = it.role, content = it.content) 
                    }
                )
                
                repository.streamChatCompletion(request)
                    .catch { exception ->
                        _error.value = "エラーが発生しました: ${exception.message}"
                    }
                    .collect { chunk ->
                        // 最後のメッセージにチャンクを追加
                        _messages.update { currentMessages ->
                            val updatedMessages = currentMessages.toMutableList()
                            val lastIndex = updatedMessages.lastIndex
                            val lastMessage = updatedMessages[lastIndex]
                            
                            updatedMessages[lastIndex] = lastMessage.copy(
                                content = lastMessage.content + chunk
                            )
                            updatedMessages.toList()
                        }
                    }
                
                // ストリーミング完了後の処理
                _messages.update { currentMessages ->
                    val updatedMessages = currentMessages.toMutableList()
                    val lastIndex = updatedMessages.lastIndex
                    val lastMessage = updatedMessages[lastIndex]
                    
                    updatedMessages[lastIndex] = lastMessage.copy(isStreaming = false)
                    updatedMessages.toList()
                }
                
            } catch (e: Exception) {
                _error.value = "メッセージ送信に失敗しました: ${e.message}"
            } finally {
                _isLoading.value = false
            }
        }
    }
}

ステップ5: Viewでの表示更新

UI(Activity/Fragment)では、StateFlowを監視し、データが更新されるたびに画面を再描画します:

class ChatFragment : Fragment() {
    private var _binding: FragmentChatBinding? = null
    private val binding get() = _binding!!
    
    private lateinit var chatAdapter: ChatAdapter
    private lateinit var viewModel: ChatViewModel
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        
        setupRecyclerView()
        observeViewModel()
    }
    
    private fun setupRecyclerView() {
        chatAdapter = ChatAdapter()
        binding.recyclerView.apply {
            adapter = chatAdapter
            layoutManager = LinearLayoutManager(requireContext())
        }
    }
    
    private fun observeViewModel() {
        viewLifecycleOwner.lifecycleScope.launch {
            viewModel.messages.collect { messages ->
                chatAdapter.submitList(messages) {
                    // リストを最下部にスクロール
                    if (messages.isNotEmpty()) {
                        binding.recyclerView.smoothScrollToPosition(messages.size - 1)
                    }
                }
            }
        }
        
        viewLifecycleOwner.lifecycleScope.launch {
            viewModel.isLoading.collect { isLoading ->
                binding.sendButton.isEnabled = !isLoading
                binding.progressBar.isVisible = isLoading
            }
        }
        
        viewLifecycleOwner.lifecycleScope.launch {
            viewModel.error.collect { error ->
                error?.let {
                    // エラーメッセージを表示(SnackbarやToastなど)
                    Snackbar.make(binding.root, it, Snackbar.LENGTH_LONG).show()
                }
            }
        }
    }
}

3. エラーハンドリングとパフォーマンスの考慮

ネットワークエラーの処理

fun streamChatCompletion(request: ChatRequest): Flow<String> = flow {
    // ... 実装
}.retry(3) { exception ->
    when (exception) {
        is IOException -> {
            delay(1000) // 1秒待機してリトライ
            true
        }
        else -> false
    }
}.catch { exception ->
    emit("") // エラー時は空文字を送信
    throw exception
}

メモリ使用量の最適化

長時間のストリーミングでメモリリークを防ぐため、適切なライフサイクル管理が重要です:

override fun onDestroyView() {
    super.onDestroyView()
    // ViewModelのジョブをキャンセル
    viewModel.cancelStreaming()
    _binding = null
}

4. まとめ

  • ストリーミングレスポンスは、LLMの応答をリアルタイムに表示し、ユーザーの待機時間を短縮します
  • Server-Sent Events (SSE) 形式のデータを適切にパースすることが重要です
  • Kotlin CoroutinesのFlow は、ストリームデータを扱うための強力なツールです
  • Repository層flowビルダーを使い、生のストリームをアプリのデータモデルに変換します
  • ViewModel層collectメソッドを使い、データを収集してUIの状態を管理します
  • エラーハンドリングとライフサイクル管理を適切に実装することで、安定したアプリケーションを構築できます

これにより、ユーザーはAIが考えながら応答を生成する様子をリアルタイムで見ることができ、まるで人間と会話しているかのような体験を得られます。次回は、このアプリを公開に向けてデバッグとテストする方法を学びます。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?