Posted at

Apache CamelでExchangeを集約するAggregatorパターンを使ってみる


Apache CamelのAggregatorパターンを使ってみる

Aggregatorではルート中のExchangeを集約し処理することができます。例えば、ルートに流れてきた1データをCSVの1レコードに変更し、複数レコードを集約しまとめてCSVファイルに出力するようなことが可能です。

Aggregator.gif

Aggregatorパターンでは以下の3つのオプションを必ず指定します。


  • 集約条件

  • 完了条件

  • Aggregation Strategy

集約条件(correlationExpression)は、集約(Aggregate)する際にどの単位で集約するかの条件を指定します。例えば、データの種類としてA, B, Cとあり、それを集約条件にした上でAABBBCというデータを受信した場合、集約後のデータはA2個、B3個、C1個になります。もちろん集約時にデータを分けず、1つに集約することもできます。

完了条件は、受信したメッセージの集約を区切るための条件で、例えば、10個単位に集約する、10秒ごとに集約するなどの条件を指定することができます。条件はいろいろな設定を実施することができますので、後で説明します。

最後にAggregation Strategyは、各メッセージを集約時にどのように1つにマージするかを指定します。例えば、集約時にCSVを出力したいのであれば、1つのメッセージを1行のCSVとし、新しいメッセージを受信したら集約中のデータの次の行に追加するなどのことができます。


シンプルなAggregatorパターンの実装を見てみる

まずは、シンプルなAggregatorパターンの実装を見てみましょう。

        <route>

<from uri="direct:aggregate" />
<log message="before: ${body}" />
<aggregate strategyRef="MyAggregateStrategy" completionSize="5">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<log message="aggregate: ${body}" />
</aggregate>
<log message="after: ${body}" />
</route>

上のルートは次の条件を指定したAggregatorパターンの例です。


  • 集約条件:correlationExpression要素にtrueを指定することで1つに集約している。

  • 完了条件:completionSize="5"と指定することで、5つのメッセージ毎に集約している。

  • Aggregation Strategy:strategyRefでAggregationするクラスの参照を指定します。

Java DSLの場合は以下のように書きます。

                    from("direct:aggregate")

.log("before: body->${body}")
.aggregate(constant(true), new MyAggregateStrategy())
.completionSize(5)
.log("aggregate: body->${body}")
.end()
.log("after: body->${body}");

Aggregation Strategyについて、もう少し見ていきましょう。

Aggregationするクラスの参照先は以下のようにMyAggregateStrategyクラスです。

    <bean id="MyAggregateStrategy"

class="example.camelbegginer.aggregator.MyAggregateStrategy" />

MyAggregateStrategyクラスは次のようにAggregationStrategyインターフェースを実装したクラスを作成します。

package example.camelbegginer.aggregator;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAggregateStrategy implements AggregationStrategy {

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

// (1)
if (oldExchange == null) {
return newExchange;
}

String oldStr = oldExchange.getIn().getBody(String.class);
String newStr = newExchange.getIn().getBody(String.class);

// (2)
oldStr = oldStr + "\n" + newStr;
oldExchange.getIn().setBody(oldStr);

return oldExchange;
}
}

(1) 集約中のメッセージ(oldExchange)がnullであれば、新規に受信したメッセージ(newExchange)で集約する。

(2) 集約中のメッセージ(oldExchange)がnullでなければ、oldExchangeの次の行にnewExchangeを入れて集約する。


集約条件

集約条件(correlationExpression)は、集約(Aggregate)する際にどの単位で集約するかの条件を指定します。

先ほどの例では全体で1つに集約していましたが、今回はヘッダーkindの値により集約を分けてみます。

        <route>

<from uri="direct:aggregate" />
<log message="before: kind->${header.kind}, body->${body}" />
<aggregate strategyRef="MyAggregateStrategy" completionSize="3">
<correlationExpression>
<simple>${header.kind}</simple><!-- (1) -->
</correlationExpression>
<log message="aggregate: kind->${header.kind}, body->${body}" />
</aggregate>
<log message="after: kind->${header.kind}, body->${body}" />
</route>

(1) メッセージには全てkindというヘッダを設定しており、その値に応じて集約するように設定しています。

その設定はcorrelationExpressionオプションを使用し、Simple式で"${header.kind}"というようにヘッダkindを用いるように指定します。

Java DSLでは以下のようになります。

                    from("direct:aggregate")

.log("before: kind->${header.kind}, body->${body}")
.aggregate(simple("${header.kind}"), new MyAggregateStrategy())
.completionSize(3)
.log("aggregate: kind->${header.kind}, body->${body}")
.end()
.log("after: kind->${header.kind}, body->${body}");


完了条件

先ほどの例では、5つのメッセージ毎に集約するという、メッセージ数毎の完了条件を説明しました。

ここでは他の集約条件も含めて説明していきます。

今回は説明しませんが、メッセージ数や時間(インターバル・タイムアウト)以外の条件を完了条件として利用する場合、completionPredicateオプションを使用することができます。 (単に使ったことがないので省略)


メッセージ数による完了条件(completionSize)

まずはメッセージ数による完了条件です。これは指定した数のメッセージが集まったら集約を完了させる条件です。先ほどの例での5つのメッセージ毎に集約するというのがこの完了条件になります。

なお、completionSizeだけを設定した場合、メッセージが集まらないと集約されません。そのため、通常は次のcompletionIntervalかcompletionTimeoutと一緒に設定します。


インターバルによる完了条件(completionInterval)

一定のインターバル毎に集約を完了させる完了条件です。メッセージ数によらず、一定のインターバル(5秒など)が経過すると集約を完了します。

以下はインターバルによる完了条件のルート例です。

completionInterval="1000"で、インターバルを1秒(1000ミリ秒)を指定しています。

completionSize="3"でメッセージ数も指定しているため、1秒未満で3つのメッセージが集約されるとその時点で集約を完了します。3つのメッセージが集まらなかった場合は1秒で集約を完了します。

        <route>

<from uri="direct:aggregate" />
<log message="before: body->${body}" />
<aggregate strategyRef="MyAggregateStrategy" completionSize="3" completionInterval="1000">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<log message="aggregate: body->${body}" />
</aggregate>
<log message="after: body->${body}" />
</route>

Java DSLでは以下のようになります。

                    from("direct:aggregate")

.log("before: body->${body}")
.aggregate(constant(true), new MyAggregateStrategy())
.completionSize(3)
.completionInterval(1000L)
.log("aggregate: body->${body}")
.end()
.log("after: body->${body}");


タイムアウトによる完了条件(completionTimeout)

タイムアウトによる完了条件では、最後に集約したメッセージから指定した時間経過しても次のメッセージを受信しない場合に集約を完了します。

次の例ではcompletionTimeout="2000"と、タイムアウトを2秒(2000ミリ秒)を指定しています。

completionSize="5"でメッセージ数も指定しているため、5つのメッセージ毎に集約完了し、5つ未満の場合でも2秒間メッセージを受信しないならば集約を完了することになります。

        <route>

<from uri="direct:aggregate" />
<log message="before: body->${body}" />
<aggregate strategyRef="MyAggregateStrategy" completionSize="5" completionTimeout="2000">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<log message="aggregate: body->${body}" />
</aggregate>
<log message="after: body->${body}" />
</route>

Java DSLでは以下のようになります。

                    from("direct:aggregate")

.log("before: body->${body}")
.aggregate(constant(true), new MyAggregateStrategy())
.completionSize(5)
.completionTimeout(2000L)
.log("aggregate: body->${body}")
.end()
.log("after: body->${body}");


Apache CamelのAggregatorがサーバダウン時にメッセージをロストするのを防ぐ

Aggregatorは集約中のメッセージをメモリ上に保持するため、アプリケーションがダウンすると集約中のメッセージがロストします。

リポジトリにLevelDBを利用することでデータロストを防止することができます。

詳細は以下の記事に記載しています。


オプション

Aggregatorパターンの主なオプションを以下の表に記載します。

プロパティ名
デフォルト値

説明

correlationExpression

集約に使用するcorrelationキーの式を指定する必須のプロパティ。同じcorrelationキーを持つExchangeがまとめられます。correlationキーを評価できなかった場合、例外がスローされます。 ignoreBadCorrelationKeysオプションを使用してこれを無効にできます。

aggregationStrategy

受信したExchangeを既存のExchangeとマージするために使用されるAggregationStrategyインスタンスを指定する。最初の呼び出しでoldExchangeパラメータはnullが設定される。2回目からは、マージ後のExchangeがoldExchangeに設定されます。

completionSize

集約を完了するメッセージ数を指定する。 このオプションは固定値、もしくは動的な値を設定できます。 両方が設定されている場合、動的な値がnullまたは0の場合、Camelは固定値を使用します。

completionTimeout

集約された交換が完了する前に非アクティブになっている必要がある時間(ミリ秒)。 このオプションは、固定値として、またはタイムアウトを動的に評価できるExpressionを使用して設定できます。 両方とも設定されている場合、Expressionの結果がnullまたは0の場合、Camelは固定値を使用します。このオプションをcompletionIntervalとともに使用することはできません。

completionInterval

現在のメッセージの集約を完了するまでのインターバル(ミリ秒)。 このオプションをcompletionTimeoutと一緒に使用することはできません。

completionPredicate

Exchangeの集約がいつ完了したかを示す述語。

forceCompletionOnStop
false
boolean
"true"に設定すると、コンテキストが停止したときに現在の集約されたExchangeをすべて完了します。

completeAllOnStop
false
boolean
コンテキストが停止されたときに、集約中のExchangeがすべて完了するのを待つことを示します。これはまた、アグリゲーションリポジトリに格納されている保留中の交換がすべて完了するのを待つため、リポジトリが空になる前に停止できることを意味します。 メモリベースのみであり、ディスクにデータを格納しないメモリベースの集計リポジトリを使用する場合は、これを有効にします。 このオプションが有効になっていると、アグリゲーターは、CamelContextまたはそれを使用しているルートを停止するときに、停止する前にこれらすべての交換が完了するのを待機しています。

parallelProcessing
false
boolean
並列処理のためにスレッドプールを使用するかどうかを示します。カスタムスレッドプールが指定されていない場合、デフォルトで10スレッドを持つデフォルトプールを作成します。

executorService

parallelProcessingを使用している場合は、使用するカスタムスレッドプールを指定できます。

executorServiceRef

レジストリ内のexecutorServiceを検索するための参照。


参考