はじめに
この記事は Android Advent Calendar 2019 11日目の記事です。10日目は @kenz_firespeed さんの Jetpack Composeは速い?遅い? 、12日目は @nichiyoshi さんの 子孫ビューの相対Rect情報を取得してオーバーレイから切り抜く です。
みなさんはKotlinのFlow使ってますか?
趣味でgRPCの双方向ストリーミングを使った簡単なチャットアプリのサンプルを作っていたときに、サーバーとAndroidの値の送受信の部分をFlowを使ってやるといい感じに抽象化できそうだと思ったのでやってみました。
作ったもの
ユーザーの概念すらないので、チャットとは…って感じですが、ちゃんと動いてます。
実装
ChatService
インターフェースの定義です。双方向通信を表現するために、Flowを受け取ってFlowを返すメソッドを定義しています。
interface ChatService {
fun flowChatMessage(request: Flow<ChatMessage>): Flow<ChatMessage>
}
ChatApi
gRPCで通信する部分です。PCのローカルに立てたgRPCのサーバーと通信してます。この時点ではコールバックパターンを使った実装です。
class ChatApi @Inject constructor() {
private val channel = ManagedChannelBuilder.forAddress("10.0.2.2", 6565)
.usePlaintext()
.build()
private val chatServiceStub = ChatServiceGrpc.newStub(channel)
fun observeChatMessage(
onNext: (String) -> Unit,
onError: (Throwable) -> Unit,
onCompleted: () -> Unit
): StreamObserver<MessageRequest> {
return chatServiceStub.execStream(object : StreamObserver<MessageResponse> {
override fun onNext(value: MessageResponse?) {
value?.message?.let(onNext)
}
override fun onError(t: Throwable?) {
t?.let(onError)
}
override fun onCompleted() {
onCompleted.invoke()
}
})
}
}
ChatServiceOnGrpc
先程のChatServiceを継承し、ChatApiを用いて実装します。
ここで、channelFlowを使うことにより、コールバックで受け取った値をFlowに流しています。
@ExperimentalCoroutinesApi
class ChatServiceOnGrpc @Inject constructor(
private val chatApi: ChatApi
) : ChatService {
override fun flowChatMessage(
request: Flow<ChatMessage>
): Flow<ChatMessage> = channelFlow<ChatMessage> {
withContext(Dispatchers.IO) {
val observer = chatApi.observeChatMessage(
onNext = {
launch {
channel.send(ChatMessage(it))
}
},
onError = {
throw it
},
onCompleted = {
channel.close()
}
)
request.onEach {
val req = MessageRequest.newBuilder()
.setMessage(it.value)
.build()
observer.onNext(req)
}.launchIn(this)
}
awaitClose()
}
.flowOn(Dispatchers.IO)
.buffer()
}
ChatViewModel
UI側ではLiveDataで扱えると楽ですよね。 ViewModel側で Flow#asLiveData
と LiveData#asFlow
を使って相互に変換してあげましょう。
class ChatViewModel(
chatService: ChatService
) : ViewModel() {
private val myMassage = MutableLiveData<ChatMessage>()
val receiveMessage = chatService.flowChatMessage(myMassage.asFlow())
.asLiveData()
fun sendMessage(message: String): Job = viewModelScope.launch {
myMassage.value = ChatMessage(message)
}
}
あとはActivityやFragment側でobserveしてRecyclerViewに表示するなりすれば完成です。
まとめ
今回、gRPCの双方向ストリーミングをFlowを使ってラップしてみました。これにより、gRPC特有の型への依存をインフラ層に閉じ込め、使う側はFlowで扱いやすくなりました。
gRPCだけでなく、WebSocketなどで双方向通信する場合でもFlowでラップすると使いやすいと思います。
今回のサンプルのソースコードはこちらに公開しているので、実際に動かしてみたい場合はどうぞ。
Android側: https://github.com/yt8492/gRPCChat
サーバー側: https://github.com/yt8492/grpc-chat
12/12追記 おまけ
WebSocketを使った場合のサンプルも作ってみました。Flowを使ったことにより、ChatServiceを継承したクラスを実装して差し替えるだけで動きます。
サーバー側はこちら。
https://github.com/yt8492/WebSocketChat
@ExperimentalCoroutinesApi
class ChatServiceOnWebSocket @Inject constructor() : ChatService {
override fun flowChatMessage(
request: Flow<ChatMessage>
): Flow<ChatMessage> = channelFlow<ChatMessage> {
withContext(Dispatchers.IO) {
val clientSocket = Socket("10.0.2.2", 6789)
val serverWriter = clientSocket.getOutputStream().bufferedWriter()
val socketReader = clientSocket.getInputStream().bufferedReader()
socketReader.lineSequence()
.asFlow()
.onEach {
channel.send(ChatMessage(it))
}.launchIn(this)
request.onEach {
serverWriter.write("${it.value}\n")
serverWriter.flush()
}.launchIn(this)
}
awaitClose()
}
.flowOn(Dispatchers.IO)
.buffer()
}