LoginSignup
3
1

More than 5 years have passed since last update.

Apache CamelのWire Tapパターンでメインルートと非同期に処理を行う

Last updated at Posted at 2019-02-25

Apache CamelのWire Tapパターンでメインルートと非同期に処理を行う

レスポンスを必要とせず、現在のメッセージをメインのルートとは別にバックグラウンドで非同期に処理したい場合はWire Tapパターンを使用します。
例えば、メインルートとは別にバックエンドシステムにロギングのためのメッセージを送信したい場合等に使用できます。

WireTap.gif

XML DSLでのルート定義の例は以下のようになります。wireTap要素でWire Tapパターンであることを指定し、uriオプションに送信先のエンドポイントを指定します。

        <route>
            <from uri="direct:start" />
            <log message="main start" />
            <wireTap uri="direct:tap" />
            <log message="main end" />
        </route>

Java DSLでは以下のようになります。

                    from("direct:start")
                        .log("main start")
                        .wireTap("direct:tap")
                        .log("main end");

WireTapされたメッセージは別スレッドで処理されており、処理結果を待たずにメインのルートの処理が継続されます。別スレッドで処理されているため、WireTapの中でエラーが発生した場合もメインのルートには影響を及ぼしません。

Wire TapパターンとMulticastパターンの違い

Wire Tapパターンと似ているパターンにMulticastパターンがあります。
Multicastパターンはメッセージを複数のエンドポイントで並列に処理するという点でWire Tapパターンと近いです。

Wire TapパターンとMulticastパターンの違いは以下の点です。

  • Multicastパターンは複数のエンドポイントにメッセージを送信します。Wire Tapパターンはメインルートとは別に1つのエンドポイントのみを持ちます。
  • Multicastパターンは並列処理したメッセージを集約することができます。一方、Wire Tapパターンは1方向のみで送信したメッセージの応答を受け取ることはできません。

Exchangeの浅いコピー(Shallow Copy)と深いコピー(Deep Copy)

Wire Tapでは、デフォルトでExchangeの浅いコピーを作成します。浅いコピーでは元のメッセージのBODYと同じオブジェクトであるため、Wire Tapのルート中でそのオブジェクトが変更されると、呼び出し元のメッセージも変更される可能性があります。

そのようなことを避けるために深いコピーで実装することもできます。
深いコピーを利用するためには、WireTap要素のonPrepareRefオプションを使用します。
onPrepareRefオプションにより、WireTapの前処理を行うプロセッサーを指定することができ、そのプロセッサー内で以下のように深いコピーを行います。

        <route>
            <from uri="direct:start" />
            <setBody><simple>${bean:item?method=getNewItem('org_name', 'org_value')}</simple></setBody>
            <log message="start1" />
            <wireTap uri="direct:tap" onPrepareRef="itemClone" />
            <delay><simple>3000</simple></delay>
            <log message="main: ${body.getName}" />
        </route>

メッセージのBODYには以下のItemクラスのインスタンスを格納しています。
また、深いコピーを行うdeepCloneメソッドを実装しています。

public class Item {

    private String name;
    private String value;

省略
    public Item deepClone() {
        Item item = new Item();
        item.name = this.name;
        item.value = this.value;
        return item;
    }
}

前処理を行うプロセッサーは通常のプロセッサーと同様でProcessorインターフェースのprocessメソッドを実装します。
以下のprocessメソッドでは、ItemクラスのdeepCloneメソッドで深いコピーのインスタンスを取得し、BODYに設定しています。

public class ItemClonePrepareProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        Item item = exchange.getIn().getBody(Item.class);
        exchange.getIn().setBody(item.deepClone());
    }
}

このようにWire Tapパターンで深いコピーを実装することができ、浅いコピーのようにWireTap先でメインルートに影響を及ぼすことはありません。
また、コピーではなく新しいメッセージを作成し、WireTapすることもできます。その場合はprocessorRefオプションで新規メッセージを作成するプロセッサーを作成します。

Wire Tapプロセッサのオプション一覧

Wire Tapプロセッサのオプションを下表に記載します。

プロパティ名 デフォルト値 説明
executorServiceRef WireTapされたメッセージを並列で処理するためのカスタムスレッドプールのIDを参照する。
processorRef WireTap時にメッセージをコピーするのではなく、新しいメッセージを作成する場合、カスタムプロセッサを定義します。
copy true boolean メッセージをWireTapする際にExchangeをコピーするかどうかを指定します。デフォルトではtrue(コピーする)で、processorRefを指定する場合にはfalseを設定します。
onPrepareRef メッセージをWireTapする際のExchangeコピー前に事前処理が必要な場合、カスタムプロセッサのIDを指定します。WireTap時にコピーされたメッセージは浅いコピーであるため、深いコピーを実現する際などに使用されます。
cacheSize 1000 int 再利用のためにプロデューサをキャッシュするProducerCacheのキャッシュサイズを設定できます。 デフォルトでは1000のデフォルトキャッシュサイズが使用されます。値を-1に設定すると、キャッシュをまとめてオフにできます。(いまいちよく分かりませんが)
ignoreInvalidEndpoint false boolean 解決できなかったエンドポイントURIを無視するかどうか。falseの場合、Camelは無効なエンドポイントURIを識別すると例外をスローします。

参考

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1