Reactiveの勉強記事
Reactiveを理解したいが何もわからんので、
Reactive Programming With RxJava のサンプルコードが、裏でどのように動いているのかを記述していく。
2016年の本なので、Observable.create
のような Deprecatedとなっている古いAPIなどを使用している。
RxJava
簡単な登場人物
シンプル
fun main(args: Array<String>) {
Observable.create<String> { s ->
s.onNext("one")
s.onCompleted()
}.subscribe { s -> println("VALUE -> $s") }
}
-
create {s -> s.OnNext("one") ...}
- 引数のラムダ式は
OnSubscribe
の実装として解釈される。
- 引数のラムダ式は
-
subscribe { s -> println("VALUE -> $s") }
- 引数のラムダ式は
Action1
という内部的に使用されるインターフェイスの実装となる、 -
subscribe
の内部でAction1
をwrapしてSubscriber
のインスタンスを作成 - その後
OnSubscribe
のcall
を呼び出す。- これによって
create
の引数のラムダ式が呼び出される。 -
call
の引数は直前に作成したSubscriber
- これによって
- ラムダ式は引数の
Subscriber
に対して、onNext
を呼び出す。 -
Subscriber
のonNext
は、subscribe
の引数のラムダ式をcallすることで呼び出す。
- 引数のラムダ式は
doOnNext
fun main(args: Array<String>) {
Observable.create<String> { s ->
s.onNext("one")
s.onCompleted()
}.doOnNext { i -> println("doOnNext $i") }
.subscribe { s -> println("VALUE -> $s") }
}
オレンジが doOnNext
関連によって作成される処理
proxy的な動きをしていることがわかる
map
fun main(args: Array<String>) {
Observable.create<String> { s ->
s.onNext("one")
s.onCompleted()
}.map { "hello $it" }
.subscribe { s -> println("VALUE -> $s") }
}
doOnNext
とほぼ同じ動作
別ThreadからのPublisher
fun main(args: Array<String>) {
println(Thread.currentThread())
Observable.create<String> { s ->
Thread {
println(Thread.currentThread())
s.onNext("one")
s.onNext("two")
s.onCompleted()
}.start()
}.subscribe { s -> println("VALUE -> $s by ${Thread.currentThread()}") }
}
// 出力結果
Thread[main,5,main]
Thread[Thread-0,5,main]
VALUE -> one by Thread[Thread-0,5,main]
VALUE -> two by Thread[Thread-0,5,main]
VALUE -> three by Thread[Thread-0,5,main]
別スレッドでpublishされた場合は、そのスレッドでsubscribeされる。