突然ですが、先日参加したJJUG ccc fallでReactiveStreams/ReactiveExtensionsという仕様とそのJava実装の一つであるReactorというものを知りました。
JJUG ccc fallでReactiveStreamsとかとかを知ったセッション
Reactive Streams
Reactor
リアクティブ~~についてものすごく参考になった資料
リアクティブなJavaといえばRxJavaだと思っていましたが、いろいろあるもんですね。
RxJavaもReactiveStreamsが実装されているようです。
https://github.com/ReactiveX/RxJava
RxJavaは触ってみたことがあったので、今回はReactorについてコードを交えながらつらつらと書いていきたい。
ふわっとしているリアクティブプログラミングというものを少しでも理解したい。。。
時間がなくて書き方が統一されていないのは勘弁して下さい。
前提
環境
- java -> jdk 1.8.0_65
- reactor -> 2.0.6.RELEASE
Reactorとは
Reactorとはざっくりとリアクティブプログラミングをするときの便利ライブラリというイメージです。
RingBufferを採用しているようでめちゃ早いみたいです。
全体のアーキテクチャ以下の4つにわかれています
- reacror-core
- reactor-bus
- reactor-streams
- reactor-net
本記事ではreactore-core
とreactor-streams
について書こうかなと思います。
書いてあることはリファレンスの抜粋なので詳細を知りたい方が下記ページからどうぞ。
http://projectreactor.io/docs/reference/
ReactorでHello World
何はともあれHello Worldです。
Reactorってどんな感じなのか、感じ取ってください。
public class HelloWorld {
static {
//環境の初期化
Environment.initialize();
}
public static void main(String[] args) throws InterruptedException {
// ストリームの作成
Broadcaster<String> sink = Broadcaster.create(Environment.get());
sink
// プロセスを実行するスレッドのディスパッチャを指定
.dispatchOn(Environment.cachedDispatcher())
// UpperCaseに変換
.map(String::toUpperCase)
// 実行スレッド情報と共にに出力
.consume(e -> System.out.printf("%s : %s\n", Thread.currentThread().getId(), e));
// メインスレッドのIDを出力
System.out.println(Thread.currentThread().getId());
// ストリームにデータを投入
sink.onNext("Hello World");
//実行スレッドが止まらないように少し待つ。。。
Thread.sleep(500);
}
}
java8のストリームAPIの用な感じです。
なんとなく雰囲気を掴めればここはいいかなと。
reactor-core
Hello Worldで雰囲気を掴みつつ、少しずつ説明します。
reactor-coreの解説は独断と偏見により以下についてです。
- Environment
- Dispatcher
- Processor
Environment -> Reactor全体の設定的なやつ
すごい説明しづらいのだけど、Reactor全体を管理する設定のようなものです。
これを利用するときは必ずEnvironment.initialize()
を呼ぶ必要があります。
Environmentを初期化したあとは基本的にstaticアクセスで必要な設定を呼んだりします。
APIの中には直接Environment.get()
でインスタンスを渡して、Environmentに沿ったなにがしを生成してくれるような物もあります。
Environmentの設定はMETA-INF/reactor/reactor-environment.properties
に書けばその通りに設定するようですが、とりあえずは特に置かなくても良いと思います。
そんな感じです。
Dispatcher-> 実行スレッドのディスパッチャ
DispatcherはDispatcher#dispatch(E,Consumer<E>,Consumer<Throwable>)
を呼び出すと処理(=Consumer)を別スレッド(同じスレッドの場合もあるけど)にディスパッチします。
ConsumerはJava8のConsumerと同じでインターフェースで引数一つの戻り値なしメソッドを持つインターフェースです。
ちょっとイメージつきづらいかもしれないので、コード載せます。
public class DispatcherTest {
/** ロガー */
private static final Logger log = LoggerFactory.getLogger(DispatcherTest.class);
private SynchronousDispatcher synchronousDispatcher;
private ThreadPoolExecutorDispatcher threadPoolExecutorDispatcher;
private RingBufferDispatcher ringBufferDispatcher;
@Before
public void before() throws Exception {
synchronousDispatcher = new SynchronousDispatcher();
threadPoolExecutorDispatcher = new ThreadPoolExecutorDispatcher(1, 128);
ringBufferDispatcher = new RingBufferDispatcher("ring");
}
@Test
//【1】
public void test_synchronousDispathcer() {
log.info("synchronousDispathcer");
synchronousDispatcher
.dispatch("test", log::info, e -> System.out.println(e));
}
@Test
//【2】
public void test_threadPoolExecutorDispatcher() {
log.info("threadPoolExecutorDispatcher");
threadPoolExecutorDispatcher
.dispatch("test", log::info, e -> System.out.println(e));
}
@Test
//【3】
public void test_ringBufferDispatcher() {
log.info("ringBufferDispatcher");
ringBufferDispatcher
.dispatch("test", log::info, e -> System.out.println(e));
}
}
3種類のDispatcherについて簡単に説明します。
-
SynchronousDispatcher
はdispatchを呼び出したスレッドと同じスレッドでConsumerを実行します。 -
ThreadPoolExecutorDispatcher
はJavaのThreadPoolExecutor
を利用して処理をディスパッチします。Consumerは別スレッドの実行となります。 -
RingBufferDispatcher
はRingBufferのアルゴリズムを利用して実行スレッドを決定し処理をそのスレッドディスパッチします。http://projectreactor.io/docs/reference/#_dispatchers にある図がわかりやすいです。
リアクティブプログラミングで非同期処理を行うときの実行制御アルゴリズムを決めている用なイメージでしょうか。
コード見てなんとなくイメージ出来たら良いかなと思います。
Processorに行く前にReactive Streams(≠ Reactor Streams)
次のProcessorに行く前に少しだけReactive Streamsの解説です。
ここで、少しReactive Streamsについてざっくり解説です。
すごいややこしいポイントですがReactive Streamsはreactor-streamsとは別物です。前者は仕様で後者Reactorのストリームを扱うAPI達です。ここはReactive Streamsでほんの少しだけ。
Reactive Streamsは仕様です。4つのI/Fだけです。シンプルですね。
- Publisher => イベントをパブリッシュします。(Subscriberへ通知)
- Subscriber => Publisherによってパブリッシュされたイベントを検知し処理をします。onNext、onComplete、onErrorパブリッシュされたイベントが拾われるメソッドです。
- Subscription => PublisherとSubscriberをつなぐもの
- Processor => Publisher兼Subscriber
こんな感じです。
まぁ何が何というのを把握しなくても、コードを書いたりはできますがReactive StreamsとReactorの言葉のマッピングができると理解が早くなるかと思います。
より詳細な情報についてはこの辺に書いてあるので適宜参照しましょう。
http://projectreactor.io/docs/reference/#reactivestreams
Processor => Processor
察しがいい人はわかったかと思いますが、ProcessorはReactive StreamsのProcessorです。back-pressureをサポートしています。
back-pressureサポートとはものすごくざっくりと解説すると、PublisherとSubscriberで処理速度に差がある場合でもちゃんと制御しますよという事です。
特にSubscriberの能力が低いとPublisherから次々くるイベントを捌ききれなくなってどうしようっていうケースですね。
TCPのフロー制御に似ています。
RingBufferProcessor
RingBufferを利用した Processorです。スループットがとかback-pressureが賢いとか、良いみたいです。
Processorを利用したコードサンプルです。
@Test
public void test_ringBufferProcessor() throws Exception {
Processor<String, String> processor = RingBufferProcessor.create("test", 32);
// Reactor Streamでラップ
Stream<String> stream = Streams.wrap(processor);
stream.consume(log::info);
stream.consume(log::info);
stream.consume(log::info);
// 各スレッドにパブリッシュされる
processor.onNext("1");
processor.onNext("2");
processor.onNext("3");
Thread.sleep(1000);
}
-
RingBufferProcessor.create()
でProcessorを生成 - Reactor Streamでラップする
- consumeを三つ指定(Processor内部のPublisherをSubscribeする)
- processorでイベントをパブリッシュ
RingBufferProcessorの場合はすべてのスレッドにイベントがパブリッシュされます。
よって上記例の場合consumeで3つSubscriberが設定されているので、onNextでパブリッシュすると、
3 x 3の系9回ログが出力されます。
ちなみに、processor.onNext("x")
を初回のconsume()
より前に実行すると、初回のconsume()
で設定せれたSubscriberにパブリッシュされるため3回のログが出力されます。
RingBufferProcessor
はそんなものなんですが、RingBufferのアルゴリズムで3回のログ出力にすることもできます。
RingBufferWorkProcessor -> subscribeは全体で一回
これはパブリッシュを一回したら、どれか一つのSubscriberにパブリッシュされるというProcessorです。
@Test
public void test_ringBufferWorkProcessor() throws Exception {
Processor<String, String> processor = RingBufferWorkProcessor.create("test", 32);
Stream<String> stream = Streams.wrap(processor);
stream.consume(log::info);
stream.consume(log::info);
stream.consume(log::info);
// 各一つのonNextでどれか一つのスレッドで実行
processor.onNext("1");
processor.onNext("2");
processor.onNext("3");
Thread.sleep(1000);
}
コードはよく似ていますが、この場合だとログ出力は3回のみです。ただし、consumeの実行スレッドはバラバラです。
reactor-coreまとめ
reactor-coreはcoreというだけあって、後述のreactor-streamとかの基礎となる実装です。とりわけReactive Streamsの実装です。
reactor-coreのAPIを直接使ってリアクティブプログラミングって大変そうじゃないですか。
なので通常はreactor-streamsを使います。reactor-streamsはreactive-extensionsの実装のようなものです(本当は全然違うのですがとりあえずここでは。。。。。。。)。
プログラミングをするときはreactor-streamsを利用するので、reactor-coreは表には出てこないのですが、中で何が起こっているかを理解するのはとても重要です。
偉そうなことをいいつつ、自分自身まだまだ勉強中。
#reactor-streams
Java8でいうStreamAPIがイメージに近いです。内部的には違うようですが。
reactor-streamsの内部ではreactor-coreを使っています。
http://projectreactor.io/docs/reference/#streams
の図がわかりやすいです。
細かなAPIを上げるのは疲れるのでこの記事では、
- 2種類のStream
- Publisherの勘所
といった感じで紹介します。
本当はもっといろいろ調べつつ紹介したいけど、本当に疲れてきたので抜粋。
2種類のStream
Streamとはデータというかイベントというか、そういったものの流れみたいなイメージです。そのStreamにはhotとcoldの二種類があって、それぞれ同じStreamですが意味が違います。
ColdStream
データの数がすでに決定しているStreamです。Collectionとかそういうものを処理するStreamはすべてColdStreamです。Java8のStreamAPIはすべてColdStreamぽいです(間違っていたらすみません)。
なのでコレクションをイテレートでぐるぐる回すとかはColdStreamです。
public static void main(String[] args) throws InterruptedException {
Environment.initialize();
Streams.just("Hello", "World", "!")
.dispatchOn(Environment.cachedDispatcher())
.map(String::toUpperCase)
.consume(log::info);
// HELLO
// WORLD
// !
Thread.sleep(500);
}
HotStream
データやイベントの数が決まっていないStreamです。UIのイベントとか非同期の戻りはHotStreamです。いつどのタイミングくるかわからない、入力データが不定なケースです。
下の例は200msごとに別スレッドでconsume回数を出力するソースコードです。これもHotStreamです。
public static void main(String[] args) throws InterruptedException {
Environment.initialize();
Streams.period(200, TimeUnit.MILLISECONDS)
.dispatchOn(Environment.cachedDispatcher())
.map(e -> e.toString())
.consume(log::info);
Thread.sleep(1000);
}
Publisher返しのアプリケーション
ちょっと疲れてきたのでコードイメージだけ抜粋します。
http://projectreactor.io/docs/reference/#streams-microservice-start
に載っていたけど、インターフェースが微妙に違っていて、動作見ていないですが、こんな感じでNonBlockingなサービスを作っていく感じになっていくと予想されます。こんな感じで簡単なWebアプリ作ってみたいです(暇があったら作ります。)
詳細は元ページを参照してください。
public Promise<User> get(final String name) {
return Promises
.task( () -> userDB.findByName(name))
.timeout(3, TimeUnit.Seconds)
.map(this::convert)
.subscribeOn(workDispatcher());
}
public Stream<User> allFriends(final User user) {
return Streams
.defer(() ->
Streams.just(userDB.findAllFriends(user)))
.timeout(3, TimeUnit.Seconds)
.map(this::convertToList)
.flatMap(Streams::from)
.dispatchOn(cachedDispatcher());
.subscribeOn(workDispatcher());
}
public Stream<User> filteredFind(String name){
return get(name)
.stream()
.filter(User::isAdmin)
.flatMap(this::allFriends);
}
最後に
結局何が言いたいかわからない感じになってしまいました。Reactor触ると面白いよ。
以上。