9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache CamelとRxJavaとの間でObservableをやりとりする

Last updated at Posted at 2015-12-29

皆さんこんにちは。しうへいと申します。
今回は、Apache CamelとRxJavaを組み合わせて使う方法を、Apache Camelの紹介およびサンプルコードを交えて書きます。

##サンプルの内容
日本語版Wikipediaにおいて、直近で編集された記事情報が流れてくるストリームを、以下のようにして作ります。

  • Wikipediaが提供しているMediawiki APIを30秒ごとに叩く。
  • Apache Camel から Jsoupを使ってリクエストする。
  • 取得した記事一覧をパースして、各要素をObservableとしてRxJavaへ渡す。
  • RxJava上では、distinct(Func1)を使って重複を排除する。
  • Apache CamelへObservableを返し、ロガーで標準出力する。

(Apache Camelの前置きが不要な方は、早速サンプルコードをご覧ください。)

##Apache Camelとは?
id:daikuro さんの「Apache Camel (Java)を使うと開発が楽になる7つの理由」 より引用します。

Apache Camelとは

Javaのフレームワーク。どんなフレームワークかというと

  • ベルトコンベア(ライン生産的な)フレームワーク
  • 「生産物」に相当するものはデータ
  • 「生産物を作るロボットや人」に相当するものはコンポーネントや個別実装
  • 「スタート」は外部からのリクエストやタイマーでの監視(例えばメールチェックみたいな)
  • 「最終生成物」はリクエストもらった人に返す
  • フレームワーク自体は超軽量。

声優さんによる、ナレーション付きのドキュメントもあります。(うれしい)
Apache Camel 説明資料 - JACUG

Apache Camelの最大のウリは、有力なオープンソースプロダクト・標準技術・各種Webサービスへのアダプター(=コンポーネント)がすでに多数提供されており、それら全てを、統一されたルール(EIP:Enterprise Integration Patterns)の下で統合できるという点です。

提供されているコンポーネントは公式で一覧できます。2015/12/29現在、194ものコンポーネントが用意されており、一例としてはAWS、Docker、Git、Gmail、Jetty、JMX、MQTT、Twitter、WebSocketなど、「この二つがどうつながるんだ?」と一見すると予想がつかないようなのもあります。
(あくまでプロダクトの機能の太い部分をAPI化しているようなイメージですので、その点はご了承ください。)
コンポーネントの他にも、オブジェクトをJSONやXMLなど任意のフォーマットに変換するコンバーターが多数提供されています。

##Observable生成ツールとしてのApache Camel
Apache Camelは、全てのコンポーネントからrx.Observableを生成することができます。
Apache Camel: RX より引用

###Camel RX

####Available as of Camel 2.11

The camel-rx library provides Camel support for the Reactive Extensions (RX) using the RxJava library so that:

  • Camel users can use the RxJava API for processing messages on endpoints using a typesafe composable API
  • RxJava users get to use all of the Camel transports and protocols from within the RxJava API

決してApache Camelというプロダクト自体がPub/Subでフル・ノンブロッキングというわけではなく、あくまで任意の処理段階でObservableを生成して、それをRxJavaに渡して操作したり、またObservableを受け取って処理をはじめたりすることができる、という理解で良いかと思います。
(非同期処理そのものはApache Camelの得意とするところではありますが、いかんせんコンポーネントベースなので、ケースバイケースになります。)


##サンプルコード
Maven上でjsoup、camel-rx、 ロガーとしてslf4j-simpleを追加しています。

pom.xml
    <dependencies>
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-rx</artifactId>
            <version>2.16.1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.6</version>
        </dependency>
    </dependencies>
App.java
import java.util.Collections;
import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.rx.ReactiveCamel;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.select.Elements;
import org.jsoup.nodes.Element;
import rx.Observable;

public class App {

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {

            // RouteBuilderクラスを拡張して、Apache Camelでのルーティングルールを定義
            @Override
            public void configure() throws Exception {
                // timerコンポーネントで30秒ごとに処理を開始する
                from("timer:foo?period=30s")
                        .process((Exchange exchange) -> {
                            // Jsoup からAPIを叩く処理
                            // 通常の記事の編集(rcnamespace=0)について、直近の100件を、XML形式で取得
                            String requestUrl = "https://ja.wikipedia.org/w/api.php?action=query&list=recentchanges&rcnamespace=0&rclimit=100&format=xml";
                            Document doc = Jsoup.connect(requestUrl).ignoreContentType(true).get();
                            Elements elements = doc.select("recentchanges rc");
                            Collections.reverse(elements);
                            // 次の処理へelementsを渡す
                            exchange.getIn().setBody(elements);
                        })
                        // List実装であるElementsオブジェクトをElementに分解する
                        .split().body(List.class)
                        .to("direct:rx_in"); // RxJavaへ処理を渡す
                // RxJavaからObservableを受け取り、ロガーに出力する
                from("direct:rx_out").to("log:foo");
            }
        });
        ReactiveCamel rx = new ReactiveCamel(context);
        //Apache CamelからObservableを受け取る
        Observable<Message> rx_in = rx.toObservable("direct:rx_in");
        // rcid属性をKeyとして、重複する要素を排除
        // ElementはMessageコンテナにラップされているので、getBody(Class<T> type)で取り出す
        rx_in.distinct((Message message) -> message.getBody(Element.class).attr("rcid"));
        //Apache CamelへObservableを送り返す
        rx.sendTo(rx_in, "direct:rx_out");
        context.start();
        Thread.sleep(Long.MAX_VALUE);
    }
}

Apache Camelには外部サーバーへリクエストを送るコンポーネントも当然、用意されていますが(camel-http4や、camel-jettyなど)、今回のサンプルではコンポーネントを使わず、個人的に愛用しているHTMLパーサーであるJsoupをプロセスの中で使用しています。

Apache CamelとRxJavaの接続

Apache Camelでは、処理のルールはメソッドチェーンで記述します。ひとつづきの処理が終わり、異なるチェーンへ処理を移したい時に、送信側ではto(endpoint_uri)を、受信側ではfrom(endpoint_uri)のように書いて処理を移すことができます。
RxJavaと接続する場合にも、この方法を用いて、処理と処理の間に割って入ることができます。

App.java
//Apache CamelからObservableを受け取る
ReactiveCamel rx = new ReactiveCamel(context);
Observable<Message> rx_in = rx.toObservable("direct:rx_in");

Observableの型がMessageとなっていますが、これはApache Camelにおいて多様なコンポーネントから流れてくる多様なオブジェクトを共通して扱うためのコンテナだと考えていただけると良いかと思います。
(実際には、Exchangeというコンテナの中にさらにMessageというコンテナが入っている二重構造になっています。Observableに変換された時点でExchangeという外箱は取り払われています。)

(2016/1/1追記)

ObservableのMessageがめんどくさいよって時は、第二引数にClassオブジェクトを渡すとその型でObservableが生成されます。
第二引数を書かない場合、流すオブジェクトがIterableか何かの時に、Messageへの変換がうまくいかない事象に遭遇したので、こちらの方を使うべきかも…
ただ、Map<String,String>のような型は引数にそのままでは渡せないので、ストリーム中でキャストする等、工夫が必要になります。

App.java
//Apache CamelからMap<String,String>型のObservableを受け取る
ReactiveCamel rx = new ReactiveCamel(context);
Observable<Map<String,String>> rx_in = rx.toObservable("direct:rx_in", Map.class)
         .map((m)->(Map<String,String>)m);

(追記ここまで)

RxJavaからApache Camelへ、Observableを送り返すことも簡単にできます。

App.java
//Apache CamelへObservableを送り返す
rx.sendTo(rx_in, "direct:rx_out");

これだけではつまらないので、追記として、RxJavaの機能を使ってObservableを複製し(Cold to Hot変換)、Apache Camelの複数のチェーンへ返す場合のサンプルコードを掲載します。

App.java
ReactiveCamel rx = new ReactiveCamel(context);
//Apache CamelからObservableを受け取る
// publish() およびconnect()でCold to Hot変換
ConnectableObservable<Message> rx_in = rx.toObservable("direct:rx_in").publish();
rx_in.connect();
rx_in.distinct((Message message) -> message.getBody(Element.class).attr("rcid"));
//Apache CamelへObservableを送り返す
rx.sendTo(rx_in, "direct:rx_out1");
rx.sendTo(rx_in, "direct:rx_out2");

まとめ

「サーバーサイドでもReactiveをガンガンやりたいけど、対象のソースをObservableとして扱えるように変換したり、結果を戻したりするコードを書くのは煩雑」と感じている場合に、Apache Camelが有効かもしれません。Apache Camelなら「とりあえずObservable」がいつでも可能です。

Apache Camelのルーティングパターンや、それを記述するメソッドチェーンがそれ自体Stream的な要素を持っているため、「どっちにやらせたらいいの?」という疑問が沸くかもしれませんが、EIPに興味がなければ「Apache Camelはコンポーネント資産、および各種コンバーターを提供するもの。ロジックはRxJavaで実装」という風に割り切っても使えると思います。

注意点

ノンブロッキングなAPIが用意されていないプロダクトに関しては、当然、Apache Camelからアクセスしてもブロッキングな結果になります。
最近はRxJavaやReactorなどからノンブロッキングにアクセスできるAPIを提供しているプロダクトも増えているため、提供されている場合はそちらを使うようにしてください。

9
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?