概要
JavaでReactive Programmingを行う際に良く使われるReactorですが、公式リファレンスは非常に良く出来ていると思うのですが、Reactorを始めたばかりだと少々厳しいように思います。この記事では、Reactorの公式リファレンスを読んで独学出来る事を目指して、Reactorの良く使う機能を中心に分かりづらい概念を疑似コードを用いて説明します。
Reactorの基本
Reactorの基本的な動きをReactor Coreのコードから主要な要素を抽出した疑似コードで説明します。
使用イメージ
MyMono
は、Reactor CoreのMonoの疑似コードです。機能は、Mono.create(Consumer<MonoSink>)
と、Mono.map(Function<I, O>)
の二つだけ提供します。
以下のテストで疑似コードのMyMonoを本物のMonoに置き換えても全く同じ動作になります
@Test
public void testMapping() {
var mono = MyMono.create(s -> {
s.success("good");
}).map(v -> {
return v + "_mod";
});
mono.subscribe(v -> {
Assertions.assertEquals("good_mod", v);
});
}
createでMonoを作成して、mapで、生成した文字列を変更しています。subscribe実行で生成した文字列が取得出来る事が確認出来ます。
疑似コード
package taka8.sample.reactor;
import java.util.function.Consumer;
import java.util.function.Function;
abstract public class MyMono<T> {
public interface Sink<T> {
void success(T value);
}
abstract public void subscribe(Consumer<? super T> subscriber);
public static <T> MyMono<T> create(Consumer<Sink<T>> template) {
return new _SupplierMono<>(template);
}
public <V> MyMono<V> map(Function<? super T, ? extends V> mapper) {
return new _MappingMono<>(this, mapper);
}
private static class _MappingMono<I, O> extends MyMono<O> {
private MyMono<I> _source;
private Function<? super I, ? extends O> _mapper;
public _MappingMono(MyMono<I> source, Function<? super I, ? extends O> mapper) {
super();
_source = source;
_mapper = mapper;
}
public void subscribe(Consumer<? super O> subscriber) {
_source.subscribe(i -> {
var o = _mapper.apply(i);
subscriber.accept(o);
});
}
}
private static class _SupplierMono<T> extends MyMono<T> {
private Consumer<Sink<T>> _template;
public _SupplierMono(Consumer<Sink<T>> template) {
super();
_template = template;
}
public void subscribe(Consumer<? super T> subscriber) {
_template.accept(new Sink<T>() {
@Override
public void success(T value) {
subscriber.accept(value);
}
});
}
}
}
map
メソッドは、_MappingMono
クラスのインスタンスを生成して返しています。これが、テストコードで、最終的に使われているMonoになっています。_MappingMono.subscribe
では、_sourceのsubscribeを実行し、実行結果を変換して呼び出しもとのsubscriber
に渡しています。
スタックトレース
以下のようにcreateでエラーを発生させます。
@Test
public void testException() {
var mono = MyMono.create(s -> {
throw new RuntimeException("Error.");
}).map(v -> {
return v + "_mod";
});
mono.subscribe(v -> {
});
}
スタックトレースは以下のようになります。
java.lang.RuntimeException: Error.
at taka8.sample.reactor.MyMonoTest.lambda$3(MyMonoTest.java:23)
at taka8.sample.reactor.MyMono$_SupplierMono.subscribe(MyMono.java:55)
at taka8.sample.reactor.MyMono$_MappingMono.subscribe(MyMono.java:37)
at taka8.sample.reactor.MyMonoTest.testException(MyMonoTest.java:27)
at
...
subscribe
呼び出しと同一のスタックでcreate
のラムダ式が実行されている事が分かります。
非同期実行
Mono.create
は、通信やデータベースアクセス等の時間がかかる処理をReactiveな形式に変換する際に使用する事が出来ます。こちらは、こんなイメージになります。
@Test
public void testAsync() throws Exception {
var executor = Executors.newSingleThreadExecutor();
var mono = MyMono.create(s -> {
executor.execute(() -> {
__logger.info("create CALLED");
s.success("good");
});
}).map(v -> {
__logger.info("map CALLED");
return v + "_mod";
});
__logger.info("subscribe CALLED");
var latch = new CountDownLatch(1);
mono.subscribe(v -> {
__logger.info("subscribe RECEIVE result=<{}>", v);
Assertions.assertEquals("good_mod", v);
latch.countDown();
});
latch.await(1000, TimeUnit.MILLISECONDS);
}
create
内のラムダ式は別スレッドで非同期に実行され、実行された結果がMonoSinkに渡されます。コード中のログの出力は以下のようになります。
17:50:59.946 [main] INFO taka8.sample.reactor.MyMonoTest - subscribe CALLED
17:50:59.949 [pool-1-thread-1] INFO taka8.sample.reactor.MyMonoTest - create CALLED
17:50:59.950 [pool-1-thread-1] INFO taka8.sample.reactor.MyMonoTest - map CALLED
17:50:59.951 [pool-1-thread-1] INFO taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_mod>
subscribe
の呼び出しは、mainスレッドですが、ラムダ式の実行、変換、結果の出力はすべて非同期実行のスレッドで行われているのが分かります。
繰り返し実行
subscribe
が複数回呼ばれるとどうなるかを見てみます。
@Test
public void testRepeatSubscribe() throws Exception {
var count = new AtomicInteger(0);
var mono = MyMono.create(s -> {
var id = count.getAndIncrement();
__logger.info("create CALLED id=<{}>", id);
s.success(id);
}).map(v -> {
return "good_" + v;
});
for (int i = 0; i < 3; i++) {
var id = i;
mono.subscribe(v -> {
__logger.info("subscribe RECEIVE result=<{}>", v);
Assertions.assertEquals("good_" + id, v);
});
}
}
このコードを実行すると以下のログが出力されます。
07:51:39.541 [main] INFO taka8.sample.reactor.MyMonoTest - create CALLED id=<0>
07:51:39.545 [main] INFO taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_0>
07:51:39.548 [main] INFO taka8.sample.reactor.MyMonoTest - create CALLED id=<1>
07:51:39.549 [main] INFO taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_1>
07:51:39.549 [main] INFO taka8.sample.reactor.MyMonoTest - create CALLED id=<2>
07:51:39.549 [main] INFO taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_2>
create
のラムダ式が複数回実行されています。疑似コードを見て頂ければ理解しやすいと思いますが、Mono
はテンプレートになっていて、subscribe
によって登録されているテンプレートが実行されるイメージになります。
エラー処理
全章の疑似コードはエラー処理に対応していませんでした。エラー処理(onErrorResumeのみ)に対応した疑似コードは以下になります。
最初のコードとの差分は、org.reactivestreams.Subscriber
でMono
間を連携させている所です。onErrorResume
で作成される_ErrorHandleMono
で、渡されたSubscriber
をラップしてエラー処理追加しています。
package taka8.sample.reactor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract public class MyMonoWithErrorHandle<T> {
private static final Logger __logger = LoggerFactory.getLogger(MyMonoWithErrorHandle.class);
public interface Sink<T> {
void success(T value);
void error(Throwable error);
}
public static <T> MyMonoWithErrorHandle<T> create(Consumer<Sink<T>> template) {
return new _SupplierMono<>(template);
}
abstract protected void subscribe(Subscriber<? super T> subscriber);
public <V> MyMonoWithErrorHandle<V> map(Function<? super T, ? extends V> mapper) {
return new _MappingMono<>(this, mapper);
}
public <V> MyMonoWithErrorHandle<T> onErrorResume(
Function<? super Throwable, ? extends MyMonoWithErrorHandle<? extends T>> errorHandler) {
return new _ErrorHandleMono<>(this, errorHandler);
}
public void subscribe(Consumer<? super T> subscriber) {
this.subscribe(new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(T t) {
subscriber.accept(t);
}
@Override
public void onError(Throwable t) {
__logger.warn("Error occurred.", t);
}
@Override
public void onComplete() {
}
});
}
private static class _MappingMono<I, O> extends MyMonoWithErrorHandle<O> {
private MyMonoWithErrorHandle<I> _source;
private Function<? super I, ? extends O> _mapper;
public _MappingMono(MyMonoWithErrorHandle<I> source, Function<? super I, ? extends O> mapper) {
super();
_source = source;
_mapper = mapper;
}
@Override
public void subscribe(Subscriber<? super O> subscriber) {
_source.subscribe(i -> {
var o = _mapper.apply(i);
subscriber.onNext(o);
});
}
}
private static class _SupplierMono<T> extends MyMonoWithErrorHandle<T> {
private Consumer<Sink<T>> _template;
public _SupplierMono(Consumer<Sink<T>> template) {
super();
_template = template;
}
@Override
public void subscribe(Subscriber<? super T> subscriber) {
_template.accept(new Sink<T>() {
@Override
public void success(T value) {
subscriber.onNext(value);
}
@Override
public void error(Throwable error) {
subscriber.onError(error);
}
});
}
}
private static class _ErrorHandleMono<T> extends MyMonoWithErrorHandle<T> {
private MyMonoWithErrorHandle<T> _source;
private Function<? super Throwable, ? extends MyMonoWithErrorHandle<? extends T>> _errorHandler;
public _ErrorHandleMono(MyMonoWithErrorHandle<T> source,
Function<? super Throwable, ? extends MyMonoWithErrorHandle<? extends T>> errorHandler) {
super();
_source = source;
_errorHandler = errorHandler;
}
@Override
public void subscribe(Subscriber<? super T> subscriber) {
_source.subscribe(new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable t) {
var mono = _errorHandler.apply(t);
mono.subscribe(subscriber);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
}
}
動作イメージ
疑似コードを動作させるテストは以下になります。
エラーが処理されて正常系の結果が返ります。
@Test
public void testOnErrorResume() {
var mono = MyMonoWithErrorHandle.create(s -> {
s.error(new IllegalArgumentException("Invalid call."));
}).onErrorResume(th -> {
return MyMonoWithErrorHandle.create(s -> {
s.success("good");
});
});
mono.subscribe(v->{
Assertions.assertEquals("good", v);
});
}
まとめ
Mono
の基本的な動作の理解を疑似コードを作る事で深める事が出来たかと思います。上記から特に注意すべきMono
の挙動をまとめると以下になります(こちらは、Flux
にも適用出来ます`。
-
Mono
は作成した時点では何も動作しない。subscribe
の呼び出しで初めて定義された処理が動作する。 -
subscribe
は、subscribe
が呼び出されたスレッドで実行されるが、どこかの処理が別スレッドで実行された場合には、それ以降の処理もそのスレッドで実行される(最終的に、subscribeの結果出力もそのスレッドで実行される)。 -
subscribe
を繰り返し実行すると、同じ処理が何度も最初から実行される。 - 正常系とエラー系で個別に対応を設定出来る。