はじめに
Apache Camelのメッセージルーティングの基本を説明していきます。
その前にルートなどのCamelの基本的な概念を知っておく必要がありますので説明していきます。
ルート
Camelの基本的な要素はルートです。
ルートの開始時、Camelはエンドポイントからメッセージを受信し、ルートの終了時に別のエンドポイントへメッセージを送信します。このルートを定義していくことがCamelを用いた開発の基本で、一つのアプリケーション内で複数のルートを作成し、エンドポイントと接続していきます。
エンドポイント
エンドポイントは、Camelから見た他の外部システムなどを表します。
エンドポイントは、ローカルのファイルシステム、Webサービス、FTPサーバー、JMSキュー、メールサーバなどであり、コンポーネントによって作成されます。
コンポーネント
エンドポイントへ接続するために使用するのがコンポーネントです。
Camelでは200以上のコンポーネントが用意されており、エンドポイントとの間でデータを送受信するためのコードを書くことなく、必要なコンポーネントを見つけて利用するだけでデータの送受信が実現できます。
このコンポーネントはプロデューサとコンシューマに分かれます。
何かを書き込むためのコンポーネントはプロデューサと呼ばれます。例えば、ファイルへの書き込み、Webサービスへのデータ送信、FTPサーバへのアップロードなどがプロデューサのコンポーネントになります。
次に、何かを読み取るためのコンポーネントはコンシューマと呼ばれます。例えば、ファイルの読み込み、Webサービスでのリクエスト受信、キューからサブスクライブするなどがコンシューマのコンポーネントになります。
Exchange
コンシューマによって受信されたデータはExchangeと呼ばれます。CamelではExchangeがルートを流れ、処理を行うProcessorにより加工され、プロデューサに渡されます。
ルート定義の例
簡単なルート定義の例を見てみましょう。
ルートの定義でよく使用されるのはJava DSLとSpring XML DSLの2つになります。
次のソースはJava DSLで定義したルートの例です。
from("file:data/input?noop=true&fileName=file1.txt")
.log("contents: ${body}")
.marshal().zipFile()
.to("file:data/output");
from("file:data/input?noop=true&fileName=file1.txt")
fromメソッドはプロデューサを表します。プロデューサで使用されるコンポーネントは"file"で、"data/input"ディレクトリにあるfile1.txtを読み込みExchangeに入れられます。
.log("contents: ${body}")
次にlogコンポーネントが使用されており、fileコンポーネントで読み取られたファイルの中身がログに出力されます。
.marshal().zipFile()
marshalはデータの変換で、ファイルの中身がzipファイルに圧縮されます。
.to("file:data/output");
最後にtoメソッドはコンシューマを表します。コンシューマで使用されているfileコンポーネントにより、"data/output"ディレクトリへ先ほど圧縮されたzipファイルが出力されます。
Camelを使用しない場合は数十行はかかる処理が、たった4行で定義された簡単なルートで実現できました。
次にSpring XML DSLの場合は先ほどのルートは以下のXMLファイルになります。
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file1.txt" />
<log message="contents: ${body}" />
<marshal>
<zipFile />
</marshal>
<to uri="file:data/output" />
</route>
ルートの同期・非同期処理
Camelの基本的な概念が分かったところで、いよいよメッセージルーティング・・・といきたいところですが、その前にルートの同期・非同期処理を実施するコンポーネントについて説明します。
これもルートを定義する上で最初に覚えておくべきことですので、メッセージルーティングの前に説明します。
同期のメッセージングのコンポーネントにはDirect, Direct-VMがあり、非同期のメッセージングのコンポーネントにはSEDA, VMがあります。
ここでは、よく利用されるDirectとSEDAの2つのコンポーネントについて扱います(私は実システムでそれしか使ったことがありません)。
今回説明しないDirect-VMとVMは、複数のCamelContextを跨ってメッセージングを行うコンポーネントで、Webアプリケーションで複数のCamelContextに分かれている場合などに利用できます。
Directコンポーネントを使用したルートの同期処理
Directコンポーネントのプロデューサは、同じDirectコンポーネントのコンシューマを直接呼び出します。それにより、ルートを複数に分割することができます。
ルートを分割すると可読性が高くなるというメリットの他に、ルートの部品化ができるというメリットがあります。例えば、複数のルートで共通の処理を抽出して別のルートとすることで、同じ内容のルートを重複して記述することが無くなります。
DirectコンポーネントのURIは次のとおりです。
direct:endpointName
Directコンポーネントで先ほどのファイルをzip圧縮するルートを分割した例は次のとおりです。
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file1.txt" />
<log message="contents: ${body}" />
<to uri="direct:second" />
</route>
<route id="sub_route">
<from uri="direct:second" />
<marshal>
<zipFile />
</marshal>
<to uri="file:data/output" />
</route>
最初のルートのプロデューサ("")から、2番目のルートのコンシューマ("")が同期で接続されています。
SEDAコンポーネントを使用したルートの非同期処理
SEDAコンポーネントを用いるとルート間をメッセージキューで非同期処理を行うことができます。
SEDAコンポーネントはインメモリで処理する簡易なメッセージキューを実現します。これをActiveMQなどに置き換えることも容易です。
SEDAコンポーネントのURIは次のとおりです。
seda:endpointName
SEDAコンポーネントで先ほどのファイルをzip圧縮するルートを分割した例は次のとおりです。
from("file:data/input?noop=true&fileName=file1.txt").id("main_route")
.log("contents: ${body}")
.to("seda:second")
.log("main_route end.");
from("seda:second").id("sub_route")
.delay(1000L)
.marshal().zipFile()
.to("file:data/output")
.log("sub_route end.");
XML DSLの場合は以下のように定義します。
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file1.txt" />
<log message="contents: ${body}" />
<to uri="seda:second" />
<log message="main_route end." />
</route>
<route id="sub_route">
<from uri="seda:second" />
<delay asyncDelayed="false">
<constant>1000</constant>
</delay>
<marshal>
<zipFile />
</marshal>
<to uri="file:data/output" />
<log message="sub_route end." />
</route>
実行するとスレッド名が「thread #1」、「thread #2」とルートで分かれていることが分かります。
[2019-02-07 21:47:07.015], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, contents: file1 example
[2019-02-07 21:47:07.023], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, main_route end.
[2019-02-07 21:47:08.047], [INFO ], sub_route, Camel (camel-1) thread #2 - seda://second, sub_route, sub_route end.
SEDAについては以下の記事も参照してください。
メッセージルーティング
Apache Camelで、メッセージのルーティングはEnterprise Integration Patternに対応し、様々な方法が実装されています。
ここではよく使用されるメッセージルーティングについて紹介します。
ルーティング | 説明 |
---|---|
Content Based Router(条件分岐) | Content Based Routerでは、メッセージExchangeのデータの内容を条件にメッセージをルーティングします。 |
Splitter(分割) | Splitterでは、メッセージExchangeのデータを分割し、並列で処理させることができます。例えば、ExchangeにList型のデータが格納されており、それを1つずつ並列で処理できます。 |
Aggregator(集約) | Aggregatorでは、Splitterとは逆で、ルート中のExchangeを集約して処理することができます。例えば、ルートに流れてきた1データをCSVの1レコードに変更し、複数レコードを集約してCSVファイルに出力するようなことが可能です。 |
Throttler(スロットル・流量制御) | Throttlerでは次のエンドポイントへ流れるデータ量を制御することができます。例えば、送り先が高負荷にならないように、単位時間当たりの処理データ件数を制限することができます。 |
Message Filter(フィルタリング) | Message Filterでは、メッセージExchangeのデータの内容を条件にメッセージをフィルタリングします。 |
Dynamic Router(動的ルーティング) | Dynamic Routerでは、メッセージExchangeのデータの内容とルーティング元のエンドポイントを条件に、メッセージを動的にルーティングするロジックを実装できます。 |
Load Balancer(負荷分散) | Load Balancerでは、複数の送り先のエンドポイントがある場合に分散してルーティングすることができます。 |
Multicast(マルチキャスト) | Multicastは、同じメッセージを複数のエンドポイントにルーティングし、それらを異なる方法で処理することを可能にします。 |
Content Based Router(条件分岐)
Content Based Routerはメッセージの内容に基づいてメッセージの宛先をルーティングします。
Content Based Routerは、choice-when-otherwise句を利用して記述します。
choiceはルーティングの一番親になり、その下に条件(つまり、if)の数だけ、when句を記述します。
when句では、最初にsimple句で条件を記述します。
簡単な例を示します。
from("direct:second").id("sub_route")
.choice()
.when().simple("${header.CamelFileName} ends with '.xml'")
.to("seda:xml")
.log("choice xml.")
.endChoice();
XML DSLの場合は以下のように定義します。
<choice>
<when>
<simple>${header.CamelFileName} ends with '.xml'</simple>
<to uri="seda:xml" />
<log message="choice xml." />
</when>
</choice>
when句で条件が1つだけのルーティングになります。simple句では、"${header.CamelFileName} ends with '.xml'"と記載し、ヘッダー(header)のCamelFileNameという変数の値が".xml"で終了する場合が条件に設定しています。
条件に合う場合は、続く""の処理が行われます。条件に合致しない場合は、それ以上の条件がないため、何も処理されません。
条件が複数ある場合は、以下のようにwhen句を条件の数だけ記述します。when句は先頭から順番に適用され、最初に条件にあったものだけが実行されます。
from("direct:second").id("sub_route")
.choice()
.when().simple("${header.CamelFileName} ends with '.xml'")
.to("seda:xml")
.log("choice xml.")
.when().simple("${header.CamelFileName} regex '^.*(csv|txt)$'")
.to("seda:csvortxt")
.log("choice csv or txt.")
.endChoice();
XML DSLの場合は以下のように定義します。
<choice>
<when>
<simple>${header.CamelFileName} ends with '.xml'</simple>
<to uri="seda:xml" />
<log message="choice xml." />
</when>
<when>
<simple>${header.CamelFileName} regex '^.*(csv|txt)$'</simple>
<log message="choice csv or txt." />
<to uri="seda:csvortxt" />
</when>
</choice>
最後にどの条件にも合わなかった場合に処理する、otherwise句を見てみましょう。
otherwise句ではwhen句と異なり、実行する条件がないためsimple句がありません。他の全てのwhen句で処理されなかった場合のみ、otherwise句が実行されます。
from("direct:second").id("sub_route")
.choice()
.when().simple("${header.CamelFileName} ends with '.xml'")
.to("seda:xml")
.log("choice xml.")
.when().simple("${header.CamelFileName} regex '^.*(csv|txt)$'")
.to("seda:csvortxt")
.log("choice csv or txt.")
.otherwise()
.log("choice otherwise.")
.to("seda:end")
.endChoice();
XML DSLの場合は以下のように定義します。
<choice>
<when>
<simple>${header.CamelFileName} ends with '.xml'</simple>
<to uri="seda:xml" />
<log message="choice xml." />
</when>
<when>
<simple>${header.CamelFileName} regex '^.*(csv|txt)$'</simple>
<log message="choice csv or txt." />
<to uri="seda:csvortxt" />
</when>
<otherwise>
<log message="choice otherwise." />
<to uri="seda:end" />
<stop />
</otherwise>
</choice>
実行すると以下のようにログが出力されます。
[2019-02-08 07:40:18.447], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, file name: file1.xml
[2019-02-08 07:40:18.470], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, choice xml.
[2019-02-08 07:40:18.470], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, main_route end.
[2019-02-08 07:40:18.473], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, file name: file2.csv
[2019-02-08 07:40:18.476], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, choice csv or txt.
[2019-02-08 07:40:18.477], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, main_route end.
[2019-02-08 07:40:18.478], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, file name: file3.jpg
[2019-02-08 07:40:18.478], [INFO ], main_route, Camel (camel-1) thread #1 - file://data/input, main_route, choice otherwise.
Splitter(分割)
Splitterでは、メッセージExchangeのデータを分割し、パラレルで処理させることができます。例えば、ExchangeにList型のデータが格納されており、それを1つずつパラレルで処理できます。
詳細は以下の記事を参照してください。
Aggregator(集約)
Aggregatorでは、Splitterとは逆にルート中のExchangeを集約し処理することができます。例えば、ルートに流れてきた1データをCSVの1レコードに変更し、複数レコードを集約しまとめてCSVファイルに出力するようなことが可能です。
詳細は以下の記事を参照してください。
Throttler(スロットル・流量制御)
Throttlerパターンを使用すると、送信先のシステム(エンドポイント)に対してのデータの流量を制御することができます。例えば、1秒間に500個のデータまでしか送信しないなど。
それにより相手側システムが過負荷にならないように制御することができます。
詳細は以下の記事を参照してください。
Message Filter(フィルタリング)
Message Filterパターンを使用すると、特定の条件が満たされた場合にのみ受信したメッセージを通過させます。条件に合わなかったメッセージは破棄されます。
from("file:data/input?noop=true&fileName=file4.txt").id("main_route")
.log("contents: ${body}")
.filter()
.simple("${body} regex '^C.*'")
.to("seda:second")
.end()
.log("main_route end.");
XML DSLの場合は以下のように定義します。
<route id="main_route">
<from uri="file:data/input?noop=true&fileName=file4.txt" />
<log message="contents: ${body}" />
<filter>
<simple>${body} regex '^C.*'</simple>
<to uri="seda:second" />
</filter>
<log message="main_route end." />
</route>
Dynamic Router(動的ルーティング)
Dynamic Routerでは、メッセージExchangeのデータの内容とルーティング元のエンドポイントを条件に、メッセージを動的にルーティングするロジックを実装できます。
詳細は以下の公式サイトを参照してください。(使ったことがないのでスキップ。そのうち解説記事を書くかもしれません)
Load Balancer(負荷分散)
Load Balancerでは、複数の送り先のエンドポイントがある場合に分散してルーティングすることができます。
詳細は以下の公式サイトを参照してください。(そのうち解説記事を書くかもしれません)
Multicast(マルチキャスト)
Multicastはデフォルトで各エンドポイントを順番に呼び出します。順番に実行するならtoを複数並べるのとほとんど変わらないので、並列処理させることが多いです。
並列処理する場合は、parallelProcessing属性に"true"を設定します。
.multicast()
.parallelProcessing(true)
.to("direct:a")
.to("direct:b")
.to("direct:c")
XML DSLの場合は以下のように定義します。
<multicast parallelProcessing="true">
<to uri="direct:a" />
<to uri="direct:b" />
<to uri="direct:c" />
</multicast>
マルチキャストするメッセージは浅いコピーであり、最後に処理されたエンドポイントのメッセージが処理結果になります。
上の例では、direct:cで処理されたメッセージが最終的なメッセージになり、他のdirect:a, direct:bのメッセージは破棄されます。
すべてのメッセージを集約するためには、AggregationStrategyを使用します。
また、メッセージを深いコピーにする場合にはonPrepareRefオプションでカスタムする必要があります。
例外について
マルチキャストでは、エンドポイントの1つで例外がスローされた場合も処理を継続されます。
例えば3つのエンドポイントがある場合、1つで例外がスローされても他の2つは処理が行われます。
例外が発生した場合は処理を終了し、Camelエラーハンドラに処理させることもできます。
次に示すようにstopOnExceptionオプションを使用します。
from("timer:trigger?repeatCount=1")
.setBody()
.simple("start: ")
.multicast()
.parallelProcessing(false)
.stopOnException()
.to("direct:a")
.to("direct:b")
.to("direct:c")
.end()
.log("body: ${body}");
XML DSLの場合は以下のように定義します。
<route>
<from uri="timer:trigges?repeatCount=1" />
<setBody>
<simple>start: </simple>
</setBody>
<multicast parallelProcessing="false" stopOnException="true">
<to uri="direct:a" />
<to uri="direct:b" />
<to uri="direct:c" />
</multicast>
<log message="body: ${body}" />
</route>
ただし、並列処理の場合は例外がスローされた場合、処理が終了されることに注意が必要です。
その他
Delayer
Delayerを使用することで、メッセージの処理を待機(WAIT)させることができます。
以下のJava DSLでは、delayに"1000"を指定することで1秒間(1000ミリ秒)待機します。
from("timer:trigger?repeatCount=1")
.log("start.")
.delay()
.constant("1000")
.log("end.");
XML DSLの場合は以下のように定義します。
<route>
<from uri="timer:trigger?repeatCount=1" />
<log message="start." />
<delay>
<constant>1000</constant>
</delay>
<log message="end." />
</route>
上の例ではconstant句で待機時間を固定値で指定していましたがsimple言語で指定することもできます。
以下の例ではメッセージのヘッダーにDelayIntervalTest=3000を指定して待機時間を指定しています。ヘッダーをプロセッサーで動的に指定するようにすればメッセージ毎に任意の時間待機させることもできます(そんなユースケースがあるかは分かりませんが・・・)。
from("timer:trigger?repeatCount=1")
.setHeader("DelayIntervalTest")
.simple("3000")
.log("start.")
.delay()
.simple("${header.DelayIntervalTest}")
.asyncDelayed()
.log("delay asyncDelayed")
.end()
.log("end.");
XML DSLの場合は以下のように定義します。
<route>
<from uri="timer:trigger?repeatCount=1" />
<setHeader headerName="DelayIntervalTest">
<simple>3000</simple>
</setHeader>
<delay asyncDelayed="true">
<simple>${header.DelayIntervalTest}</simple>
<log message="delay asyncDelayed" />
</delay>
<log message="end." />
</route>
また、asyncDelayed="true"を指定しており、スレッドプールを使用してメッセージが非同期的に処理されます。
Wire tap
レスポンスを必要とせず、現在のメッセージを現在のルートとは別に非同期(バックグラウンド)で処理したい場合はWire Tapパターンを使用します。
例えば、メインルートとは別にバックエンドシステムにメッセージを送信したい場合に使用できます。
詳細は以下の記事を参照してください。
Loop
Loopでは繰り返し処理を行えます。
以下の例ではsimple句により繰り返し処理の回数を5回に指定しています。
また、ヘッダーのCamelLoopIndex変数には現在の繰り返し回数が入っており、その値をログに出力するようにしています。
from("timer:trigger?repeatCount=1")
.loop(5)
.log("CamelLoopIndex: ${header.CamelLoopIndex}");
実行結果は以下のようになります。
CamelLoopIndexに0から4まで、5回表示されていることが分かります。
[2019-02-08 22:21:16.178], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 0
[2019-02-08 22:21:16.180], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 1
[2019-02-08 22:21:16.180], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 2
[2019-02-08 22:21:16.180], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 3
[2019-02-08 22:21:16.181], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, CamelLoopIndex: 4
上の例では繰り返し回数を固定で指定していましたが、while文のように条件式を指定することもできます。
XML DSLの場合は以下のように定義します。
<route>
<from uri="timer:trigger?repeatCount=1" />
<loop>
<simple>5</simple>
<log message="CamelLoopIndex: ${header.CamelLoopIndex}" />
</loop>
</route>
Sampling Throttler(サンプリングスロットル)
サンプリングスロットルを使用すると、ルートを通過するトラフィックからサンプルを抽出できます。
サンプリング間隔(時間)を設定すると、その間の1つのExchangeだけが処理されます。
デフォルトではサンプリング間隔は1秒間が指定されます。
サンプリングスロットルのルート例は以下のようになります。
sample句に指定したsamplePeriodオプションがサンプリング間隔で、unitsが間隔の単位になります。
以下の例ではサンプリング間隔は5秒であり、5秒に1回サンプリングされることになります。
from("timer:trigger?repeatCount=100")
.setBody()
.simple("${date:now:yyyy-MM-dd HH:mm:ss}")
.sample()
.samplePeriod(5)
.log("Period Body: ${body}");
XML DSLの場合は以下のように定義します。
<route>
<from uri="timer:trigges?repeatCount=100" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<sample samplePeriod="5" units="SECOND">
<log message="Period Body: ${body}" />
</sample>
</route>
messageFrequencyオプションで、サンプリングを間隔ではなくメッセージの数で指定することもできます。
下の例では、sample句のmessageFrequencyオプションを"10"に指定しており、10メッセージ毎に1メッセージをサンプリングします。
SimpleDataSet dataSet = new SimpleDataSet();
dataSet.setSize(100);
SimpleRegistry registry = new SimpleRegistry();
registry.put("testDataSet", dataSet);
CamelContext context = new DefaultCamelContext(registry);
try {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("dataset:testDataSet?initialDelay=3000&produceDelay=0")
.sample()
.sampleMessageFrequency(10)
.log("CamelDataSetIndex ${header.CamelDataSetIndex}");
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
実行例は以下のとおりで、ヘッダーのCamelDataSetIndex変数をログに出力しています。
CamelDataSetIndex変数にはデータのシーケンス番号が入っており、9, 19, 29...と10メッセージ毎にログへ出力されていることが確認できます。
[2019-02-08 23:07:54.866], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 9
[2019-02-08 23:07:54.867], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 19
[2019-02-08 23:07:54.870], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 29
[2019-02-08 23:07:54.871], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 39
[2019-02-08 23:07:54.872], [INFO ], route1, Camel (camel-1) thread #1 - dataset://testDataSet, route1, CamelDataSetIndex 49
XML DSLの場合は以下のように定義します。
<bean id="testDataSet"
class="org.apache.camel.component.dataset.SimpleDataSet">
<property name="size" value="100" />
<property name="reportCount" value="100" />
</bean>
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="dataset:testDataSet?initialDelay=3000&produceDelay=0" />
<sample messageFrequency="10">
<log message="CamelDataSetIndex ${header.CamelDataSetIndex}" />
</sample>
</route>
</camelContext>
まとめ
本エントリーでは、Apache Camelのメッセージルーティングの基本と、ルートなどのCamelの基本的な概念について説明しました。
メッセージのルーティングは条件分岐、分割、集約など様々な方法が最初から準備されており、簡単なルートの定義でその機能を利用することができました。
ルート・ルーティング・コンポーネントにより、メッセージ(データ)処理を簡単で効率的に実現することができます。