3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Rxを使ったEventBusをCoroutinesのFlowに置き換えてみる

Last updated at Posted at 2021-04-08

#はじめに
どこかしらでEventが発火されたら、そのEventの発火を待ち受けていた別の箇所で処理をするEventBusをRxで書いたものをKotlinのCoroutinesのFlowを用いて置き換えたのを記します。

##Rxを使ったEventBus
以下はRxを使ったEventBusの簡単な例です。
Buttonを押した時にRxを経由してEventを発火し、TextViewの文字を変えています。

RxEventBus.kt
class RxEventBus {

    private val bus = PublishSubject.create<String>()

    fun sendEvent(event: String) = bus.onNext(event)

    fun toObservable(): Observable<String> = bus

    companion object {
        private var rxBus: RxEventBus? = null

        fun getInstance()=
            rxBus ?: synchronized(RxEventBus::class.java) { 
                RxEventBus().apply { rxBus = this } 
            }
    }
}
MainActivity.kt
class MainActivity : AppCompatActivity() {

    private lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //RxEventBusをsubscribeしてイベントを待ち受ける
        disposable = RxEventBus.getInstance().toObservable()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                when (it) {
                    "EVENT_BUTTON_PUSH" -> {
                        findViewById<TextView>(R.id.text).text = "Button Pushed"
                    }
                }
            }

        findViewById<Button>(R.id.button).setOnClickListener {
            //Event発火
            RxEventBus.getInstance().sendEvent("EVENT_BUTTON_PUSH")
        }
    }
}

RxEventBusでイベント発火と待ち受け用のSubjectを用意して、
使いたい箇所でこれを使っています。

##CoroutinesのFlow
同じ処理をCoroutinesのFlowを用いたものが下記になります。

EventFlow.kt
class EventFlow {
    private val flow = MutableSharedFlow<String>()

    suspend fun sendEvent(event: String) = flow.emit(event)

    fun subscribe(scope: CoroutineScope, onConsume: (String) -> Unit) {
        flow.onEach {
            onConsume(it)
        }.launchIn(scope)
    }
}
EventFlow.kt
class MainActivity : AppCompatActivity() {

    private val eventFlow = EventFlow()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        //EventFlowをsubscribeしてイベントを待ち受ける
        eventFlow.subscribe(lifecycleScope) {
            when (it) {
                "EVENT_BUTTON_PUSH" -> {
                    findViewById<TextView>(R.id.text).text = "Button Pushed"
                }
            }
        }

        findViewById<Button>(R.id.button).setOnClickListener {
            runBlocking {
                //Event発火
                eventFlow.sendEvent("EVENT_BUTTON_PUSH")
            }
        }
    }
}

Flowは非同期処理やRxのような処理ができるCoroutinesのライブラリで、今回はホットストリームなFlowとして扱えるSharedFlowを使います。
これにより複数箇所から呼び出してもflow側の処理数が増えることはありません。
そして、subscribe()の引数で指定したscopeの中で、flowのイベントを待ち受けます。

##参考記事

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?