Edited at

Apache CamelのWire Tapパターンでメインルートと非同期に処理を行う


Apache CamelのWire Tapパターンでメインルートと非同期に処理を行う

レスポンスを必要とせず、現在のメッセージをメインのルートとは別にバックグラウンドで非同期に処理したい場合はWire Tapパターンを使用します。

例えば、メインルートとは別にバックエンドシステムにロギングのためのメッセージを送信したい場合等に使用できます。

WireTap.gif

XML DSLでのルート定義の例は以下のようになります。wireTap要素でWire Tapパターンであることを指定し、uriオプションに送信先のエンドポイントを指定します。

        <route>

<from uri="direct:start" />
<log message="main start" />
<wireTap uri="direct:tap" />
<log message="main end" />
</route>

Java DSLでは以下のようになります。

                    from("direct:start")

.log("main start")
.wireTap("direct:tap")
.log("main end");

WireTapされたメッセージは別スレッドで処理されており、処理結果を待たずにメインのルートの処理が継続されます。別スレッドで処理されているため、WireTapの中でエラーが発生した場合もメインのルートには影響を及ぼしません。


Wire TapパターンとMulticastパターンの違い

Wire Tapパターンと似ているパターンにMulticastパターンがあります。

Multicastパターンはメッセージを複数のエンドポイントで並列に処理するという点でWire Tapパターンと近いです。

Wire TapパターンとMulticastパターンの違いは以下の点です。


  • Multicastパターンは複数のエンドポイントにメッセージを送信します。Wire Tapパターンはメインルートとは別に1つのエンドポイントのみを持ちます。

  • Multicastパターンは並列処理したメッセージを集約することができます。一方、Wire Tapパターンは1方向のみで送信したメッセージの応答を受け取ることはできません。


Exchangeの浅いコピー(Shallow Copy)と深いコピー(Deep Copy)

Wire Tapでは、デフォルトでExchangeの浅いコピーを作成します。浅いコピーでは元のメッセージのBODYと同じオブジェクトであるため、Wire Tapのルート中でそのオブジェクトが変更されると、呼び出し元のメッセージも変更される可能性があります。

そのようなことを避けるために深いコピーで実装することもできます。

深いコピーを利用するためには、WireTap要素のonPrepareRefオプションを使用します。

onPrepareRefオプションにより、WireTapの前処理を行うプロセッサーを指定することができ、そのプロセッサー内で以下のように深いコピーを行います。

        <route>

<from uri="direct:start" />
<setBody><simple>${bean:item?method=getNewItem('org_name', 'org_value')}</simple></setBody>
<log message="start1" />
<wireTap uri="direct:tap" onPrepareRef="itemClone" />
<delay><simple>3000</simple></delay>
<log message="main: ${body.getName}" />
</route>

メッセージのBODYには以下のItemクラスのインスタンスを格納しています。

また、深いコピーを行うdeepCloneメソッドを実装しています。

public class Item {

private String name;
private String value;

~省略~
public Item deepClone() {
Item item = new Item();
item.name = this.name;
item.value = this.value;
return item;
}
}

前処理を行うプロセッサーは通常のプロセッサーと同様でProcessorインターフェースのprocessメソッドを実装します。

以下のprocessメソッドでは、ItemクラスのdeepCloneメソッドで深いコピーのインスタンスを取得し、BODYに設定しています。

public class ItemClonePrepareProcessor implements Processor {

@Override
public void process(Exchange exchange) throws Exception {
Item item = exchange.getIn().getBody(Item.class);
exchange.getIn().setBody(item.deepClone());
}
}

このようにWire Tapパターンで深いコピーを実装することができ、浅いコピーのようにWireTap先でメインルートに影響を及ぼすことはありません。

また、コピーではなく新しいメッセージを作成し、WireTapすることもできます。その場合はprocessorRefオプションで新規メッセージを作成するプロセッサーを作成します。


Wire Tapプロセッサのオプション一覧

Wire Tapプロセッサのオプションを下表に記載します。

プロパティ名
デフォルト値

説明

executorServiceRef

WireTapされたメッセージを並列で処理するためのカスタムスレッドプールのIDを参照する。

processorRef

WireTap時にメッセージをコピーするのではなく、新しいメッセージを作成する場合、カスタムプロセッサを定義します。

copy
true
boolean
メッセージをWireTapする際にExchangeをコピーするかどうかを指定します。デフォルトではtrue(コピーする)で、processorRefを指定する場合にはfalseを設定します。

onPrepareRef

メッセージをWireTapする際のExchangeコピー前に事前処理が必要な場合、カスタムプロセッサのIDを指定します。WireTap時にコピーされたメッセージは浅いコピーであるため、深いコピーを実現する際などに使用されます。

cacheSize
1000
int
再利用のためにプロデューサをキャッシュするProducerCacheのキャッシュサイズを設定できます。 デフォルトでは1000のデフォルトキャッシュサイズが使用されます。値を-1に設定すると、キャッシュをまとめてオフにできます。(いまいちよく分かりませんが)

ignoreInvalidEndpoint
false
boolean
解決できなかったエンドポイントURIを無視するかどうか。falseの場合、Camelは無効なエンドポイントURIを識別すると例外をスローします。


参考