LoginSignup
2
2

More than 3 years have passed since last update.

Lite Rx API Hands-on やってみた

Posted at

いまさらながら Reactor に入門しようと思ってやってみた。

Part01Flux

public class Part01Flux {
  Flux<String> emptyFlux() {
    return Flux.empty();
  }

  Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
  }

  Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
  }

  Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
  }

  Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100)).take(10);
  }
}

Flux.empty()Flux.just() でもいい。
最後のは zipwith を利用して以下のようにすることもできる。

Flux<Long> counter() {
  return Flux.interval(Duration.ofMillis(100))
          .zipWith(Flux.fromStream(LongStream.range(0, 10).boxed()))
          .map(Tuple2::getT2);
}

Part02Mono

public class Part02Mono {

  Mono<String> emptyMono() {
    return Mono.empty();
  }

  Mono<String> monoWithNoSignal() {
    return Mono.never();
  }

  Mono<String> fooMono() {
    return Mono.just("foo");
  }

  Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
  }
}

Mono.never() の使いどころがわからん。

Part03StepVerifier

public class Part03StepVerifier {

  void expectFooBarComplete(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .verifyComplete();
  }

  void expectFooBarError(Flux<String> flux) {
    StepVerifier.create(flux)
            .expectNext("foo", "bar")
            .expectError(RuntimeException.class)
            .verify();
  }

  void expectSkylerJesseComplete(Flux<User> flux) {
    StepVerifier.create(flux)
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("swhite"))
            .assertNext(u -> Assertions.assertThat(u.getUsername()).isEqualTo("jpinkman"))
            .verifyComplete();
  }

  void expect10Elements(Flux<Long> flux) {
    Duration d = StepVerifier.create(flux)
            .expectNextCount(10)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  void expect3600Elements(Supplier<Flux<Long>> supplier) {
    Duration d = StepVerifier.withVirtualTime(supplier)
            .thenAwait(Duration.ofSeconds(3600))
            .expectNextCount(3600)
            .verifyComplete();
    LoggerFactory.getLogger(Part03StepVerifier.class).info(d.toString());
  }

  private void fail() {
    throw new AssertionError("workshop not implemented");
  }
}

100 時間かかるようなスケジューリングになっている Flux だとしても withVirtualTime を利用することで仮想的に時間を進めることで数秒でテストが実行できるらしい。
https://mike-neck.hatenadiary.com/entry/2018/02/21/080000

Part04Transform

public class Part04Transform {

  Mono<User> capitalizeOne(Mono<User> mono) {
    return mono.map(this::capitalizeUser);
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(this::capitalizeUser);
  }

  private User capitalizeUser(User user) {
    return new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase());
  }

  Flux<User> asyncCapitalizeMany(Flux<User> flux) {
    return flux.flatMap(this::asyncCapitalizeUser);
  }

  Mono<User> asyncCapitalizeUser(User u) {
    return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
    }
}

flatMapmap の違いは引数で渡す Function の戻り値が FluxMono か、そうでないか。
faltMap の場合は非同期に処理されるので、もとの順序とは異なる可能性がある。

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMap(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}

上記のような処理にすると最終的に得られる出力の順序は以下のようになる。

hoge0
fuga0
piyo0
hoge1
fuga1
piyo1
hoge2
fuga2
piyo2

flatMapSequentail を利用すると、もとの順序が保持される。

Flux<String> flat() {
  Flux<String> flux = Flux.just("hoge", "fuga", "piyo");
  return flux.flatMapSequential(s -> Flux.interval(Duration.ofMillis(100)).take(3).map(l -> s + l));
}
hoge0
hoge1
hoge2
fuga0
fuga1
fuga2
piyo0
piyo1
piyo2

Part05Merge

public class Part05Merge {
  Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.mergeWith(flux2);
  }

  Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
    return flux1.concatWith(flux2);
  }

  Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
    return mono1.concatWith(mono2);
  }
}

mergeWith は到着順に並べてマージする。concatWith はもとの順序を維持してマージする。(引数で与えられたものが後ろになる)
上記のテストケースでは flux1 が遅れてやってくるようになっている。
mergeWith では flux2 -> flux1 の順に、concatWith では flux1 -> flux2 の順になる。

Part06Request

public class Part06Request {

  ReactiveRepository<User> repository = new ReactiveUserRepository();

  StepVerifier requestAllExpectFour(Flux<User> flux) {
    return StepVerifier.create(flux)
            .expectNextCount(4)
            .expectComplete();
  }

  StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
    return StepVerifier.create(flux)
            .thenRequest(1)
            .expectNext(User.SKYLER)
            .thenRequest(1)
            .expectNext(User.JESSE)
            .thenCancel();
  }

  Flux<User> fluxWithLog() {
    return repository.findAll().log();
  }

  Flux<User> fluxWithDoOnPrintln() {
    Logger logger = LoggerFactory.getLogger(Part06Request.class);
    return repository.findAll()
            .doOnSubscribe(s -> logger.info("Starring:"))
            .doOnNext(u -> logger.info("{} {}", u.getFirstname(), u.getLastname()))
            .doOnComplete(() -> logger.info("The end!"));
  }
}

doOn~~ というメソッドで FluxMono の特定の動作に対して処理を定義できる。
イベントリスナーみたいなものだと思っている。

Part07Errors

public class Part07Errors {

  Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
    return mono.onErrorReturn(User.SAUL);
  }

  Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
    return flux.onErrorResume(th -> Flux.just(User.SAUL, User.JESSE));
  }

  Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(u -> {
      try {
        return capitalizeUser(u);
      } catch (GetOutOfHereException e) {
        throw Exceptions.propagate(e);
      }
    });
  }

  User capitalizeUser(User user) throws GetOutOfHereException {
    if (user.equals(User.SAUL)) {
      throw new GetOutOfHereException();
    }
    return new User(user.getUsername(), user.getFirstname(), user.getLastname());
  }

  protected final class GetOutOfHereException extends Exception {
    private static final long serialVersionUID = 0L;
  }
}

onErrorReturn だとエラー発生時に静的なデータを返却できる。
しかし、Flux の場合も含めることができるデータが 1 つしかない。
onErrorResume だとフォールバック処理を定義できるので、そこで複数の値が定義された Fluxを生成できる。
onErrorMap というのもあるが、これは発生した Throwable を別の Throwable に変換するための処理を定義するため、今回のケースでは利用できない。

Part08OtherOperations

public class Part08OtherOperations {

  Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
    return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux).map(t -> new User(t.getT1(), t.getT2(), t.getT3()));
  }

  Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
    return Mono.first(mono1, mono2);
  }

  Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
    return Flux.first(flux1, flux2);
  }

  Mono<Void> fluxCompletion(Flux<User> flux) {
    return flux.then();
  }

  Mono<User> nullAwareUserToMono(User user) {
    return Mono.justOrEmpty(user);
  }

  Mono<User> emptyToSkyler(Mono<User> mono) {
    return mono.switchIfEmpty(Mono.just(User.SKYLER));
  }
}

Part09Adapt

public class Part09Adapt {

  Flowable<User> fromFluxToFlowable(Flux<User> flux) {
    return Flowable.fromPublisher(flux);
  }

  Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
    return Flux.from(flowable);
  }

  Observable<User> fromFluxToObservable(Flux<User> flux) {
    return Observable.fromPublisher(flux);
  }

  Flux<User> fromObservableToFlux(Observable<User> observable) {
    return Flux.from(observable.toFlowable(BackpressureStrategy.MISSING));
  }

  Single<User> fromMonoToSingle(Mono<User> mono) {
    return Single.fromPublisher(mono);
  }

  Mono<User> fromSingleToMono(Single<User> single) {
    return Mono.from(single.toFlowable());
  }

  CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
    return mono.toFuture();
  }

  Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
    return Mono.fromFuture(future);
  }
}

Reactor にも RxJava にも Publisher を受け取って生成するメソッドがあるのでそれを利用すればいい。
ただ、ObservableSinglePublisher の実装クラスではないので、Flowable に変換する必要がある。
ObservableFlowable に変換する場合は、Backpressure の設定を指定しないといけない。MISSING でいいのかはわからん。

Part10ReactiveToBlocking

public class Part10ReactiveToBlocking {
  User monoToValue(Mono<User> mono) {
    return mono.block();
  }

  Iterable<User> fluxToValues(Flux<User> flux) {
    return flux.toIterable();
  }
}

他にも toStream とかでもブロッキングできる。

Part11BlockingToReactive

public class Part11BlockingToReactive {
  Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll())).subscribeOn(Schedulers.elastic());
  }

  Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux.publishOn(Schedulers.elastic())
            .doOnNext(u -> repository.save(u))
            .then();
  }
}

defer を使うことによって、この Fluxsubscribe されるまで処理を遅延させることができる。
defer を利用せずに以下のようにしてしまうと、この処理の時点で repository.findAll() が実行される。

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
  return Flux.fromIterable(repository.findAll()).subscribeOn(Schedulers.elastic());
}

また、subscribeOnpublishOn で subscribe 時、publish 時に使用するスレッドの戦略を指定できるっぽい。
https://kazuhira-r.hatenablog.com/entry/20180107/1515327957

さいごに

なんとなく API の使い方は分かったような気がするが、実際の使いどころがいまいちわかっていない・・・。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2