10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Android Advent Calendar 2019

Day 11

AndroidでgRPCの双方向通信などにFlowは相性がいいかもしれない話

Last updated at Posted at 2019-12-11

はじめに

この記事は Android Advent Calendar 2019 11日目の記事です。10日目は @kenz_firespeed さんの Jetpack Composeは速い?遅い? 、12日目は @nichiyoshi さんの 子孫ビューの相対Rect情報を取得してオーバーレイから切り抜く です。

みなさんはKotlinのFlow使ってますか?
趣味でgRPCの双方向ストリーミングを使った簡単なチャットアプリのサンプルを作っていたときに、サーバーとAndroidの値の送受信の部分をFlowを使ってやるといい感じに抽象化できそうだと思ったのでやってみました。

作ったもの

画面収録-2019-12-11-17.14.35.gif

ユーザーの概念すらないので、チャットとは…って感じですが、ちゃんと動いてます。

実装

ChatService

インターフェースの定義です。双方向通信を表現するために、Flowを受け取ってFlowを返すメソッドを定義しています。

ChatService
interface ChatService {
    fun flowChatMessage(request: Flow<ChatMessage>): Flow<ChatMessage>
}

ChatApi

gRPCで通信する部分です。PCのローカルに立てたgRPCのサーバーと通信してます。この時点ではコールバックパターンを使った実装です。

ChatApi
class ChatApi @Inject constructor() {

    private val channel = ManagedChannelBuilder.forAddress("10.0.2.2", 6565)
        .usePlaintext()
        .build()

    private val chatServiceStub = ChatServiceGrpc.newStub(channel)

    fun observeChatMessage(
        onNext: (String) -> Unit,
        onError: (Throwable) -> Unit,
        onCompleted: () -> Unit
    ): StreamObserver<MessageRequest> {
        return chatServiceStub.execStream(object : StreamObserver<MessageResponse> {
            override fun onNext(value: MessageResponse?) {
                value?.message?.let(onNext)
            }

            override fun onError(t: Throwable?) {
                t?.let(onError)
            }

            override fun onCompleted() {
                onCompleted.invoke()
            }
        })
    }
}

ChatServiceOnGrpc

先程のChatServiceを継承し、ChatApiを用いて実装します。
ここで、channelFlowを使うことにより、コールバックで受け取った値をFlowに流しています。

ChatServiceOnGrpc
@ExperimentalCoroutinesApi
class ChatServiceOnGrpc @Inject constructor(
    private val chatApi: ChatApi
) : ChatService {
    override fun flowChatMessage(
        request: Flow<ChatMessage>
    ): Flow<ChatMessage> = channelFlow<ChatMessage> {
        withContext(Dispatchers.IO) {
            val observer = chatApi.observeChatMessage(
                onNext = {
                    launch {
                        channel.send(ChatMessage(it))
                    }
                },
                onError = {
                    throw it
                },
                onCompleted = {
                    channel.close()
                }
            )
            request.onEach {
                val req = MessageRequest.newBuilder()
                    .setMessage(it.value)
                    .build()
                observer.onNext(req)
            }.launchIn(this)
        }
        awaitClose()
    }
        .flowOn(Dispatchers.IO)
        .buffer()
}

ChatViewModel

UI側ではLiveDataで扱えると楽ですよね。 ViewModel側で Flow#asLiveDataLiveData#asFlow を使って相互に変換してあげましょう。

ChatViewModel
class ChatViewModel(
    chatService: ChatService
) : ViewModel() {

    private val myMassage = MutableLiveData<ChatMessage>()

    val receiveMessage = chatService.flowChatMessage(myMassage.asFlow())
        .asLiveData()

    fun sendMessage(message: String): Job = viewModelScope.launch {
        myMassage.value = ChatMessage(message)
    }
}

あとはActivityやFragment側でobserveしてRecyclerViewに表示するなりすれば完成です。

まとめ

今回、gRPCの双方向ストリーミングをFlowを使ってラップしてみました。これにより、gRPC特有の型への依存をインフラ層に閉じ込め、使う側はFlowで扱いやすくなりました。
gRPCだけでなく、WebSocketなどで双方向通信する場合でもFlowでラップすると使いやすいと思います。

今回のサンプルのソースコードはこちらに公開しているので、実際に動かしてみたい場合はどうぞ。
Android側: https://github.com/yt8492/gRPCChat
サーバー側: https://github.com/yt8492/grpc-chat

12/12追記 おまけ

WebSocketを使った場合のサンプルも作ってみました。Flowを使ったことにより、ChatServiceを継承したクラスを実装して差し替えるだけで動きます。
サーバー側はこちら。
https://github.com/yt8492/WebSocketChat

ChatServiceOnWebSocket
@ExperimentalCoroutinesApi
class ChatServiceOnWebSocket @Inject constructor() : ChatService {

    override fun flowChatMessage(
        request: Flow<ChatMessage>
    ): Flow<ChatMessage> = channelFlow<ChatMessage> {
        withContext(Dispatchers.IO) {
            val clientSocket = Socket("10.0.2.2", 6789)
            val serverWriter = clientSocket.getOutputStream().bufferedWriter()
            val socketReader = clientSocket.getInputStream().bufferedReader()
            socketReader.lineSequence()
                .asFlow()
                .onEach {
                    channel.send(ChatMessage(it))
                }.launchIn(this)
            request.onEach {
                serverWriter.write("${it.value}\n")
                serverWriter.flush()
            }.launchIn(this)
        }
        awaitClose()
    }
        .flowOn(Dispatchers.IO)
        .buffer()
}
10
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?