概要
- リアクティブプログラミングとは
- Camel + Reactive Streams
- Camel + Vert.x
リアクティブプログラミングとは
- リアクティブシステムの特徴
- 即応性(Responsive)― 即時に応答を返す
- 耐障害性(Resilient)― 障害があっても応答する
- 弾力性(Elastic)― 高負荷であっても応答する
- メッセージ駆動(Message Driven)― 疎結合な非同期通信に基づく
バックプレッシャー(Back pressure)
❝あるコンポーネントが全体に追いつけなくなった場合、システム全体として何らかの対処をする必要がある。過負荷状態のコンポーネントが壊滅的にクラッシュしたり、制御無くメッセージを損失することは許されない。処理が追いつかなくなっていて、かつクラッシュすることも許されないならば、コンポーネントは上流のコンポーネント群に自身が過負荷状態であることを伝えて負荷を減らしてもらうべきだ。このバック・プレッシャー (back-pressure) と呼ばれる仕組みは、過負荷の下でシステムを崩壊させず緩やかに応答を続ける重要なフィードバック機構だ。バック・プレッシャーはユーザまで転送してもよく、その場合、即応性 (resilient) は低下するが負荷の下でのシステムの耐障害性が保証される。また、システムがその情報を使って自身に他のリソースを振り向け、負荷分散を促すこともできる。❞
Reactive Streamsの例
- JDK 9の
java.util.concurrent.Flow
と1:1に対応 - back pressureのサポート
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
// Hello world
Flowable.just("Hello World").subscribe(System.out::println);
// Publisher/Subscriber example
Publisher<Integer> publisher = Flowable.just(1, 2, 3, 4, 5);
Flowable.fromPublisher(publisher)
.map(i -> i*i)
.doOnNext(System.out::println)
.subscribe();
Camel + Reactive Streams
- CamelルートとReactive Streamsフローとの双方向の連携を可能にする
reactive-streams:xxxxx[?options]
- 以下のリアクティブフレームワーク実装に対応
- Back pressureにも対応
Camel (pub) → Reactive Streams (sub)
- Camelのプロデューサーエンドポイント(
to
)からReactive Streamsのフローへ
CamelContext camel = new DefaultCamelContext();
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();
// Camel (pub) → Reactive Streams (sub)
//Publisher<Integer> publisher = rsCamel.from("seda:input", Integer.class);
Publisher<Integer> publisher = rsCamel.fromStream("input", Integer.class);
Flowable.fromPublisher(publisher)
.map(i -> i*i)
.doOnNext(System.out::println)
.subscribe();
FluentProducerTemplate template = camel.createFluentProducerTemplate();
IntStream.rangeClosed(1, 5).forEach(i ->
//template.withBody(i).to("seda:input").send());
template.withBody(i).to("reactive-streams:input").send());
Reactive Streams (pub) → Camel (sub)
- Reactive StreamsのフローからCamelのコンシューマーエンドポイント(
from
)へ
CamelContext camel = new DefaultCamelContext();
camel.addRoutes(new RouteBuilder() {
public void configure() {
from("reactive-streams:input")
.to("stream:out");
}
});
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();
// Reactive Streams (pub) → Camel (sub)
Flowable.just(1, 2, 3, 4, 5)
.map(i -> i*i)
.subscribe(rsCamel.streamSubscriber("input", Integer.class));
CamelルートをReactive Streamsフローで実装
- ルートとフローのミックス実装もできる
CamelContext camel = new DefaultCamelContext();
camel.addRoutes(new RouteBuilder() {
public void configure() {
from("reactive-streams:input")
.to("stream:out");
from("direct:output")
.log("Done: ${body}");
}
});
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();
// Camel route & Reactive Streams flow
Flowable.fromPublisher(rsCamel.from("seda:input"))
.map(e -> {
int i = e.getMessage().getBody(Integer.class);
e.getMessage().setBody(i * i);
return e;
})
.doOnNext(e -> rsCamel.to("stream:out", e))
.subscribe(rsCamel.subscriber("direct:output"));
FluentProducerTemplate template = camel.createFluentProducerTemplate();
IntStream.rangeClosed(1, 5).forEach(i ->
template.withBody(i).to("seda:input").send());
CamelのBack pressureサポート
- 処理中(in-flight)エクスチェンジの数を制限することでback pressureを実現できる
プロデューサー側
-
ThrottlingInflightRoutePolicy
ポリシー- スロットル中はルートが一時停止する
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
policy.setMaxInflightExchanges(10);
from("jms:queue1")
.routePolicy(policy)
.to("reactive-streams:flow1");
-
backpressureStrategy
オプション-
BUFFER
,LATEST
,OLDEST
-
from("jms:queue2")
.to("reactive-streams:flow2?backpressureStrategy=LATEST");
コンシューマー側
-
maxInflightExchanges
オプション
from("reactive-streams:numbers?maxInflightExchanges=10")
.to("direct:endpoint");
Camel + Vert.x
- Vert.xのEventBusとイベントのやり取りを可能にする
vertx:xxxxx[?options]
Vert.xとは
- JVM版のNode.jsのようなもの
- Verticle(= Actor)とイベントバス
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
class HttpServer {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new LoggingVerticle());
EventBus eventBus = vertx.eventBus();
vertx.createHttpServer()
.requestHandler(req -> {
eventBus.send("logging", "Got request: " + req.method() + " " + req.uri());
req.response().end("Hello!");
})
.listen(8888);
// ... クローズ処理
}
}
class LoggingVerticle extends AbstractVerticle {
public void start() {
vertx.eventBus().<String>consumer("logging",
message -> log.info(message.body()));
}
}
Camel Vert.x
- CamelからVert.xのイベントバス、Vert.xイベントバスからCamelへの双方向のメッセージングが可能
class CamelHttpServer {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new LoggingVerticle());
EventBus eventBus = vertx.eventBus();
vertx.createHttpServer()
.requestHandler(req -> {
eventBus.send("camel-logging", "Got request: " + req.method() + " " + req.uri());
req.response().end("Hello!");
})
.listen(8888);
CamelContext camel = new DefaultCamelContext();
camel.getComponent("vertx", VertxComponent.class).setVertx(vertx);
camel.addRoutes(new RouteBuilder() {
public void configure() {
// Vert.x → Camel
from("vertx:camel-logging")
.log("body = ${body}")
.to("direct:logging");
// Camel → Vert.x
from("direct:logging")
.to("vertx:logging");
}
});
camel.start();
// ... クローズ処理
}
}
class LoggingVerticle extends AbstractVerticle {
public void start() {
vertx.eventBus().<String>consumer("logging",
message -> log.info(message.body()));
}
}
Vert.x関連コンポーネント
コンポーネント | 説明 |
---|---|
camel-vertx-http | Vert.x Web ClientによるHTTPエンドポイント実装(Producerのみ) |
camel-vertx-kafka | Vert.x Kafka ClientによるKafkaとの接続 |
camel-vertx-websocket | Vert.x WebSocket機能によるWebSocketエンドポイントの実装 |
Vert.x Camel Bridge | CamelでなくVert.x側からのCamelへのブリッジ(ちなみに作者はCamelのClaus) |
参考文献
- Camel in Action, 2nd Edition — Chapter 20: Reactive Camel