Apache CamelのWire Tapパターンでメインルートと非同期に処理を行う
レスポンスを必要とせず、現在のメッセージをメインのルートとは別にバックグラウンドで非同期に処理したい場合はWire Tapパターンを使用します。
例えば、メインルートとは別にバックエンドシステムにロギングのためのメッセージを送信したい場合等に使用できます。
XML DSLでのルート定義の例は以下のようになります。wireTap要素でWire Tapパターンであることを指定し、uriオプションに送信先のエンドポイントを指定します。
<route>
<from uri="direct:start" />
<log message="main start" />
<wireTap uri="direct:tap" />
<log message="main end" />
</route>
Java DSLでは以下のようになります。
from("direct:start")
.log("main start")
.wireTap("direct:tap")
.log("main end");
WireTapされたメッセージは別スレッドで処理されており、処理結果を待たずにメインのルートの処理が継続されます。別スレッドで処理されているため、WireTapの中でエラーが発生した場合もメインのルートには影響を及ぼしません。
Wire TapパターンとMulticastパターンの違い
Wire Tapパターンと似ているパターンにMulticastパターンがあります。
Multicastパターンはメッセージを複数のエンドポイントで並列に処理するという点でWire Tapパターンと近いです。
Wire TapパターンとMulticastパターンの違いは以下の点です。
- Multicastパターンは複数のエンドポイントにメッセージを送信します。Wire Tapパターンはメインルートとは別に1つのエンドポイントのみを持ちます。
- Multicastパターンは並列処理したメッセージを集約することができます。一方、Wire Tapパターンは1方向のみで送信したメッセージの応答を受け取ることはできません。
Exchangeの浅いコピー(Shallow Copy)と深いコピー(Deep Copy)
Wire Tapでは、デフォルトでExchangeの浅いコピーを作成します。浅いコピーでは元のメッセージのBODYと同じオブジェクトであるため、Wire Tapのルート中でそのオブジェクトが変更されると、呼び出し元のメッセージも変更される可能性があります。
そのようなことを避けるために深いコピーで実装することもできます。
深いコピーを利用するためには、WireTap要素のonPrepareRefオプションを使用します。
onPrepareRefオプションにより、WireTapの前処理を行うプロセッサーを指定することができ、そのプロセッサー内で以下のように深いコピーを行います。
<route>
<from uri="direct:start" />
<setBody><simple>${bean:item?method=getNewItem('org_name', 'org_value')}</simple></setBody>
<log message="start1" />
<wireTap uri="direct:tap" onPrepareRef="itemClone" />
<delay><simple>3000</simple></delay>
<log message="main: ${body.getName}" />
</route>
メッセージのBODYには以下のItemクラスのインスタンスを格納しています。
また、深いコピーを行うdeepCloneメソッドを実装しています。
public class Item {
private String name;
private String value;
~省略~
public Item deepClone() {
Item item = new Item();
item.name = this.name;
item.value = this.value;
return item;
}
}
前処理を行うプロセッサーは通常のプロセッサーと同様でProcessorインターフェースのprocessメソッドを実装します。
以下のprocessメソッドでは、ItemクラスのdeepCloneメソッドで深いコピーのインスタンスを取得し、BODYに設定しています。
public class ItemClonePrepareProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Item item = exchange.getIn().getBody(Item.class);
exchange.getIn().setBody(item.deepClone());
}
}
このようにWire Tapパターンで深いコピーを実装することができ、浅いコピーのようにWireTap先でメインルートに影響を及ぼすことはありません。
また、コピーではなく新しいメッセージを作成し、WireTapすることもできます。その場合はprocessorRefオプションで新規メッセージを作成するプロセッサーを作成します。
Wire Tapプロセッサのオプション一覧
Wire Tapプロセッサのオプションを下表に記載します。
プロパティ名 | デフォルト値 | 型 | 説明 |
---|---|---|---|
executorServiceRef | WireTapされたメッセージを並列で処理するためのカスタムスレッドプールのIDを参照する。 | ||
processorRef | WireTap時にメッセージをコピーするのではなく、新しいメッセージを作成する場合、カスタムプロセッサを定義します。 | ||
copy | true | boolean | メッセージをWireTapする際にExchangeをコピーするかどうかを指定します。デフォルトではtrue(コピーする)で、processorRefを指定する場合にはfalseを設定します。 |
onPrepareRef | メッセージをWireTapする際のExchangeコピー前に事前処理が必要な場合、カスタムプロセッサのIDを指定します。WireTap時にコピーされたメッセージは浅いコピーであるため、深いコピーを実現する際などに使用されます。 | ||
cacheSize | 1000 | int | 再利用のためにプロデューサをキャッシュするProducerCacheのキャッシュサイズを設定できます。 デフォルトでは1000のデフォルトキャッシュサイズが使用されます。値を-1に設定すると、キャッシュをまとめてオフにできます。(いまいちよく分かりませんが) |
ignoreInvalidEndpoint | false | boolean | 解決できなかったエンドポイントURIを無視するかどうか。falseの場合、Camelは無効なエンドポイントURIを識別すると例外をスローします。 |