LoginSignup
2
1

More than 1 year has passed since last update.

Apache Camelとリアクティブプログラミング

Last updated at Posted at 2021-05-25
1 / 16

概要

  • リアクティブプログラミングとは
  • Camel + Reactive Streams
  • Camel + Vert.x

リアクティブプログラミングとは

  • リアクティブシステムの特徴
    • 即応性(Responsive)― 即時に応答を返す
    • 耐障害性(Resilient)― 障害があっても応答する
    • 弾力性(Elastic)― 高負荷であっても応答する
    • メッセージ駆動(Message Driven)― 疎結合な非同期通信に基づく

バックプレッシャー(Back pressure)

❝あるコンポーネントが全体に追いつけなくなった場合、システム全体として何らかの対処をする必要がある。過負荷状態のコンポーネントが壊滅的にクラッシュしたり、制御無くメッセージを損失することは許されない。処理が追いつかなくなっていて、かつクラッシュすることも許されないならば、コンポーネントは上流のコンポーネント群に自身が過負荷状態であることを伝えて負荷を減らしてもらうべきだ。このバック・プレッシャー (back-pressure) と呼ばれる仕組みは、過負荷の下でシステムを崩壊させず緩やかに応答を続ける重要なフィードバック機構だ。バック・プレッシャーはユーザまで転送してもよく、その場合、即応性 (resilient) は低下するが負荷の下でのシステムの耐障害性が保証される。また、システムがその情報を使って自身に他のリソースを振り向け、負荷分散を促すこともできる。❞


Reactive Streamsの例

  • JDK 9のjava.util.concurrent.Flowと1:1に対応
  • back pressureのサポート
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;

// Hello world
Flowable.just("Hello World").subscribe(System.out::println);

// Publisher/Subscriber example
Publisher<Integer> publisher = Flowable.just(1, 2, 3, 4, 5);
Flowable.fromPublisher(publisher)
    .map(i -> i*i)
    .doOnNext(System.out::println)
    .subscribe();

Camel + Reactive Streams

  • CamelルートとReactive Streamsフローとの双方向の連携を可能にする
    • reactive-streams:xxxxx[?options]
  • 以下のリアクティブフレームワーク実装に対応
  • Back pressureにも対応

Camel (pub) → Reactive Streams (sub)

  • Camelのプロデューサーエンドポイント(to)からReactive Streamsのフローへ
CamelContext camel = new DefaultCamelContext();
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();

// Camel (pub) → Reactive Streams (sub)
//Publisher<Integer> publisher = rsCamel.from("seda:input", Integer.class);
Publisher<Integer> publisher = rsCamel.fromStream("input", Integer.class);
Flowable.fromPublisher(publisher)
    .map(i -> i*i)
    .doOnNext(System.out::println)
    .subscribe();

FluentProducerTemplate template = camel.createFluentProducerTemplate();
IntStream.rangeClosed(1, 5).forEach(i ->
    //template.withBody(i).to("seda:input").send());
    template.withBody(i).to("reactive-streams:input").send());

Reactive Streams (pub) → Camel (sub)

  • Reactive StreamsのフローからCamelのコンシューマーエンドポイント(from)へ
CamelContext camel = new DefaultCamelContext();
camel.addRoutes(new RouteBuilder() {
    public void configure() {
        from("reactive-streams:input")
            .to("stream:out");
    }
});
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();

// Reactive Streams (pub) → Camel (sub)
Flowable.just(1, 2, 3, 4, 5)
    .map(i -> i*i)
    .subscribe(rsCamel.streamSubscriber("input", Integer.class));

CamelルートをReactive Streamsフローで実装

  • ルートとフローのミックス実装もできる
CamelContext camel = new DefaultCamelContext();
camel.addRoutes(new RouteBuilder() {
    public void configure() {
        from("reactive-streams:input")
            .to("stream:out");
        from("direct:output")
            .log("Done: ${body}");
    }
});
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();

// Camel route & Reactive Streams flow
Flowable.fromPublisher(rsCamel.from("seda:input"))
    .map(e -> {
        int i = e.getMessage().getBody(Integer.class);
        e.getMessage().setBody(i * i);
        return e;
    })
    .doOnNext(e -> rsCamel.to("stream:out", e))
    .subscribe(rsCamel.subscriber("direct:output"));

FluentProducerTemplate template = camel.createFluentProducerTemplate();
IntStream.rangeClosed(1, 5).forEach(i ->
    template.withBody(i).to("seda:input").send());

CamelのBack pressureサポート

  • 処理中(in-flight)エクスチェンジの数を制限することでback pressureを実現できる

プロデューサー側

  • ThrottlingInflightRoutePolicyポリシー
    • スロットル中はルートが一時停止する
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
policy.setMaxInflightExchanges(10);

from("jms:queue1")
    .routePolicy(policy)
    .to("reactive-streams:flow1");
  • backpressureStrategyオプション
    • BUFFER, LATEST, OLDEST
from("jms:queue2")
    .to("reactive-streams:flow2?backpressureStrategy=LATEST");

コンシューマー側

  • maxInflightExchangesオプション
from("reactive-streams:numbers?maxInflightExchanges=10")
    .to("direct:endpoint");

Camel + Vert.x

  • Vert.xのEventBusとイベントのやり取りを可能にする
    • vertx:xxxxx[?options]

Vert.xとは

  • JVM版のNode.jsのようなもの
    • Verticle(= Actor)とイベントバス
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;

class HttpServer {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new LoggingVerticle());
        EventBus eventBus = vertx.eventBus();
        vertx.createHttpServer()
            .requestHandler(req -> {
                eventBus.send("logging", "Got request: " + req.method() + " " + req.uri());
                req.response().end("Hello!");
            })
            .listen(8888);

        // ... クローズ処理
    }
}

class LoggingVerticle extends AbstractVerticle {
    public void start() {
        vertx.eventBus().<String>consumer("logging",
            message -> log.info(message.body()));
    }
}

Camel Vert.x

  • CamelからVert.xのイベントバス、Vert.xイベントバスからCamelへの双方向のメッセージングが可能
class CamelHttpServer {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new LoggingVerticle());
        EventBus eventBus = vertx.eventBus();
        vertx.createHttpServer()
            .requestHandler(req -> {
                eventBus.send("camel-logging", "Got request: " + req.method() + " " + req.uri());
                req.response().end("Hello!");
            })
            .listen(8888);

        CamelContext camel = new DefaultCamelContext();
        camel.getComponent("vertx", VertxComponent.class).setVertx(vertx);
        camel.addRoutes(new RouteBuilder() {
            public void configure() {
                // Vert.x → Camel
                from("vertx:camel-logging")
                    .log("body = ${body}")
                    .to("direct:logging");
                // Camel → Vert.x
                from("direct:logging")
                    .to("vertx:logging");
            }
        });
        camel.start();

        // ... クローズ処理
    }
}

class LoggingVerticle extends AbstractVerticle {
    public void start() {
        vertx.eventBus().<String>consumer("logging",
            message -> log.info(message.body()));
    }
}

Vert.x関連コンポーネント

コンポーネント 説明
camel-vertx-http Vert.x Web ClientによるHTTPエンドポイント実装(Producerのみ)
camel-vertx-kafka Vert.x Kafka ClientによるKafkaとの接続
camel-vertx-websocket Vert.x WebSocket機能によるWebSocketエンドポイントの実装
Vert.x Camel Bridge CamelでなくVert.x側からのCamelへのブリッジ(ちなみに作者はCamelのClaus)

参考文献


END

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1