SEDAとは
SEDAはApache Camelフレームワーク標準のキューイングシステムです。標準だけあってCamelフレームワークから簡単に利用することができます。
なお、SEDAはメモリ上で処理され永続化することはできません。永続化するなら、ActiveMQやRabbitMQなどを別に準備して利用することになります。
SEDAの基本的な使い方
以下はXML DSLだけで、SEDAにメッセージを入れて取り出すプログラムです。
mainRouteとconsumerRouteの2つのルートを作成しています。
mainRoute
・setBodyで現在日時の文字列をBODYに入れる。
・「to uri="seda:seda_test"」で、seda_testというキューにメッセージ(BODY)を配信する。
consumerRoute
・「from uri="seda:seda_test?concurrentConsumers=1"」で、seda_testのキューからメッセージを取り出す。
・「log message」で、取り出したメッセージをログに出力する。
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route id="mainRoute">
<from uri="timer://run?delay=1000&period=1000" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<to uri="seda:seda_test" />
<log message="seda_test end" />
</route>
<route id="consumerRoute">
<from uri="seda:seda_test?concurrentConsumers=1" />
<log message="body -> ${body}" loggingLevel="INFO" />
</route>
</camelContext>
実行した結果は以下のとおり。
「seda://seda_test, consumerRoute, body -> 2018-10-26 04:11:43」のように、取り出したメッセージが表示されています。
[2018-10-26 04:11:43.955], [INFO ], mainRoute, Camel (camel-1) thread #2 - timer://run, mainRoute, seda_test end
[2018-10-26 04:11:43.955], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:11:43
[2018-10-26 04:11:44.955], [INFO ], mainRoute, Camel (camel-1) thread #2 - timer://run, mainRoute, seda_test end
[2018-10-26 04:11:44.955], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:11:44
[2018-10-26 04:11:45.956], [INFO ], mainRoute, Camel (camel-1) thread #2 - timer://run, mainRoute, seda_test end
SEDAの様々な使い方
SEDAのキューのサイズを確認する
SEDAのキューに入っているメッセージのサイズを確認します。
まず、以下のようにメッセージ数を取得するプロセッサーを作成します。
「end.getCurrentQueueSize()」で、メッセージ数が取得できます。
package sample;
import javax.annotation.Resource;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.seda.SedaEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class TestProcessor implements Processor {
Logger logger = LoggerFactory.getLogger(TestProcessor.class);
@Resource
private CamelContext context;
public void process(Exchange exchange) throws Exception {
logger.info("TestProcessor start ");
SedaEndpoint end = (SedaEndpoint)context.getEndpoint("seda:seda_test");
logger.info("Current Queue Size = {}", end.getCurrentQueueSize());
}
}
XML DSLは以下のように記述。
testRouteで、1秒ごとに作成したプロセッサーを呼び出しています。
<context:component-scan base-package="sample" />
<bean id="testProcessor" class="sample.TestProcessor" />
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route id="mainRoute">
<from uri="timer://run?delay=1000&period=1000" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<to uri="seda:seda_test" />
<log message="seda_test end" />
</route>
<route id="testRoute">
<from uri="timer://run?delay=1000&period=1000" />
<process ref="testProcessor" />
</route>
</camelContext>
実行結果は以下のとおり。
「timer://run, sample.TestProcessor, Current Queue Size = 1」のように、1秒ごとにキューの数が増えていることが確認できます。
[2018-10-26 04:26:16.435], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:26:16.437], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start
[2018-10-26 04:26:16.446], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Current Queue Size = 1
[2018-10-26 04:26:17.402], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:26:17.437], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start
[2018-10-26 04:26:17.438], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Current Queue Size = 2
[2018-10-26 04:26:18.403], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:26:18.437], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start
[2018-10-26 04:26:18.438], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Current Queue Size = 3
SEDAのキューを削除する
SEDAに格納されたメッセージを全て削除します。
「end.purgeQueue()」で削除しています。
public void process(Exchange exchange) throws Exception {
logger.info("TestProcessor start ");
SedaEndpoint end = (SedaEndpoint)context.getEndpoint("seda:seda_test");
logger.info("Before Current Queue Size = {}", end.getCurrentQueueSize());
end.purgeQueue();
logger.info("After Current Queue Size = {}", end.getCurrentQueueSize());
}
実行結果は以下のとおり。
削除前はメッセージ1件で、削除後はメッセージが0件になっていることが確認できます。
[2018-10-26 04:28:32.327], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start
[2018-10-26 04:28:32.328], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Before Current Queue Size = 1
[2018-10-26 04:28:32.328], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, After Current Queue Size = 0
SEDAのキューの最大サイズを指定する
SEDAに格納できるメッセージの最大数をsizeで指定します。
今回は最大5メッセージで設定しています。
<route id="mainRoute">
<from uri="timer://run?delay=1000&period=1000" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<to uri="seda:seda_test?size=5" />
<log message="seda_test end" />
</route>
実行すると、5件目の後(6件目)で「Queue full」というメッセージで例外が発生しています。
[2018-10-26 04:32:39.070], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:32:39.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start
[2018-10-26 04:32:39.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Before Current Queue Size = 4
[2018-10-26 04:32:40.069], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:32:40.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start
[2018-10-26 04:32:40.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Before Current Queue Size = 5
[2018-10-26 04:32:41.076], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://run, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1540495954604-0-12 on ExchangeId: ID-mky-PC-1540495954604-0-11). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Queue full
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[mainRoute ] [mainRoute ] [timer://run?delay=1000&period=1000 ] [ 4]
[mainRoute ] [setBody1 ] [setBody[simple{18/10/26 4:32}] ] [ 0]
[mainRoute ] [to1 ] [seda:seda_test?size=5 ] [ 1]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98) ~[?:1.8.0_172]
~省略~
キューが最大(フル)になっている場合に例外を出さずに待機する場合は、「blockWhenFull=true」を指定します。
<to uri="seda:seda_test?size=5&blockWhenFull=true" />
「blockWhenFull=true」を指定するとキューが空くまで永遠と待ち続けます。
タイムアウトしたい場合は、offerTimeoutを使用します。
以下の例では、5000(ms)を指定しています。5秒経過してもキューが空かないと例外が発生します。
<to uri="seda:seda_test?size=5&blockWhenFull=true&offerTimeout=5000" />
SEDAのconsumerのスレッド数を変更する
メッセージを取り出すconsumerのスレッド数を変更します。
デフォルトは"1"で、以下ではconcurrentConsumersで"5"(スレッド)を設定しています。
<camelContext
xmlns="http://camel.apache.org/schema/spring">
<route id="mainRoute">
<from uri="timer://run?delay=1000&period=1000" />
<setBody>
<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
</setBody>
<loop>
<constant>10</constant>
<to uri="seda:seda_test" />
</loop>
<log message="seda_test end" />
</route>
<route id="consumerRoute">
<from uri="seda:seda_test?concurrentConsumers=5" />
<log message="body -> ${body}" loggingLevel="INFO" />
</route>
</camelContext>
実行すると、「thread #1~5」というスレッドが動いていることが確認できます。
[2018-10-26 04:52:29.653], [INFO ], mainRoute, Camel (camel-1) thread #6 - timer://run, mainRoute, seda_test end
[2018-10-26 04:52:29.654], [INFO ], consumerRoute, Camel (camel-1) thread #2 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.654], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #3 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #3 - seda://seda_test, consmerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.657], [INFO ], consumerRoute, Camel (camel-1) thread #2 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.657], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.658], [INFO ], consumerRoute, Camel (camel-1) thread #3 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:30.612], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:30
[2018-10-26 04:52:30.612], [INFO ], consumerRoute, Camel (camel-1) thread #5 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:30
オプション一覧
使用できる主なオプションは下表のとおりです。
プロパティ名 | producer/consumer | 説明 | デフォルト値 | 型 |
---|---|---|---|---|
size | 共通 | SEDAキューが保持できる最大メッセージ数を設定する。デフォルト値が"1000"と小さい値であるため、大量のデータを扱う場合は変更が必要。 | 1000 | int |
concurrentConsumers | consumer | Exchangeを処理するスレッド数を設定する。 | 1 | int |
blockWhenFull | producer | デフォルトでは、送信先のキューに空きがない場合、例外がスローされる。このオプションを有効にすると、呼び出し側スレッドはキューが空くまで待機する。 | false | boolean |
timeout | producer | プロデューサがメッセージ送信の完了を待つ時間(ミリ秒)を指定する。0以下を指定するとタイムアウトは無効になる。 | 30000 | long |
hawtioによるキューの確認
最後にhawtioでSEDAに格納されているメッセージを確認してみます。
hawtioはJavaアプリケーションのモニタリングツール?で、Camel用のプラグインが標準で同梱されており、ルートやルートを通ったエクスチェンジの数などの様々な情報がモニタリングできます。
hawtioの導入方法は以下を参照。
Apache Camelのルートをhawtioを使って可視化する - 導入編
SEDAに格納されているメッセージを確認すると、以下のようにメッセージが蓄積されていることが確認できます。
前画面のメッセージをクリックすると、メッセージの内容が表示されます。
hawtio便利すぎ!