いまさらながら Reactor に入門しようと思ってやってみた。
Part01Flux
public class Part01Flux {
Flux<String> emptyFlux() {
return Flux.empty();
}
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100)).take(10);
}
}
Flux.empty() は Flux.just() でもいい。
最後のは zipwith を利用して以下のようにすることもできる。
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
.map(Tuple2::getT2);
}
Part02Mono
public class Part02Mono {
Mono<String> emptyMono() {
return Mono.empty();
}
Mono<String> monoWithNoSignal() {
return Mono.never();
}
Mono<String> fooMono() {
return Mono.just("foo");
}
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
}
Mono.never() の使いどころがわからん。
Part03StepVerifier
public class Part03StepVerifier {
void expectFooBarComplete(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.verifyComplete();
}
void expectFooBarError(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.expectError(RuntimeException.class)
.verify();
}
void expectSkylerJesseComplete(Flux<User> flux) {
StepVerifier.create(flux)
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
.verifyComplete();
}
void expect10Elements(Flux<Long> flux) {
Duration d = StepVerifier.create(flux)
.expectNextCount(10)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
void expect3600Elements(Supplier<Flux<Long>> supplier) {
Duration d = StepVerifier.withVirtualTime(supplier)
.thenAwait(Duration.ofSeconds(3600))
.expectNextCount(3600)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
private void fail() {
throw new AssertionError("workshop not implemented");
}
}
100 時間かかるようなスケジューリングになっている Flux だとしても withVirtualTime を利用することで仮想的に時間を進めることで数秒でテストが実行できるらしい。
https://mike-neck.hatenadiary.com/entry/2018/02/21/080000
Part04Transform
public class Part04Transform {
Mono<User> capitalizeOne(Mono<User> mono) {
return mono.map(this::capitalizeUser);
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(this::capitalizeUser);
}
private User capitalizeUser(User user) {
return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
}
Flux<User> asyncCapitalizeMany(Flux<User> flux) {
return flux.flatMap(this::asyncCapitalizeUser);
}
Mono<User> asyncCapitalizeUser(User u) {
return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}
}
flatMap と map の違いは引数で渡す Function の戻り値が Flux や Mono か、そうでないか。
faltMap の場合は非同期に処理されるので、もとの順序とは異なる可能性がある。
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
上記のような処理にすると最終的に得られる出力の順序は以下のようになる。
hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2
flatMapSequentail を利用すると、もとの順序が保持される。
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2
Part05Merge
public class Part05Merge {
Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.mergeWith(flux2);
}
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.concatWith(flux2);
}
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
return mono1.concatWith(mono2);
}
}
mergeWith は到着順に並べてマージする。concatWith はもとの順序を維持してマージする。(引数で与えられたものが後ろになる)
上記のテストケースでは flux1 が遅れてやってくるようになっている。
mergeWith では flux2 -> flux1 の順に、concatWith では flux1 -> flux2 の順になる。
Part06Request
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
.expectNextCount(4)
.expectComplete();
}
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux)
.thenRequest(1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
Flux<User> fluxWithLog() {
return repository.findAll().log();
}
Flux<User> fluxWithDoOnPrintln() {
Logger logger = LoggerFactory.getLogger(Part06Request.class);
return repository.findAll()
.doOnSubscribe(s -> logger.info("Starring:"))
.doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
.doOnComplete(() -> logger.info("The end!"));
}
}
doOn~~ というメソッドで Flux や Mono の特定の動作に対して処理を定義できる。
イベントリスナーみたいなものだと思っている。
Part07Errors
public class Part07Errors {
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
return mono.onErrorReturn(User.SAUL);
}
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(u -> {
try {
return capitalizeUser(u);
} catch (GetOutOfHereException e) {
throw Exceptions.propagate(e);
}
});
}
User capitalizeUser(User user) throws GetOutOfHereException {
if (user.equals(User.SAUL)) {
throw new GetOutOfHereException();
}
return new User(user.getUsername(), user.getFirstname(), user.getLastname());
}
protected final class GetOutOfHereException extends Exception {
private static final long serialVersionUID = 0L;
}
}
onErrorReturn だとエラー発生時に静的なデータを返却できる。
しかし、Flux の場合も含めることができるデータが 1 つしかない。
onErrorResume だとフォールバック処理を定義できるので、そこで複数の値が定義された Fluxを生成できる。
onErrorMap というのもあるが、これは発生した Throwable を別の Throwable に変換するための処理を定義するため、今回のケースでは利用できない。
Part08OtherOperations
public class Part08OtherOperations {
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
}
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
return Mono.first(mono1, mono2);
}
Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
return Flux.first(flux1, flux2);
}
Mono<Void> fluxCompletion(Flux<User> flux) {
return flux.then();
}
Mono<User> nullAwareUserToMono(User user) {
return Mono.justOrEmpty(user);
}
Mono<User> emptyToSkyler(Mono<User> mono) {
return mono.switchIfEmpty(Mono.just(User.SKYLER));
}
}
Part09Adapt
public class Part09Adapt {
Flowable<User> fromFluxToFlowable(Flux<User> flux) {
return Flowable.fromPublisher(flux);
}
Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
return Flux.from(flowable);
}
Observable<User> fromFluxToObservable(Flux<User> flux) {
return Observable.fromPublisher(flux);
}
Flux<User> fromObservableToFlux(Observable<User> observable) {
return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
}
Single<User> fromMonoToSingle(Mono<User> mono) {
return Single.fromPublisher(mono);
}
Mono<User> fromSingleToMono(Single<User> single) {
return Mono.from(single.toFlowable());
}
CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
return mono.toFuture();
}
Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
}
Reactor にも RxJava にも Publisher を受け取って生成するメソッドがあるのでそれを利用すればいい。
ただ、Observable と Single は Publisher の実装クラスではないので、Flowable に変換する必要がある。
Observable を Flowable に変換する場合は、Backpressure の設定を指定しないといけない。MISSING でいいのかはわからん。
Part10ReactiveToBlocking
public class Part10ReactiveToBlocking {
User monoToValue(Mono<User> mono) {
return mono.block();
}
Iterable<User> fluxToValues(Flux<User> flux) {
return flux.toIterable();
}
}
他にも toStream とかでもブロッキングできる。
Part11BlockingToReactive
public class Part11BlockingToReactive {
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
}
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.elastic())
.doOnNext(u -> repository.save(u))
.then();
}
}
defer を使うことによって、この Flux が subscribe されるまで処理を遅延させることができる。
defer を利用せずに以下のようにしてしまうと、この処理の時点で repository.findAll() が実行される。
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}
また、subscribeOn と publishOn で subscribe 時、publish 時に使用するスレッドの戦略を指定できるっぽい。
https://kazuhira-r.hatenablog.com/entry/20180107/1515327957
さいごに
なんとなく API の使い方は分かったような気がするが、実際の使いどころがいまいちわかっていない・・・。