2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

最新のReactorの使い方

Last updated at Posted at 2021-07-06

概要

JavaでReactive Programmingを行う際に良く使われるReactorですが、公式リファレンスは非常に良く出来ていると思うのですが、Reactorを始めたばかりだと少々厳しいように思います。この記事では、Reactorの公式リファレンスを読んで独学出来る事を目指して、Reactorの良く使う機能を中心に分かりづらい概念を疑似コードを用いて説明します。

Reactorの基本

Reactorの基本的な動きをReactor Coreのコードから主要な要素を抽出した疑似コードで説明します。

使用イメージ

MyMonoは、Reactor CoreのMonoの疑似コードです。機能は、Mono.create(Consumer<MonoSink>)と、Mono.map(Function<I, O>)の二つだけ提供します。

以下のテストで疑似コードのMyMonoを本物のMonoに置き換えても全く同じ動作になります

	@Test
	public void testMapping() {
		var mono = MyMono.create(s -> {
			s.success("good");
		}).map(v -> {
			return v + "_mod";
		});
		mono.subscribe(v -> {
			Assertions.assertEquals("good_mod", v);
		});
	}

createでMonoを作成して、mapで、生成した文字列を変更しています。subscribe実行で生成した文字列が取得出来る事が確認出来ます。

疑似コード

package taka8.sample.reactor;

import java.util.function.Consumer;
import java.util.function.Function;

abstract public class MyMono<T> {

	public interface Sink<T> {

		void success(T value);

	}

	abstract public void subscribe(Consumer<? super T> subscriber);

	public static <T> MyMono<T> create(Consumer<Sink<T>> template) {
		return new _SupplierMono<>(template);
	}

	public <V> MyMono<V> map(Function<? super T, ? extends V> mapper) {
		return new _MappingMono<>(this, mapper);
	}

	private static class _MappingMono<I, O> extends MyMono<O> {

		private MyMono<I> _source;

		private Function<? super I, ? extends O> _mapper;

		public _MappingMono(MyMono<I> source, Function<? super I, ? extends O> mapper) {
			super();
			_source = source;
			_mapper = mapper;
		}

		public void subscribe(Consumer<? super O> subscriber) {
			_source.subscribe(i -> {
				var o = _mapper.apply(i);
				subscriber.accept(o);
			});
		}

	}

	private static class _SupplierMono<T> extends MyMono<T> {

		private Consumer<Sink<T>> _template;

		public _SupplierMono(Consumer<Sink<T>> template) {
			super();
			_template = template;
		}

		public void subscribe(Consumer<? super T> subscriber) {
			_template.accept(new Sink<T>() {

				@Override
				public void success(T value) {
					subscriber.accept(value);
				}
			});
		}

	}

}

mapメソッドは、_MappingMonoクラスのインスタンスを生成して返しています。これが、テストコードで、最終的に使われているMonoになっています。_MappingMono.subscribeでは、_sourceのsubscribeを実行し、実行結果を変換して呼び出しもとのsubscriberに渡しています。

スタックトレース

以下のようにcreateでエラーを発生させます。

	@Test
	public void testException() {
		var mono = MyMono.create(s -> {
			throw new RuntimeException("Error.");
		}).map(v -> {
			return v + "_mod";
		});
		mono.subscribe(v -> {
		});
	}

スタックトレースは以下のようになります。


java.lang.RuntimeException: Error.
	at taka8.sample.reactor.MyMonoTest.lambda$3(MyMonoTest.java:23)
	at taka8.sample.reactor.MyMono$_SupplierMono.subscribe(MyMono.java:55)
	at taka8.sample.reactor.MyMono$_MappingMono.subscribe(MyMono.java:37)
	at taka8.sample.reactor.MyMonoTest.testException(MyMonoTest.java:27)
	at 
...

subscribe呼び出しと同一のスタックでcreateのラムダ式が実行されている事が分かります。

非同期実行

Mono.createは、通信やデータベースアクセス等の時間がかかる処理をReactiveな形式に変換する際に使用する事が出来ます。こちらは、こんなイメージになります。

	@Test
	public void testAsync() throws Exception {
		var executor = Executors.newSingleThreadExecutor();
		var mono = MyMono.create(s -> {
			executor.execute(() -> {
				__logger.info("create CALLED");
				s.success("good");
			});
		}).map(v -> {
			__logger.info("map CALLED");
			return v + "_mod";
		});
		__logger.info("subscribe CALLED");
		var latch = new CountDownLatch(1);
		mono.subscribe(v -> {
			__logger.info("subscribe RECEIVE result=<{}>", v);
			Assertions.assertEquals("good_mod", v);
			latch.countDown();
		});
		latch.await(1000, TimeUnit.MILLISECONDS);
	}

create内のラムダ式は別スレッドで非同期に実行され、実行された結果がMonoSinkに渡されます。コード中のログの出力は以下のようになります。

17:50:59.946 [main] INFO  taka8.sample.reactor.MyMonoTest - subscribe CALLED
17:50:59.949 [pool-1-thread-1] INFO  taka8.sample.reactor.MyMonoTest - create CALLED
17:50:59.950 [pool-1-thread-1] INFO  taka8.sample.reactor.MyMonoTest - map CALLED
17:50:59.951 [pool-1-thread-1] INFO  taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_mod>

subscribeの呼び出しは、mainスレッドですが、ラムダ式の実行、変換、結果の出力はすべて非同期実行のスレッドで行われているのが分かります。

繰り返し実行

subscribeが複数回呼ばれるとどうなるかを見てみます。

	@Test
	public void testRepeatSubscribe() throws Exception {
		var count = new AtomicInteger(0);
		var mono = MyMono.create(s -> {
			var id = count.getAndIncrement();
			__logger.info("create CALLED id=<{}>", id);
			s.success(id);
		}).map(v -> {
			return "good_" + v;
		});
		for (int i = 0; i < 3; i++) {
			var id = i;
			mono.subscribe(v -> {
				__logger.info("subscribe RECEIVE result=<{}>", v);
				Assertions.assertEquals("good_" + id, v);
			});
		}
	}

このコードを実行すると以下のログが出力されます。

07:51:39.541 [main] INFO  taka8.sample.reactor.MyMonoTest - create CALLED id=<0>
07:51:39.545 [main] INFO  taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_0>
07:51:39.548 [main] INFO  taka8.sample.reactor.MyMonoTest - create CALLED id=<1>
07:51:39.549 [main] INFO  taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_1>
07:51:39.549 [main] INFO  taka8.sample.reactor.MyMonoTest - create CALLED id=<2>
07:51:39.549 [main] INFO  taka8.sample.reactor.MyMonoTest - subscribe RECEIVE result=<good_2>

createのラムダ式が複数回実行されています。疑似コードを見て頂ければ理解しやすいと思いますが、Monoはテンプレートになっていて、subscribeによって登録されているテンプレートが実行されるイメージになります。

エラー処理

全章の疑似コードはエラー処理に対応していませんでした。エラー処理(onErrorResumeのみ)に対応した疑似コードは以下になります。
最初のコードとの差分は、org.reactivestreams.SubscriberMono間を連携させている所です。onErrorResumeで作成される_ErrorHandleMonoで、渡されたSubscriberをラップしてエラー処理追加しています。

package taka8.sample.reactor;

import java.util.function.Consumer;
import java.util.function.Function;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract public class MyMonoWithErrorHandle<T> {

	private static final Logger __logger = LoggerFactory.getLogger(MyMonoWithErrorHandle.class);

	public interface Sink<T> {

		void success(T value);

		void error(Throwable error);

	}

	public static <T> MyMonoWithErrorHandle<T> create(Consumer<Sink<T>> template) {
		return new _SupplierMono<>(template);
	}

	abstract protected void subscribe(Subscriber<? super T> subscriber);

	public <V> MyMonoWithErrorHandle<V> map(Function<? super T, ? extends V> mapper) {
		return new _MappingMono<>(this, mapper);
	}

	public <V> MyMonoWithErrorHandle<T> onErrorResume(
			Function<? super Throwable, ? extends MyMonoWithErrorHandle<? extends T>> errorHandler) {
		return new _ErrorHandleMono<>(this, errorHandler);
	}

	public void subscribe(Consumer<? super T> subscriber) {
		this.subscribe(new Subscriber<T>() {

			@Override
			public void onSubscribe(Subscription s) {
			}

			@Override
			public void onNext(T t) {
				subscriber.accept(t);
			}

			@Override
			public void onError(Throwable t) {
				__logger.warn("Error occurred.", t);
			}

			@Override
			public void onComplete() {
			}
		});
	}

	private static class _MappingMono<I, O> extends MyMonoWithErrorHandle<O> {

		private MyMonoWithErrorHandle<I> _source;

		private Function<? super I, ? extends O> _mapper;

		public _MappingMono(MyMonoWithErrorHandle<I> source, Function<? super I, ? extends O> mapper) {
			super();
			_source = source;
			_mapper = mapper;
		}

		@Override
		public void subscribe(Subscriber<? super O> subscriber) {
			_source.subscribe(i -> {
				var o = _mapper.apply(i);
				subscriber.onNext(o);
			});
		}

	}

	private static class _SupplierMono<T> extends MyMonoWithErrorHandle<T> {

		private Consumer<Sink<T>> _template;

		public _SupplierMono(Consumer<Sink<T>> template) {
			super();
			_template = template;
		}

		@Override
		public void subscribe(Subscriber<? super T> subscriber) {
			_template.accept(new Sink<T>() {

				@Override
				public void success(T value) {
					subscriber.onNext(value);
				}

				@Override
				public void error(Throwable error) {
					subscriber.onError(error);
				}
			});
		}

	}

	private static class _ErrorHandleMono<T> extends MyMonoWithErrorHandle<T> {

		private MyMonoWithErrorHandle<T> _source;

		private Function<? super Throwable, ? extends MyMonoWithErrorHandle<? extends T>> _errorHandler;

		public _ErrorHandleMono(MyMonoWithErrorHandle<T> source,
				Function<? super Throwable, ? extends MyMonoWithErrorHandle<? extends T>> errorHandler) {
			super();
			_source = source;
			_errorHandler = errorHandler;
		}

		@Override
		public void subscribe(Subscriber<? super T> subscriber) {
			_source.subscribe(new Subscriber<T>() {

				@Override
				public void onSubscribe(Subscription s) {
					subscriber.onSubscribe(s);
				}

				@Override
				public void onNext(T t) {
					subscriber.onNext(t);
				}

				@Override
				public void onError(Throwable t) {
					var mono = _errorHandler.apply(t);
					mono.subscribe(subscriber);
				}

				@Override
				public void onComplete() {
					subscriber.onComplete();
				}
			});
		}

	}

}

動作イメージ

疑似コードを動作させるテストは以下になります。
エラーが処理されて正常系の結果が返ります。

	@Test
	public void testOnErrorResume() {
		var mono = MyMonoWithErrorHandle.create(s -> {
			s.error(new IllegalArgumentException("Invalid call."));
		}).onErrorResume(th -> {
			return MyMonoWithErrorHandle.create(s -> {
				s.success("good");
			});
		});
		mono.subscribe(v->{
			Assertions.assertEquals("good", v);
		});
	}

まとめ

Monoの基本的な動作の理解を疑似コードを作る事で深める事が出来たかと思います。上記から特に注意すべきMonoの挙動をまとめると以下になります(こちらは、Fluxにも適用出来ます`。

  • Monoは作成した時点では何も動作しない。subscribeの呼び出しで初めて定義された処理が動作する。
  • subscribeは、subscribeが呼び出されたスレッドで実行されるが、どこかの処理が別スレッドで実行された場合には、それ以降の処理もそのスレッドで実行される(最終的に、subscribeの結果出力もそのスレッドで実行される)。
  • subscribeを繰り返し実行すると、同じ処理が何度も最初から実行される。
  • 正常系とエラー系で個別に対応を設定出来る。
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?