はじめに
Aggregatorは、複数メッセージを1つのメッセージに集約することができます。複数のメッセージを分割するSplitterと逆の処理になります。
画像の出典:Enterprise Integration Patterns - Aggregator
Aggregatorは集約中のメッセージをメモリ上に保持するため、アプリケーションがダウンすると集約中のメッセージがロストします。
リポジトリにLevelDBを利用することでデータロストを防止することができます。
SEDAやrouteを流れるExchangeは永続化されていませんので、Aggregatorだけ永続化したいユースケースは少ない(?)かもしれませんが、たまたまそのようなユースケースに出会ったので調べてみました。
LevelDBの準備
まず、「camel-leveldb」のライブラリを追加します(以下はMavenの場合)。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-leveldb</artifactId>
<version>${camel.version}</version>
</dependency>
次にLevelDBのリポジトリを定義します。
・persistentFileNameでLevelDBが使用するディレクトリを指定する。
・repositoryNameでレポジトリの名前を指定する。
<bean id="myRepo"
class="org.apache.camel.component.leveldb.LevelDBAggregationRepository">
<property name="persistentFileName" value="data/leveldb.dat" />
<property name="repositoryName" value="myRepo" />
<property name="sync" value="true" />
<property name="useRecovery" value="true" />
<property name="recoveryInterval" value="5000" />
</bean>
テスト用のXML DSLを作成する
routeは以下のように作成。
・1秒ごとに実行日時の文字列を持ったメッセージをrouteに流す。
・aggregationRepositoryRefでリポジトリにLevelDBを指定。
・aggregateのcompletionSizeで"5"を指定することでメッセージが5件まで集約される。
・completionTimeoutに"5000"(5秒)を指定することで最後にメッセージを受信してから5秒後に集約を完了する。
・集約されたメッセージは、「<to uri="file://output」でoutputフォルダにファイル出力する。
・correlationExpressionでどのような単位で集約するかを指定する。今回は全てのメッセージを1つに集約するので"true"を指定している。
・forceCompletionOnStopでアプリケーション停止時に集約を完了させるように指定。
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route id="parentRoute">
<from uri="timer://run?delay=1000&period=1000" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<log message="#### timer start" />
<aggregate strategyRef="MyAggregationStrategy" aggregationRepositoryRef="myRepo" forceCompletionOnStop="true"
completionSize="5">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<completionTimeout>
<simple>5000</simple>
</completionTimeout>
<to
uri="file://output?doneFileName=$simple{file:name}.done&fileExist=Fail" />
<log message="#### aggregate end" />
</aggregate>
</route>
</camelContext>
strategyRefでは、集約のやり方(フォーマット)を指定します。
今回は「,」区切り(CSV)のstrategyを以下のように書いています。
package sample;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class MyAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldB = oldExchange.getIn().getBody(String.class);
String newB = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(oldB + "," + newB);
return oldExchange;
}
}
実行してメッセージがロストしないことを確認する
実行すると、以下のようにログ出力され、5件(timer start)ごとに集約(aggregate end)されていることが分かります。
[2018-10-04 20:29:21.635], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:22.636], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:23.637], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:24.638], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:25.638], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:29:25.656], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### aggregate end
出力されたファイルは以下のように、実行日時が「,」区切りでまとめられています。
2018-10-04 20:29:21,2018-10-04 20:29:22,2018-10-04 20:29:23,2018-10-04 20:29:24,2018-10-04 20:29:25
正常に動作することを確認したところで、次にアプリケーションを途中でKILLしてもメッセージがロストしないことを確認します。
実行1回目は3件でKILLする。5件未満なので集約されないためファイルも出力されない。
[2018-10-04 20:40:49.502], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:40:50.471], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:40:51.472], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
2回目を実行すると2件目(1回目と合わせて合計5件)で集約され、ファイル出力されている。
[2018-10-04 20:41:38.047], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:41:39.041], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### timer start
[2018-10-04 20:41:39.076], [INFO ], parentRoute, Camel (camel-1) thread #3 - timer://run, parentRoute, #### aggregate end
ファイルを確認すると、1回目(20時40分)と2回目(20時41分)の5件が漏れなく出力されていることを確認できた。
2018-10-04 20:40:49,2018-10-04 20:40:50,2018-10-04 20:40:51,2018-10-04 20:41:38,2018-10-04 20:41:39
アプリケーションを途中でKILLしてもメッセージがロストしないことを確認できました。
なるべくこのような永続化に頼らないでリカバリできるように設計した方が良いかと思いますが、そうはうまくいかないこともありますので、Aggregatorで永続化できるのも便利だなと思いました。
注意点
CamelのLevelDBのアクセスが排他処理になっており、複数スレッドが同時にLevelDBへアクセスできないようです。そのため、大量データを高速処理する場合には向きません。
XML DSL全体
試しに作成したXML DSL全体は以下のとおり。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<bean id="myRepo"
class="org.apache.camel.component.leveldb.LevelDBAggregationRepository">
<property name="persistentFileName" value="./leveldb_data" />
<property name="repositoryName" value="myRepo" />
<property name="sync" value="true" />
<property name="useRecovery" value="true" />
<property name="recoveryInterval" value="5000" />
</bean>
<bean id="MyAggregationStrategy"
class="sample.MyAggregationStrategy" />
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route id="parentRoute">
<from uri="timer://run?delay=1000&period=1000" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<log message="#### timer start" />
<aggregate strategyRef="MyAggregationStrategy" aggregationRepositoryRef="myRepo" forceCompletionOnStop="true"
completionSize="5">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<completionTimeout>
<simple>5000</simple>
</completionTimeout>
<to
uri="file://output?doneFileName=$simple{file:name}.done&fileExist=Fail" />
<log message="#### aggregate end" />
</aggregate>
</route>
</camelContext>
</beans>
camel-leveldbコンポーネントの主なプロパティ
最後にcamel-leveldbコンポーネントの主なプロパティについて説明します。
プロパティは以下の表以外にもあり、詳細は公式ページを参照してください。
プロパティ名 | 説明 | デフォルト値 | 型 |
---|---|---|---|
repositoryName | 必須オプション。LevelDBのリポジトリ名を指定する。 複数のリポジトリに共有LevelDBFileを使用できます。 | String | |
persistentFileName | 永続ストレージのファイル名を指定する。起動時にファイルが存在しない場合は、新しいファイルが作成される。 | String | |
sync | LevelDBFileへの書き込み時に同期するかどうかを指定する。デフォルト値はfalse(非同期)。書き込み時に同期することで、すべての書き込みが完了するまで待機するため、データがロストすることはない。 | false | boolean |
useRecovery | リカバリを有効にするかを指定する。デフォルト値はtrue(リカバリ有効)。有効にすると、Camel AggregatorはExchangeの集約に失敗すると自動でリカバリし、それらを再送信する。 | true | boolean |
recoveryInterval | リカバリが有効になっている場合、失敗したExchangeをスキャンしてリカバリおよび再送信するためにバックグラウンドタスクが実行される。デフォルトでは5000ミリ秒間隔で実行される。 | 5000 | long |
maximumRedeliveries | リカバリされたExchangeの再送信の最大試行回数を指定する。有効にした場合、最大試行回数だけ失敗した場合、Exchangeはデッドレターチャンネルに移動される。 デフォルトでは無効になっている。このオプションを使用する場合はdeadLetterUriオプションも指定する。 | int | |
deadLetterUri | リトライアウトしたExchangeが移動されるデッドレターチャンネルのためのエンドポイントURIを指定する。 | String |