Help us understand the problem. What is going on with this article?

AndroidでgRPCの双方向通信などにFlowは相性がいいかもしれない話

はじめに

この記事は Android Advent Calendar 2019 11日目の記事です。10日目は @kenz_firespeed さんの Jetpack Composeは速い?遅い? 、12日目は @nichiyoshi さんの 子孫ビューの相対Rect情報を取得してオーバーレイから切り抜く です。

みなさんはKotlinのFlow使ってますか?
趣味でgRPCの双方向ストリーミングを使った簡単なチャットアプリのサンプルを作っていたときに、サーバーとAndroidの値の送受信の部分をFlowを使ってやるといい感じに抽象化できそうだと思ったのでやってみました。

作ったもの

画面収録-2019-12-11-17.14.35.gif

ユーザーの概念すらないので、チャットとは…って感じですが、ちゃんと動いてます。

実装

ChatService

インターフェースの定義です。双方向通信を表現するために、Flowを受け取ってFlowを返すメソッドを定義しています。

ChatService
interface ChatService {
    fun flowChatMessage(request: Flow<ChatMessage>): Flow<ChatMessage>
}

ChatApi

gRPCで通信する部分です。PCのローカルに立てたgRPCのサーバーと通信してます。この時点ではコールバックパターンを使った実装です。

ChatApi
class ChatApi @Inject constructor() {

    private val channel = ManagedChannelBuilder.forAddress("10.0.2.2", 6565)
        .usePlaintext()
        .build()

    private val chatServiceStub = ChatServiceGrpc.newStub(channel)

    fun observeChatMessage(
        onNext: (String) -> Unit,
        onError: (Throwable) -> Unit,
        onCompleted: () -> Unit
    ): StreamObserver<MessageRequest> {
        return chatServiceStub.execStream(object : StreamObserver<MessageResponse> {
            override fun onNext(value: MessageResponse?) {
                value?.message?.let(onNext)
            }

            override fun onError(t: Throwable?) {
                t?.let(onError)
            }

            override fun onCompleted() {
                onCompleted.invoke()
            }
        })
    }
}

ChatServiceOnGrpc

先程のChatServiceを継承し、ChatApiを用いて実装します。
ここで、channelFlowを使うことにより、コールバックで受け取った値をFlowに流しています。

ChatServiceOnGrpc
@ExperimentalCoroutinesApi
class ChatServiceOnGrpc @Inject constructor(
    private val chatApi: ChatApi
) : ChatService {
    override fun flowChatMessage(
        request: Flow<ChatMessage>
    ): Flow<ChatMessage> = channelFlow<ChatMessage> {
        withContext(Dispatchers.IO) {
            val observer = chatApi.observeChatMessage(
                onNext = {
                    launch {
                        channel.send(ChatMessage(it))
                    }
                },
                onError = {
                    throw it
                },
                onCompleted = {
                    channel.close()
                }
            )
            request.onEach {
                val req = MessageRequest.newBuilder()
                    .setMessage(it.value)
                    .build()
                observer.onNext(req)
            }.launchIn(this)
        }
        awaitClose()
    }
        .flowOn(Dispatchers.IO)
        .buffer()
}

ChatViewModel

UI側ではLiveDataで扱えると楽ですよね。 ViewModel側で Flow#asLiveDataLiveData#asFlow を使って相互に変換してあげましょう。

ChatViewModel
class ChatViewModel(
    chatService: ChatService
) : ViewModel() {

    private val myMassage = MutableLiveData<ChatMessage>()

    val receiveMessage = chatService.flowChatMessage(myMassage.asFlow())
        .asLiveData()

    fun sendMessage(message: String): Job = viewModelScope.launch {
        myMassage.value = ChatMessage(message)
    }
}

あとはActivityやFragment側でobserveしてRecyclerViewに表示するなりすれば完成です。

まとめ

今回、gRPCの双方向ストリーミングをFlowを使ってラップしてみました。これにより、gRPC特有の型への依存をインフラ層に閉じ込め、使う側はFlowで扱いやすくなりました。
gRPCだけでなく、WebSocketなどで双方向通信する場合でもFlowでラップすると使いやすいと思います。

今回のサンプルのソースコードはこちらに公開しているので、実際に動かしてみたい場合はどうぞ。
Android側: https://github.com/yt8492/gRPCChat
サーバー側: https://github.com/yt8492/grpc-chat

12/12追記 おまけ

WebSocketを使った場合のサンプルも作ってみました。Flowを使ったことにより、ChatServiceを継承したクラスを実装して差し替えるだけで動きます。
サーバー側はこちら。
https://github.com/yt8492/WebSocketChat

ChatServiceOnWebSocket
@ExperimentalCoroutinesApi
class ChatServiceOnWebSocket @Inject constructor() : ChatService {

    override fun flowChatMessage(
        request: Flow<ChatMessage>
    ): Flow<ChatMessage> = channelFlow<ChatMessage> {
        withContext(Dispatchers.IO) {
            val clientSocket = Socket("10.0.2.2", 6789)
            val serverWriter = clientSocket.getOutputStream().bufferedWriter()
            val socketReader = clientSocket.getInputStream().bufferedReader()
            socketReader.lineSequence()
                .asFlow()
                .onEach {
                    channel.send(ChatMessage(it))
                }.launchIn(this)
            request.onEach {
                serverWriter.write("${it.value}\n")
                serverWriter.flush()
            }.launchIn(this)
        }
        awaitClose()
    }
        .flowOn(Dispatchers.IO)
        .buffer()
}
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした