Posted at

Apache CamelのSplitterパターンでメッセージを分割して処理する


はじめに

今回紹介するSplitterパターンは、Eneterprise Integration Patternsの1つになります。

Splitterでは、メッセージExchangeのデータを分割し、パラレルで処理させることができます。例えば、ExchangeにList型のデータが格納されており、それを1つずつパラレルで処理できます。

Sequencer.gif

画像の出典:Enterprise Integration Patterns - Splitter

まずは、以下の3パターンでメッセージの分割処理を試してみます。


  • Listの分割

  • 文字列の分割

  • XPATHでの分割


Listの分割

メッセージにListで格納されたデータをSplitterを使用して分割します。

以下はXML DSLを用いた例で、split要素でListを分割しています。

TestDataProcessorで、StringをListに格納したデータを作成しています。

そのデータはメッセージのBODYに格納されており、Simple要素で分割するデータとしてBODYを指定しています。

    <bean id="testData"

class="example.camelbegginer.first.TestDataProcessor">
</bean>

<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="timer:trigger?repeatCount=1" />
<process ref="testData" />
<log message="Body-before: ${body}" />
<split>
<simple>${body}</simple>
<log message="Body-split: ${body}" />
</split>
<log message="Body-after: ${body}" />
</route>
</camelContext>

リストを作成するためのTestDataProcessorのソースは以下になります。

processメソッドをオーバーライドし、"aaa", "bbb", "ccc"という3つの文字列のリストを作成し、ExchangeのBODYに格納しています。

    @Override

public void process(Exchange exchange) throws Exception {
List<String> list = new ArrayList<String>();
list.add("aaa");
list.add("bbb");
list.add("ccc");

exchange.getIn().setBody(list);
}

このルートを実行すると以下のログが出力されます。

まず、分割前のデータが出力されています。

[2019-02-13 20:17:01.751], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, Body-before: [aaa, bbb, ccc]

次に分割後のメッセージが出力されます。同じスレッド(thread #1)で分割されたデータが1行ずつ表示されます。

[2019-02-13 20:17:01.764], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, Body-split: aaa

[2019-02-13 20:17:01.766], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, Body-split: bbb
[2019-02-13 20:17:01.767], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, Body-split: ccc

最後に処理後のデータが集約されて表示されます。今回は分割後にデータを変更していないため、分割前と分割後で同じデータが表示されています。

[2019-02-13 20:17:01.774], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger, route1, Body-after: [aaa, bbb, ccc]

先ほどXML DSLで記述したルートをJava DSLで記述すると以下のようになります。

                    from("timer:trigger?repeatCount=1")

.process(new TestDataProcessor())
.log("Body-before: ${body}")
.split()
.simple("${body}")
.log("Body-split: ${body}")
.end()
.log("Body-after: ${body}");


文字列の分割

次に複数行の文字列を1行ずつ分割してみます。分割するデータは次の3行からなるCSVファイルを用います。

1,aaa

2,bbb
3,ccc

XML DSLで記載したルートは以下のようになります。先ほどのCSVファイルをFileコンポーネントで読み込み、tokinize要素で分割しています。tokenize要素のtoken属性で、分割する単位に"\n"(改行)を指定することで1行ずつ分割されます。

        <route>

<from uri="file:data/input?noop=true&amp;fileName=file2.csv" />
<log message="Body-before: ${body}" />
<split>
<tokenize token="\n"/>
<log message="Body-split: ${body}" />
</split>
<log message="Body-after: ${body}" />
</route>

ルートの実行例は以下のようになります。

[2019-02-13 20:33:55.645], [INFO ], route2, Camel (camel-1) thread #1 - file://data/input, route2, Body-before: 1,aaa

2,bbb
3,ccc
[2019-02-13 20:33:55.655], [INFO ], route2, Camel (camel-1) thread #1 - file://data/input, route2, Body-split: 1,aaa

[2019-02-13 20:33:55.656], [INFO ], route2, Camel (camel-1) thread #1 - file://data/input, route2, Body-split: 2,bbb

[2019-02-13 20:33:55.657], [INFO ], route2, Camel (camel-1) thread #1 - file://data/input, route2, Body-split: 3,ccc
[2019-02-13 20:33:55.658], [INFO ], route2, Camel (camel-1) thread #1 - file://data/input, route2, Body-after: 1,aaa
2,bbb
3,ccc

また、先ほどXML DSLで記述したルートをJava DSLで記述すると以下のようになります。

                    from("file:data/input?noop=true&fileName=file2.csv&initialDelay=3000")

.log("Body-before: ${body}")
.split()
.tokenize("\n")
.log("Body-split: ${body}")
.end()
.log("Body-after: ${body}");


XPATHでの分割

最後にXMLファイルをXPATHで分割してみます。読み込むXMLファイルは以下の3行になります。これをbar要素で分割します。

<foo>

<bar>aaa</bar>
<bar>bbb</bar>
<bar>ccc</bar>
</foo>

XML DSLで記載したルートは以下のようになります。先ほどのXMLファイルをFileコンポーネントで読み込み、xpath要素で分割しています。xpath要素にXPathとして"//foo/bar"のように指定し、分割する単位を指定しています。

        <route>

<from uri="file:data/input?noop=true&amp;fileName=file1.xml" />
<log message="Body-before: ${body}" />
<split>
<xpath>//foo/bar</xpath>
<log message="Body-split: ${body}" />
</split>
<log message="Body-after: ${body}" />
</route>

ルートの実行例は以下のようになります。

[2019-02-13 20:53:56.451], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-before: <foo>

<bar>aaa</bar>
<bar>bbb</bar>
<bar>ccc</bar>
</foo>
[2019-02-13 20:53:56.458], [INFO ], o.a.c.b.x.XPathBuilder, Camel (camel-1) thread #1 - file://data/input, org.apache.camel.builder.xml.XPathBuilder, Created default XPathFactory com.sun.org.apache.xpath.internal.jaxp.XPathFactoryImpl@58af0c62
[2019-02-13 20:53:56.516], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-split: <bar>aaa</bar>
[2019-02-13 20:53:56.517], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-split: <bar>bbb</bar>
[2019-02-13 20:53:56.518], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-split: <bar>ccc</bar>
[2019-02-13 20:53:56.519], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-after: <foo>
<bar>aaa</bar>
<bar>bbb</bar>
<bar>ccc</bar>
</foo>

先ほどXML DSLで記述したルートをJava DSLで記述すると以下のようになります。

                    from("file:data/input?noop=true&fileName=file1.xml")

.log("Body-before: ${body}")
.split()
.xpath("//foo/bar")
.log("Body-split: ${body}")
.end()
.log("Body-after: ${body}");


並列処理

これまでの例では、分割したデータは1スレッドで後続の処理が実行されていました。これを並列で処理するためには、以下のルートのようにsplit要素にparallelProcessing="true"と指定します。parallelProcessing属性を追加しただけで、ルートの内容は文字列分割時の例と同じです。

これによりデフォルトのスレッドプールが使用されて並列処理が行われます。デフォルトのスレッドプールは最大で20スレッドで定義されており、最大スレッド数を上げるなどしたい場合はカスタムのスレッドプールを使用することになります。

        <route>

<from uri="file:data/input?noop=true&amp;fileName=file2.csv&amp;initialDelay=3000" />
<log message="Body-before: ${body}" />
<split parallelProcessing="true">
<tokenize token="\n"/>
<log message="Body-split: ${body}" />
</split>
<log message="Body-after: ${body}" />
</route>

先ほどXML DSLで記述したルートをJava DSLで記述すると以下のようになります。

                    from("file:data/input?noop=true&fileName=file2.csv&initialDelay=3000")

.log("Body-before: ${body}")
.split()
.tokenize("\n")
.parallelProcessing(true)
.log("Body-split: ${body}")
.end()
.log("Body-after: ${body}");


エラー処理

データ分割後の各メッセージの処理でエラーが発生した場合、デフォルトでは発生したメッセージ以外のメッセージは中断されずに処理されます。実際にエラーを発生させて、どのような動きをするか確認してみます。

実行するルートは以下のXML DSLで、1番目のメッセージで強制的にRuntimeExceptionを発生させています。

"${header.CamelSplitIndex} == 0"で、1番目のメッセージを指定し、次のthrowException要素で例外をスローさせています。

        <route>

<from uri="timer:trigger2?repeatCount=1" />
<process ref="testData" />
<log message="Body-before: ${body}" />
<split>
<simple>${body}</simple>
<log message="Body-split: ${body}" />
<choice>
<when>
<simple>${header.CamelSplitIndex} == 0</simple>
<throwException exceptionType="java.lang.RuntimeException" message="test exception" />
</when>
</choice>
</split>
<log message="Body-after: ${body}" />
</route>

ルートを実行すると以下のようにログが出力されます。

2行目で"Body-split: aaa"と出力され、分割された1行目の処理が始まっています。

[2019-02-13 20:44:20.634], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger2, route1, Body-before: [aaa, bbb, ccc]

[2019-02-13 20:44:20.644], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger2, route1, Body-split: aaa

そして、すぐに"java.lang.RuntimeException"が発生し、Message Historyにメッセージが通ったパスが表示され、そのすぐ下にスタックトレースが表示されます。

[2019-02-13 20:44:20.666], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger2, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1550058258944-0-4 on ExchangeId: ID-mky-PC-1550058258944-0-3). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: test exception

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [timer://trigger2?repeatCount=1 ] [ 57]
[route1 ] [log2 ] [log ] [ 1]
[route1 ] [choice1 ] [when[simple{${header.CamelSplitIndex} == 0}]choice[] ] [ 0]
[route1 ] [throwException1 ] [throwException[ref:null] ] [ 1]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: test exception
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
~省略~
[2019-02-13 20:44:20.674], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger2, route1, Body-split: bbb
[2019-02-13 20:44:20.675], [INFO ], route1, Camel (camel-1) thread #1 - timer://trigger2, route1, Body-split: ccc

最後の2行で、"Body-split: bbb"、"Body-split: ccc"と分割された2つのメッセージが例外発生後も処理されていることが分かります。

エラーが発生しても回復可能な場合は問題ないですが、回復できない場合はエラーが発生すると処理を中断する必要があります。

その場合、以下の例のようにsplit要素にstopOnException="true"を追加することでエラーが発生した場合に処理を中断できます。

        <route>

<from uri="timer:trigger2?repeatCount=1" />
<process ref="testData" />
<log message="Body-before: ${body}" />
<split stopOnException="true">
<simple>${body}</simple>
<log message="Body-split: ${body}" />
<choice>
<when>
<simple>${header.CamelSplitIndex} == 0</simple>
<throwException exceptionType="java.lang.RuntimeException" message="test exception" />
</when>
</choice>
</split>
<log message="Body-after: ${body}" />
</route>

先ほどXML DSLで記述したルートをJava DSLで記述すると以下のようになります。

                    from("timer:trigger2?repeatCount=1")

.process(new TestDataProcessor())
.log("Body-before: ${body}")
.split()
.simple("${body}")
.stopOnException()
.log("Body-split: ${body}")
.choice()
.when()
.simple("${header.CamelSplitIndex} == 0")
.throwException(new RuntimeException("test exception"))
.end()
.log("Body-after: ${body}");


Splitterのオプション

Splitterの例を通していくつかのオプションを紹介しましたが、ここでは改めてオプションを紹介します。

Splitterの主なオプションは下表のとおりです。すべてのオプションは網羅していませんでのご注意ください。

プロパティ名
デフォルト値

説明

parallelProcessing
false
boolean
有効にした場合、サブメッセージは同時に処理される。 呼び出し元スレッドは、全てのサブメッセージが完全に処理されるまで待機する。

executorServiceRef

並列処理に使用されるカスタムスレッドプールを参照する。このオプションを設定すると、並列処理が暗黙的に行われるため、parallelProcessingオプションを有効にする必要はありません。

stopOnException
false
boolean
例外が発生したときに処理を続行するかどうか。 有効にするとサブメッセージの1つが失敗すると他のサブメッセージを処理せずに終了する。無効にすると、1つのサブメッセージが失敗しても他のサブメッセージを処理する。AggregationStrategyクラスで例外を処理する方法を制御できる。

streaming
false
boolean
有効にした場合、Camelはストリーミング方式で入力メッセージをチャンクに分割する。これにより、メモリのオーバーヘッドが削減される。例えば、大量のメッセージを処理する場合はストリーミングを有効にすることが推奨される。なお、ストリーミングが有効になっている場合、サブメッセージの応答は分割前とは順不同となる。

その他のオプションは公式サイトを参照してください。


Exchangeのプロパティ

スプリットに関する情報がプロパティとしてヘッダに格納されています。

格納されるプロパティは下表の3つになります。

プロパティ

説明

CamelSplitIndex
int
分割された各Exchange毎のインデックスで、0から始まる。

CamelSplitSize
int
分割されたExchangeの合計数です。ストリーム処理の場合は適用されません。

CamelSplitComplete
boolean
現在のExchangeが最後かどうかを示します。

これらのプロパティの値を確認するルートを作成し、どのような値が格納されているかを確認してみます。

        <route>

<from uri="file:data/input?noop=true&amp;fileName=file2.csv&amp;initialDelay=3000" />
<log message="Body-before: ${body}" />
<split>
<tokenize token="\n"/>
<log message="Body-split: ${body}, Index: ${header.CamelSplitIndex}, Size: ${header.CamelSplitSize}, Complete: ${header.CamelSplitComplete}" />
</split>
<log message="Body-after: ${body}" />
</route>

このルートを実行すると、以下のように出力されます。

[2019-02-13 21:58:40.668], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-before: 1,aaa

2,bbb
3,ccc
[2019-02-13 21:58:40.684], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-split: 1,aaa
, Index: 0, Size: 3, Complete: false
[2019-02-13 21:58:40.684], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-split: 2,bbb
, Index: 1, Size: 3, Complete: false
[2019-02-13 21:58:40.685], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-split: 3,ccc, Index: 2, Size: 3, Complete: true
[2019-02-13 21:58:40.686], [INFO ], route1, Camel (camel-1) thread #1 - file://data/input, route1, Body-after: 1,aaa
2,bbb
3,ccc

関係ある個所を抜粋すると以下の3行になります。Sizeで全体のデータ数が"3"であり、Indexで何番目のデータであるかが分かります。そして、最後のデータの場合はCompleteの値が"true"になっています。

Index: 0, Size: 3, Complete: false

Index: 1, Size: 3, Complete: false
Index: 2, Size: 3, Complete: true


参考