Reactiveの勉強記事
Reactiveを理解したいが何もわからんので、
Reactive Programming With RxJava のサンプルコードが、裏でどのように動いているのかを記述していく。
2016年の本なので、Observable.create のような Deprecatedとなっている古いAPIなどを使用している。
RxJava
簡単な登場人物
シンプル
fun main(args: Array<String>) {
Observable.create<String> { s ->
s.onNext("one")
s.onCompleted()
}.subscribe { s -> println("VALUE -> $s") }
}
-
create {s -> s.OnNext("one") ...}- 引数のラムダ式は
OnSubscribeの実装として解釈される。
- 引数のラムダ式は
-
subscribe { s -> println("VALUE -> $s") }- 引数のラムダ式は
Action1という内部的に使用されるインターフェイスの実装となる、 -
subscribeの内部でAction1をwrapしてSubscriberのインスタンスを作成 - その後
OnSubscribeのcallを呼び出す。- これによって
createの引数のラムダ式が呼び出される。 -
callの引数は直前に作成したSubscriber
- これによって
- ラムダ式は引数の
Subscriberに対して、onNextを呼び出す。 -
SubscriberのonNextは、subscribeの引数のラムダ式をcallすることで呼び出す。
- 引数のラムダ式は
doOnNext
fun main(args: Array<String>) {
Observable.create<String> { s ->
s.onNext("one")
s.onCompleted()
}.doOnNext { i -> println("doOnNext $i") }
.subscribe { s -> println("VALUE -> $s") }
}
オレンジが doOnNext 関連によって作成される処理
proxy的な動きをしていることがわかる
map
fun main(args: Array<String>) {
Observable.create<String> { s ->
s.onNext("one")
s.onCompleted()
}.map { "hello $it" }
.subscribe { s -> println("VALUE -> $s") }
}
doOnNext とほぼ同じ動作
別ThreadからのPublisher
fun main(args: Array<String>) {
println(Thread.currentThread())
Observable.create<String> { s ->
Thread {
println(Thread.currentThread())
s.onNext("one")
s.onNext("two")
s.onCompleted()
}.start()
}.subscribe { s -> println("VALUE -> $s by ${Thread.currentThread()}") }
}
// 出力結果
Thread[main,5,main]
Thread[Thread-0,5,main]
VALUE -> one by Thread[Thread-0,5,main]
VALUE -> two by Thread[Thread-0,5,main]
VALUE -> three by Thread[Thread-0,5,main]
別スレッドでpublishされた場合は、そのスレッドでsubscribeされる。





