いまさらながら Reactor に入門しようと思ってやってみた。
Part01Flux
public class Part01Flux {
Flux<String> emptyFlux() {
return Flux.empty();
}
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100)).take(10);
}
}
Flux.empty()
は Flux.just()
でもいい。
最後のは zipwith
を利用して以下のようにすることもできる。
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
.map(Tuple2::getT2);
}
Part02Mono
public class Part02Mono {
Mono<String> emptyMono() {
return Mono.empty();
}
Mono<String> monoWithNoSignal() {
return Mono.never();
}
Mono<String> fooMono() {
return Mono.just("foo");
}
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
}
Mono.never()
の使いどころがわからん。
Part03StepVerifier
public class Part03StepVerifier {
void expectFooBarComplete(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.verifyComplete();
}
void expectFooBarError(Flux<String> flux) {
StepVerifier.create(flux)
.expectNext("foo", "bar")
.expectError(RuntimeException.class)
.verify();
}
void expectSkylerJesseComplete(Flux<User> flux) {
StepVerifier.create(flux)
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
.assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
.verifyComplete();
}
void expect10Elements(Flux<Long> flux) {
Duration d = StepVerifier.create(flux)
.expectNextCount(10)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
void expect3600Elements(Supplier<Flux<Long>> supplier) {
Duration d = StepVerifier.withVirtualTime(supplier)
.thenAwait(Duration.ofSeconds(3600))
.expectNextCount(3600)
.verifyComplete();
LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
}
private void fail() {
throw new AssertionError("workshop not implemented");
}
}
100 時間かかるようなスケジューリングになっている Flux
だとしても withVirtualTime
を利用することで仮想的に時間を進めることで数秒でテストが実行できるらしい。
https://mike-neck.hatenadiary.com/entry/2018/02/21/080000
Part04Transform
public class Part04Transform {
Mono<User> capitalizeOne(Mono<User> mono) {
return mono.map(this::capitalizeUser);
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(this::capitalizeUser);
}
private User capitalizeUser(User user) {
return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
}
Flux<User> asyncCapitalizeMany(Flux<User> flux) {
return flux.flatMap(this::asyncCapitalizeUser);
}
Mono<User> asyncCapitalizeUser(User u) {
return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}
}
flatMap
と map
の違いは引数で渡す Function
の戻り値が Flux
や Mono
か、そうでないか。
faltMap
の場合は非同期に処理されるので、もとの順序とは異なる可能性がある。
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
上記のような処理にすると最終的に得られる出力の順序は以下のようになる。
hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2
flatMapSequentail
を利用すると、もとの順序が保持される。
Flux<String> flat() {
Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2
Part05Merge
public class Part05Merge {
Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.mergeWith(flux2);
}
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.concatWith(flux2);
}
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
return mono1.concatWith(mono2);
}
}
mergeWith
は到着順に並べてマージする。concatWith
はもとの順序を維持してマージする。(引数で与えられたものが後ろになる)
上記のテストケースでは flux1
が遅れてやってくるようになっている。
mergeWith
では flux2
-> flux1
の順に、concatWith
では flux1
-> flux2
の順になる。
Part06Request
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
.expectNextCount(4)
.expectComplete();
}
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux)
.thenRequest(1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
Flux<User> fluxWithLog() {
return repository.findAll().log();
}
Flux<User> fluxWithDoOnPrintln() {
Logger logger = LoggerFactory.getLogger(Part06Request.class);
return repository.findAll()
.doOnSubscribe(s -> logger.info("Starring:"))
.doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
.doOnComplete(() -> logger.info("The end!"));
}
}
doOn~~
というメソッドで Flux
や Mono
の特定の動作に対して処理を定義できる。
イベントリスナーみたいなものだと思っている。
Part07Errors
public class Part07Errors {
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
return mono.onErrorReturn(User.SAUL);
}
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(u -> {
try {
return capitalizeUser(u);
} catch (GetOutOfHereException e) {
throw Exceptions.propagate(e);
}
});
}
User capitalizeUser(User user) throws GetOutOfHereException {
if (user.equals(User.SAUL)) {
throw new GetOutOfHereException();
}
return new User(user.getUsername(), user.getFirstname(), user.getLastname());
}
protected final class GetOutOfHereException extends Exception {
private static final long serialVersionUID = 0L;
}
}
onErrorReturn
だとエラー発生時に静的なデータを返却できる。
しかし、Flux
の場合も含めることができるデータが 1 つしかない。
onErrorResume
だとフォールバック処理を定義できるので、そこで複数の値が定義された Flux
を生成できる。
onErrorMap
というのもあるが、これは発生した Throwable
を別の Throwable
に変換するための処理を定義するため、今回のケースでは利用できない。
Part08OtherOperations
public class Part08OtherOperations {
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
}
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
return Mono.first(mono1, mono2);
}
Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
return Flux.first(flux1, flux2);
}
Mono<Void> fluxCompletion(Flux<User> flux) {
return flux.then();
}
Mono<User> nullAwareUserToMono(User user) {
return Mono.justOrEmpty(user);
}
Mono<User> emptyToSkyler(Mono<User> mono) {
return mono.switchIfEmpty(Mono.just(User.SKYLER));
}
}
Part09Adapt
public class Part09Adapt {
Flowable<User> fromFluxToFlowable(Flux<User> flux) {
return Flowable.fromPublisher(flux);
}
Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
return Flux.from(flowable);
}
Observable<User> fromFluxToObservable(Flux<User> flux) {
return Observable.fromPublisher(flux);
}
Flux<User> fromObservableToFlux(Observable<User> observable) {
return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
}
Single<User> fromMonoToSingle(Mono<User> mono) {
return Single.fromPublisher(mono);
}
Mono<User> fromSingleToMono(Single<User> single) {
return Mono.from(single.toFlowable());
}
CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
return mono.toFuture();
}
Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
}
Reactor にも RxJava にも Publisher
を受け取って生成するメソッドがあるのでそれを利用すればいい。
ただ、Observable
と Single
は Publisher
の実装クラスではないので、Flowable
に変換する必要がある。
Observable
を Flowable
に変換する場合は、Backpressure の設定を指定しないといけない。MISSING
でいいのかはわからん。
Part10ReactiveToBlocking
public class Part10ReactiveToBlocking {
User monoToValue(Mono<User> mono) {
return mono.block();
}
Iterable<User> fluxToValues(Flux<User> flux) {
return flux.toIterable();
}
}
他にも toStream
とかでもブロッキングできる。
Part11BlockingToReactive
public class Part11BlockingToReactive {
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
}
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.elastic())
.doOnNext(u -> repository.save(u))
.then();
}
}
defer
を使うことによって、この Flux
が subscribe
されるまで処理を遅延させることができる。
defer
を利用せずに以下のようにしてしまうと、この処理の時点で repository.findAll()
が実行される。
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}
また、subscribeOn
と publishOn
で subscribe 時、publish 時に使用するスレッドの戦略を指定できるっぽい。
https://kazuhira-r.hatenablog.com/entry/20180107/1515327957
さいごに
なんとなく API の使い方は分かったような気がするが、実際の使いどころがいまいちわかっていない・・・。