概要
この前 O'Reilly 社が無料で配っていた RxJava の電子書籍 "RxJava for Android App Development" を読んでいて、subscribeOn と observeOn の指定がどこに効くのかがよくわからなくなってきたので、別の Reactive Extensions である Reactor Core で確認してみました。Reactor Core では subscribeOn は同じ名前のメソッドがあり、observeOn に当たるメソッドには publishOn があります。
実行環境
Java | SE 1.8.0_102 |
---|---|
OS | Windows 10 |
Eclipse | Mars 4.5.2 |
Reactor Core | 3.0.2.RELEASE |
検証用コード
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class Verification {
public static void main(final String[] args) throws InterruptedException {
Mono.create(emitter -> {System.out.println("emitter " + Thread.currentThread().getName());emitter.success("");})
.publishOn(Schedulers.newSingle("pub"))
.subscribeOn(Schedulers.newSingle("sub"))
.subscribe(str -> System.out.println("subscribe " + Thread.currentThread().getName()));
while (true) {
Thread.sleep(1000L);
}
}
}
実行結果
下記の通りでした。
emitter sub-1
subscribe pub-2
publishOn と subscribeOn の位置を逆にしても同じ結果でした。publishOn が subscribe 内の実行スレッドを、subscribeOn が Mono および Flux の emitter の実行スレッドを、それぞれ指定するようでした。
次に、片方だけを指定した場合の実行スレッドを確認してみます。
emitter sub-1
subscrive sub-1
emitter main
subscrive pub-1
これを見ると、とりあえず subscribeOn で指定しておけば、別スレッドで動かすことができるようです。
追記
operator を使った場合について調べないと役に立たないと思ったので調べ直しました。今回は filter と map で調べます。
operator の後で subscribeOn と publishOn を実行
Mono.create(emitter -> {print("emitter ");emitter.success("");})
.filter(str -> {print("filter "); return true;})
.map(str -> {print("map "); return str;})
.subscribeOn(Schedulers.newSingle("sub"))
.publishOn(Schedulers.newSingle("pub"))
.subscribe(str -> print("subscrive "));
emitter sub-1
filter sub-1
map sub-1
subscrive pub-2
subscribeOn までの operator は subscribeOn で指定したスレッドで実行され、subscribe での処理は publishOn のスレッドで動作します。
subscribeOn と publishOn を実行してから operator を使う
Mono.create(emitter -> {print("emitter ");emitter.success("");})
.subscribeOn(Schedulers.newSingle("sub"))
.publishOn(Schedulers.newSingle("pub"))
.filter(str -> {print("filter "); return true;})
.map(str -> {print("map "); return str;})
.subscribe(str -> print("subscrive "));
emitter sub-1
filter pub-2
map pub-2
subscrive pub-2
emitter の処理だけは subscribeOn で指定したスレッドで実行され、 publishOn 以下の処理は publishOn で指定したスレッドで動作します。
operator の後で subscribeOn を実行
Mono.create(emitter -> {print("emitter ");emitter.success("");})
.filter(str -> {print("filter "); return true;})
.map(str -> {print("map "); return str;})
.subscribeOn(Schedulers.newSingle("sub"))
.subscribe(str -> print("subscrive "));
emitter sub-1
filter sub-1
map sub-1
subscrive sub-1
すべての処理が subscribeOn のスレッドで動作します。
operator の後で publishOn を実行
Mono.create(emitter -> {print("emitter ");emitter.success("");})
.filter(str -> {print("filter "); return true;})
.map(str -> {print("map "); return str;})
.publishOn(Schedulers.newSingle("pub"))
.subscribe(str -> print("subscrive "));
emitter main
filter main
map main
subscrive pub-1
subscribe での処理は publishOn のスレッドで動作します。それ以外は main スレッドで動きます
publishOn を実行してから operator を使う
Mono.create(emitter -> {print("emitter ");emitter.success("");})
.publishOn(Schedulers.newSingle("pub"))
.filter(str -> {print("filter "); return true;})
.map(str -> {print("map "); return str;})
.subscribe(str -> print("subscrive "));
emitter main
filter pub-1
map pub-1
subscrive pub-1
emitter の処理は main スレッドで、 publishOn 以降の処理は publishOn のスレッドで動作します。
参考
つい酔っぱらった勢いでこんな記事を書いてしまいましたが、この手の話はすでに他の方が Qiita でも記事を書いていらっしゃるので、そちらをご参照ください。