32
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

突然ですが、先日参加したJJUG ccc fallでReactiveStreams/ReactiveExtensionsという仕様とそのJava実装の一つであるReactorというものを知りました。

JJUG ccc fallでReactiveStreamsとかとかを知ったセッション
Reactive Streams
Reactor
リアクティブ~~についてものすごく参考になった資料

リアクティブなJavaといえばRxJavaだと思っていましたが、いろいろあるもんですね。
RxJavaもReactiveStreamsが実装されているようです。
https://github.com/ReactiveX/RxJava

RxJavaは触ってみたことがあったので、今回はReactorについてコードを交えながらつらつらと書いていきたい。
ふわっとしているリアクティブプログラミングというものを少しでも理解したい。。。
時間がなくて書き方が統一されていないのは勘弁して下さい。

前提

環境

  • java -> jdk 1.8.0_65
  • reactor -> 2.0.6.RELEASE

Reactorとは

Reactorとはざっくりとリアクティブプログラミングをするときの便利ライブラリというイメージです。
RingBufferを採用しているようでめちゃ早いみたいです。
全体のアーキテクチャ以下の4つにわかれています

  • reacror-core
  • reactor-bus
  • reactor-streams
  • reactor-net

本記事ではreactore-corereactor-streamsについて書こうかなと思います。

書いてあることはリファレンスの抜粋なので詳細を知りたい方が下記ページからどうぞ。
http://projectreactor.io/docs/reference/

ReactorでHello World

何はともあれHello Worldです。
Reactorってどんな感じなのか、感じ取ってください。

HelloWorld.java
public class HelloWorld {
	static {
		//環境の初期化
		Environment.initialize();
	}

	public static void main(String[] args) throws InterruptedException {
		// ストリームの作成
		Broadcaster<String> sink = Broadcaster.create(Environment.get());
		sink
				// プロセスを実行するスレッドのディスパッチャを指定
				.dispatchOn(Environment.cachedDispatcher())
				// UpperCaseに変換
				.map(String::toUpperCase)
				// 実行スレッド情報と共にに出力
				.consume(e -> System.out.printf("%s : %s\n", Thread.currentThread().getId(), e));
		// メインスレッドのIDを出力
		System.out.println(Thread.currentThread().getId());
		// ストリームにデータを投入
		sink.onNext("Hello World");
		//実行スレッドが止まらないように少し待つ。。。
		Thread.sleep(500);
	}
}

java8のストリームAPIの用な感じです。
なんとなく雰囲気を掴めればここはいいかなと。

reactor-core

Hello Worldで雰囲気を掴みつつ、少しずつ説明します。
reactor-coreの解説は独断と偏見により以下についてです。

  • Environment
  • Dispatcher
  • Processor

Environment -> Reactor全体の設定的なやつ

すごい説明しづらいのだけど、Reactor全体を管理する設定のようなものです。
これを利用するときは必ずEnvironment.initialize()を呼ぶ必要があります。
Environmentを初期化したあとは基本的にstaticアクセスで必要な設定を呼んだりします。

EnvironmentSample.java

APIの中には直接Environment.get()でインスタンスを渡して、Environmentに沿ったなにがしを生成してくれるような物もあります。
Environmentの設定はMETA-INF/reactor/reactor-environment.propertiesに書けばその通りに設定するようですが、とりあえずは特に置かなくても良いと思います。

そんな感じです。

Dispatcher-> 実行スレッドのディスパッチャ

DispatcherはDispatcher#dispatch(E,Consumer<E>,Consumer<Throwable>)を呼び出すと処理(=Consumer)を別スレッド(同じスレッドの場合もあるけど)にディスパッチします。
ConsumerはJava8のConsumerと同じでインターフェースで引数一つの戻り値なしメソッドを持つインターフェースです。

ちょっとイメージつきづらいかもしれないので、コード載せます。

DispatcherTest.java
public class DispatcherTest {
	/** ロガー */
	private static final Logger log = LoggerFactory.getLogger(DispatcherTest.class);

	private SynchronousDispatcher synchronousDispatcher;
	private ThreadPoolExecutorDispatcher threadPoolExecutorDispatcher;
	private RingBufferDispatcher ringBufferDispatcher;

	@Before
	public void before() throws Exception {
		synchronousDispatcher = new SynchronousDispatcher();
		threadPoolExecutorDispatcher = new ThreadPoolExecutorDispatcher(1, 128);
		ringBufferDispatcher = new RingBufferDispatcher("ring");
	}

	@Test
	//【1】
	public void test_synchronousDispathcer() {
		log.info("synchronousDispathcer");
		synchronousDispatcher
				.dispatch("test", log::info, e -> System.out.println(e));
	}

	@Test
	//【2】
	public void test_threadPoolExecutorDispatcher() {
		log.info("threadPoolExecutorDispatcher");
		threadPoolExecutorDispatcher
				.dispatch("test", log::info, e -> System.out.println(e));
	}

	@Test
	//【3】
	public void test_ringBufferDispatcher() {
		log.info("ringBufferDispatcher");
		ringBufferDispatcher
				.dispatch("test", log::info, e -> System.out.println(e));
	}

}

3種類のDispatcherについて簡単に説明します。

  1. SynchronousDispatcherはdispatchを呼び出したスレッドと同じスレッドでConsumerを実行します。
  2. ThreadPoolExecutorDispatcherはJavaのThreadPoolExecutorを利用して処理をディスパッチします。Consumerは別スレッドの実行となります。
  3. RingBufferDispatcherはRingBufferのアルゴリズムを利用して実行スレッドを決定し処理をそのスレッドディスパッチします。http://projectreactor.io/docs/reference/#_dispatchers にある図がわかりやすいです。

リアクティブプログラミングで非同期処理を行うときの実行制御アルゴリズムを決めている用なイメージでしょうか。
コード見てなんとなくイメージ出来たら良いかなと思います。


Processorに行く前にReactive Streams(≠ Reactor Streams)

次のProcessorに行く前に少しだけReactive Streamsの解説です。
ここで、少しReactive Streamsについてざっくり解説です。

すごいややこしいポイントですがReactive Streamsはreactor-streamsとは別物です。前者は仕様で後者Reactorのストリームを扱うAPI達です。ここはReactive Streamsでほんの少しだけ。

Reactive Streamsは仕様です。4つのI/Fだけです。シンプルですね。

  • Publisher => イベントをパブリッシュします。(Subscriberへ通知)
  • Subscriber => Publisherによってパブリッシュされたイベントを検知し処理をします。onNext、onComplete、onErrorパブリッシュされたイベントが拾われるメソッドです。
  • Subscription => PublisherとSubscriberをつなぐもの
  • Processor => Publisher兼Subscriber

こんな感じです。

まぁ何が何というのを把握しなくても、コードを書いたりはできますがReactive StreamsとReactorの言葉のマッピングができると理解が早くなるかと思います。
より詳細な情報についてはこの辺に書いてあるので適宜参照しましょう。
http://projectreactor.io/docs/reference/#reactivestreams


Processor => Processor

察しがいい人はわかったかと思いますが、ProcessorはReactive StreamsのProcessorです。back-pressureをサポートしています。
back-pressureサポートとはものすごくざっくりと解説すると、PublisherとSubscriberで処理速度に差がある場合でもちゃんと制御しますよという事です。
特にSubscriberの能力が低いとPublisherから次々くるイベントを捌ききれなくなってどうしようっていうケースですね。
TCPのフロー制御に似ています。

RingBufferProcessor

RingBufferを利用した Processorです。スループットがとかback-pressureが賢いとか、良いみたいです。
Processorを利用したコードサンプルです。

RingBufferProcessorTest.java
	@Test
	public void test_ringBufferProcessor() throws Exception {
		Processor<String, String> processor = RingBufferProcessor.create("test", 32);
		// Reactor Streamでラップ
		Stream<String> stream = Streams.wrap(processor);
		stream.consume(log::info);
		stream.consume(log::info);
		stream.consume(log::info);

		// 各スレッドにパブリッシュされる
		processor.onNext("1");
		processor.onNext("2");
		processor.onNext("3");

		Thread.sleep(1000);
	}
  1. RingBufferProcessor.create() でProcessorを生成
  2. Reactor Streamでラップする
  3. consumeを三つ指定(Processor内部のPublisherをSubscribeする)
  4. processorでイベントをパブリッシュ

RingBufferProcessorの場合はすべてのスレッドにイベントがパブリッシュされます。
よって上記例の場合consumeで3つSubscriberが設定されているので、onNextでパブリッシュすると、
3 x 3の系9回ログが出力されます。

ちなみに、processor.onNext("x") を初回のconsume()より前に実行すると、初回のconsume()で設定せれたSubscriberにパブリッシュされるため3回のログが出力されます。

RingBufferProcessor はそんなものなんですが、RingBufferのアルゴリズムで3回のログ出力にすることもできます。

RingBufferWorkProcessor -> subscribeは全体で一回

これはパブリッシュを一回したら、どれか一つのSubscriberにパブリッシュされるというProcessorです。

RingBufferProcessorTest.java
	@Test
	public void test_ringBufferWorkProcessor() throws Exception {

		Processor<String, String> processor = RingBufferWorkProcessor.create("test", 32);
		Stream<String> stream = Streams.wrap(processor);
		stream.consume(log::info);
		stream.consume(log::info);
		stream.consume(log::info);

		// 各一つのonNextでどれか一つのスレッドで実行
		processor.onNext("1");
		processor.onNext("2");
		processor.onNext("3");

		Thread.sleep(1000);

	}

コードはよく似ていますが、この場合だとログ出力は3回のみです。ただし、consumeの実行スレッドはバラバラです。

reactor-coreまとめ

reactor-coreはcoreというだけあって、後述のreactor-streamとかの基礎となる実装です。とりわけReactive Streamsの実装です。
reactor-coreのAPIを直接使ってリアクティブプログラミングって大変そうじゃないですか。
なので通常はreactor-streamsを使います。reactor-streamsはreactive-extensionsの実装のようなものです(本当は全然違うのですがとりあえずここでは。。。。。。。)。
プログラミングをするときはreactor-streamsを利用するので、reactor-coreは表には出てこないのですが、中で何が起こっているかを理解するのはとても重要です。
偉そうなことをいいつつ、自分自身まだまだ勉強中。

#reactor-streams
Java8でいうStreamAPIがイメージに近いです。内部的には違うようですが。
reactor-streamsの内部ではreactor-coreを使っています。
http://projectreactor.io/docs/reference/#streams
の図がわかりやすいです。

細かなAPIを上げるのは疲れるのでこの記事では、

  • 2種類のStream
  • Publisherの勘所

といった感じで紹介します。
本当はもっといろいろ調べつつ紹介したいけど、本当に疲れてきたので抜粋。

2種類のStream

Streamとはデータというかイベントというか、そういったものの流れみたいなイメージです。そのStreamにはhotとcoldの二種類があって、それぞれ同じStreamですが意味が違います。

ColdStream

データの数がすでに決定しているStreamです。Collectionとかそういうものを処理するStreamはすべてColdStreamです。Java8のStreamAPIはすべてColdStreamぽいです(間違っていたらすみません)。
なのでコレクションをイテレートでぐるぐる回すとかはColdStreamです。

ColdStreamSample.java
	public static void main(String[] args) throws InterruptedException {
		Environment.initialize();
		Streams.just("Hello", "World", "!")
				.dispatchOn(Environment.cachedDispatcher())
				.map(String::toUpperCase)
				.consume(log::info);
		// HELLO
		// WORLD
		// !
		Thread.sleep(500);
	}

HotStream

データやイベントの数が決まっていないStreamです。UIのイベントとか非同期の戻りはHotStreamです。いつどのタイミングくるかわからない、入力データが不定なケースです。
下の例は200msごとに別スレッドでconsume回数を出力するソースコードです。これもHotStreamです。

HotStreamSample.java
	public static void main(String[] args) throws InterruptedException {

		Environment.initialize();
		Streams.period(200, TimeUnit.MILLISECONDS)
				.dispatchOn(Environment.cachedDispatcher())
				.map(e -> e.toString())
				.consume(log::info);
		Thread.sleep(1000);
	}

Publisher返しのアプリケーション

ちょっと疲れてきたのでコードイメージだけ抜粋します。
http://projectreactor.io/docs/reference/#streams-microservice-start
に載っていたけど、インターフェースが微妙に違っていて、動作見ていないですが、こんな感じでNonBlockingなサービスを作っていく感じになっていくと予想されます。こんな感じで簡単なWebアプリ作ってみたいです(暇があったら作ります。)
詳細は元ページを参照してください。

NonBlockingSample.java
public Promise<User> get(final String name) {
  return Promises
    .task( () -> userDB.findByName(name))
    .timeout(3, TimeUnit.Seconds)
    .map(this::convert)
    .subscribeOn(workDispatcher());
}

public Stream<User> allFriends(final User user)  {
  return Streams
    .defer(() ->
      Streams.just(userDB.findAllFriends(user)))
    .timeout(3, TimeUnit.Seconds)
    .map(this::convertToList)
    .flatMap(Streams::from)
    .dispatchOn(cachedDispatcher());
    .subscribeOn(workDispatcher());
}

public Stream<User> filteredFind(String name){
    return get(name)
      .stream()
      .filter(User::isAdmin)
      .flatMap(this::allFriends);
}

最後に

結局何が言いたいかわからない感じになってしまいました。Reactor触ると面白いよ。
以上。

32
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?