用語に関して
- Obsevableとは値の生成元であり、データストリーム自体。
- Observeする、とはObservableをSubscribeすること。
- Subscribeするとは、Observableに流れるデータストリームから値を取得すること。および、監視し続けること。
- PublishSubjectとは、Observableであり、Observableのデータストリームに新規値を流すことができるもの
Observe系
いずれもObservableの生成源。データストリームを表していて、Subscribeによって値を受け取ることができる
- Observable
0個または複数のアイテムを発行し、終了することがあるデータストリームを表します。オブザーバーは、Observableにサブスクライブして、Observableによって発行された新しいアイテムの通知を受け取ることができます。
// subscribeされると1秒ごとに数値を発行するObservable
val observable = Observable.interval(1, TimeUnit.SECONDS)
.take(10)
.map { it + 1 }
observable.subscribeOn(Schedulers.io()) // subscribeはIOスレッドで行う
.observeOn(Schedulers.single()) // observeを別のシングルスレッドで行う
.subscribe { value -> println("Received value: $value") } // Observableの購読、値が発行されると読み取る。
delay(15000)
- Maybe
Observableに似ていますが、1つのアイテムまたはアイテムを発行しない場合があります。また、通常またはエラーで終了することができます。
val maybeSuccess: Maybe<Int> = Maybe.just(5)
val maybeError: Maybe<Int> = Maybe.create { emitter ->
emitter.onError(Exception("error"))
}
val maybeEmpty: Maybe<Int> = Maybe.create { emitter ->
}
val maybeObserver: MaybeObserver<Int> = object : MaybeObserver<Int> {
override fun onSubscribe(d: Disposable) {
// Maybeのサブスクライブが完了した際に呼び出される
println("onSubscribe")
}
override fun onSuccess(t: Int) {
// Maybeが1つのアイテムを発行した場合に呼び出される
println("onSuccess: $t")
}
override fun onError(e: Throwable) {
// Maybeがエラーで終了した場合に呼び出される
println("onError: ${e.message}")
}
override fun onComplete() {
// Maybeがアイテムを発行しなかった場合に呼び出される
println("onComplete")
}
}
/*
onSubscribe
onSuccess: 5
*/
maybeSuccess.subscribe(maybeObserver)
/*
onSubscribe
onError: error
*/
maybeError.subscribe(maybeObserver)
(Haskellとかにもあるよね)
- Single
Maybeに似ていますが、常に1つのアイテムを発行し、アイテムが発行されない場合または1つ以上のアイテムが発行された場合はエラーを発行します。
// Singleオブジェクトの生成
val single: Single<String> = Single.just("Hello World")
// subscribeメソッドによるSingleの購読
single
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(
{ result -> println(result) }, // onSuccess
{ error -> println(error) } // onError
)
- Completable
操作が正常に完了したか、エラーが発生したかを示すイベントのストリームを表します。データアイテムは発行しません。
// Completableオブジェクトの生成
val completable: Completable = Completable.fromAction {
// 何らかの処理を行う
// 処理が正常に完了する場合
// 処理が例外をスローする場合
}
// subscribeメソッドによるCompletableの購読
completable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(
{ println("Complete") }, // onComplete
{ error -> println(error) } // onError
)
Publish 系
Observableのサブクラスで、自信が保持しているデータストリームに対して新しい要素を追加することができます。データストリームの中間に挿入され、Observableが発行するデータストリームを別のコンポーネントに渡すことができます。
- PublishSubject
PublishSubjectは、サブスクライブされる前に発行された値は受け取れませんが、サブスクライブされた後に発行された値のみを受け取ることができるSubjectです。サブスクライブする前に発行された値は破棄され、新しい値からの通知だけを受け取ります。
val subject = PublishSubject.create<String>()
subject.onNext("First") // サブスクライブ前に発行された値は破棄される
subject.subscribe { value -> Log.d("TAG", "Received value: $value") }
subject.onNext("Second")
- BehaviorSubject
BehaviorSubjectは、サブスクライブされる前に発行された最後の値を受け取ることができるSubjectです。つまり、最新の値をキャッシュしており、サブスクライブ時に最後に発行された値を受け取ることができます。
val subject = BehaviorSubject.createDefault("Initial")
subject.subscribe { value -> Log.d("TAG", "Received value: $value") }
subject.onNext("Second")
// Initial も Secondもsubscribeされる
- ReplaySubject
ReplaySubjectは、発行されたすべての値をキャッシュしており、サブスクライブ時に過去に発行されたすべての値を受け取ることができます。
val subject = ReplaySubject.create<String>()
subject.onNext("First")
subject.onNext("Second")
subject.subscribe { value -> Log.d("TAG", "Received value: $value") }
subject.onNext("Third")
// この場合、"First"、"Second"、"Third"のすべての値がログに出力されます。
- UnicastSubject
UnicastSubjectは、1つのサブスクライバーに対してのみ値を発行できるSubjectです。複数のサブスクライバーを持たせることはできません。
val subject = UnicastSubject.create<String>()
subject.onNext("First")
subject.subscribe { value -> Log.d("TAG", "Received value: $value") } //
以上の情報から以下を読み解く
MVI KotlinのReaktiveライブラリ サンプル
https://github.com/badoo/Reaktive/tree/master
internal interface Feature<in Wish : Any, out State : Any> : Consumer<Wish>, Disposable {
val state: BehaviorObservable<State>
}
@Suppress("FunctionNaming") // Factory function
internal fun <Event : Any, State : Any, Msg : Any> Feature(
initialState: State,
actor: (Event, State) -> Observable<Msg>,
reducer: State.(Msg) -> State,
): Feature<Event, State> =
object : Feature<Event, State>, DisposableScope by DisposableScope() {
private val wishes = PublishSubject<Event>()
private val _state = BehaviorSubject(initialState).scope { it.onComplete() }
override val state: BehaviorObservable<State> = _state
init {
wishes
.flatMap { event -> actor(event, _state.value) }
.map { msg -> _state.value.reducer(msg) }
.subscribeScoped(onNext = _state::onNext)
}
override fun onNext(value: Event) {
wishes.onNext(value)
}
}
そと(UI)からのEventによって、Stateを変更するための機構。
まず、Eventの入口だが、WishとEventがおなじ型なので、FeatureFactoryのonNext(value: Event)
が外からこの関数を使用する入口となる。
呼ばれ方↓
val feature = Feature(state = State(), ...
feature(Event.HogeEvent) // featureインスタンスを評価することでonNextが発火する。
fun onNext(value: Event)
で wishes.onNext(value)
が呼ばれる。
wishesはPublishSubjectなので、オブサーバへ値が出力される。
オブサーバは以下
init {
wishes
.flatMap { event -> actor(event, _state.value) }
.map { msg -> _state.value.reducer(msg) }
.subscribeScoped(onNext = _state::onNext)
}
うけとったeventをactorでObservableにマップ
Msgは新規Stateを作るための値群のこと。
後続のreducerでBehaviorSubjectである_stateとmsgから新規のStateを作成する。
最後のsubscribeScopedにて、その新規StateのonNextを呼び出すことでオブサーバへ新規Stateが出力される。
このStateのオブサーバは以下
override val state: BehaviorObservable<State> = _state // このオブサーバブルから
↓
// uiクラス等の下をオブサーバとして登録する
feature.state.subscribeScoped { state ->
// 新規StateがくるのでUI更新などを行う。
}