3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Camelの標準キュー(SEDA)の基本的な使い方

Last updated at Posted at 2018-10-25

SEDAとは

SEDAはApache Camelフレームワーク標準のキューイングシステムです。標準だけあってCamelフレームワークから簡単に利用することができます。

なお、SEDAはメモリ上で処理され永続化することはできません。永続化するなら、ActiveMQやRabbitMQなどを別に準備して利用することになります。

SEDAの基本的な使い方

以下はXML DSLだけで、SEDAにメッセージを入れて取り出すプログラムです。
mainRouteとconsumerRouteの2つのルートを作成しています。

mainRoute
・setBodyで現在日時の文字列をBODYに入れる。
・「to uri="seda:seda_test"」で、seda_testというキューにメッセージ(BODY)を配信する。

consumerRoute
・「from uri="seda:seda_test?concurrentConsumers=1"」で、seda_testのキューからメッセージを取り出す。
・「log message」で、取り出したメッセージをログに出力する。

camel-context.xml
	<camelContext
		xmlns="http://camel.apache.org/schema/spring">

		<route id="mainRoute">
			<from uri="timer://run?delay=1000&amp;period=1000" />
			<setBody>
				<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
			</setBody>
			<to uri="seda:seda_test" />
			<log message="seda_test end" />
		</route>

		<route id="consumerRoute">
			<from uri="seda:seda_test?concurrentConsumers=1" />
			<log message="body -> ${body}" loggingLevel="INFO" />
		</route>
	</camelContext>

実行した結果は以下のとおり。
「seda://seda_test, consumerRoute, body -> 2018-10-26 04:11:43」のように、取り出したメッセージが表示されています。

[2018-10-26 04:11:43.955], [INFO ], mainRoute, Camel (camel-1) thread #2 - timer://run, mainRoute, seda_test end
[2018-10-26 04:11:43.955], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:11:43
[2018-10-26 04:11:44.955], [INFO ], mainRoute, Camel (camel-1) thread #2 - timer://run, mainRoute, seda_test end
[2018-10-26 04:11:44.955], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:11:44
[2018-10-26 04:11:45.956], [INFO ], mainRoute, Camel (camel-1) thread #2 - timer://run, mainRoute, seda_test end

SEDAの様々な使い方

SEDAのキューのサイズを確認する

SEDAのキューに入っているメッセージのサイズを確認します。
まず、以下のようにメッセージ数を取得するプロセッサーを作成します。
「end.getCurrentQueueSize()」で、メッセージ数が取得できます。

package sample;

import javax.annotation.Resource;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.seda.SedaEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class TestProcessor implements Processor {
	Logger logger = LoggerFactory.getLogger(TestProcessor.class);

	@Resource
	private CamelContext context;

    public void process(Exchange exchange) throws Exception {
    	logger.info("TestProcessor start ");
    	SedaEndpoint end = (SedaEndpoint)context.getEndpoint("seda:seda_test");
    	logger.info("Current Queue Size = {}", end.getCurrentQueueSize());
    }
}

XML DSLは以下のように記述。
testRouteで、1秒ごとに作成したプロセッサーを呼び出しています。

	<context:component-scan base-package="sample" />
	<bean id="testProcessor" class="sample.TestProcessor" />

	<camelContext
		xmlns="http://camel.apache.org/schema/spring">

		<route id="mainRoute">
			<from uri="timer://run?delay=1000&amp;period=1000" />
			<setBody>
				<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
			</setBody>
			<to uri="seda:seda_test" />
			<log message="seda_test end" />
		</route>

		<route id="testRoute">
			<from uri="timer://run?delay=1000&amp;period=1000" />
			<process ref="testProcessor" />
		</route>
	</camelContext>

実行結果は以下のとおり。
「timer://run, sample.TestProcessor, Current Queue Size = 1」のように、1秒ごとにキューの数が増えていることが確認できます。

[2018-10-26 04:26:16.435], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:26:16.437], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start 
[2018-10-26 04:26:16.446], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Current Queue Size = 1
[2018-10-26 04:26:17.402], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:26:17.437], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start 
[2018-10-26 04:26:17.438], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Current Queue Size = 2
[2018-10-26 04:26:18.403], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:26:18.437], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start 
[2018-10-26 04:26:18.438], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Current Queue Size = 3

SEDAのキューを削除する

SEDAに格納されたメッセージを全て削除します。
「end.purgeQueue()」で削除しています。

    public void process(Exchange exchange) throws Exception {
    	logger.info("TestProcessor start ");

    	SedaEndpoint end = (SedaEndpoint)context.getEndpoint("seda:seda_test");
    	logger.info("Before Current Queue Size = {}", end.getCurrentQueueSize());
    	end.purgeQueue();
    	logger.info("After Current Queue Size = {}", end.getCurrentQueueSize());
    }

実行結果は以下のとおり。
削除前はメッセージ1件で、削除後はメッセージが0件になっていることが確認できます。

[2018-10-26 04:28:32.327], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start 
[2018-10-26 04:28:32.328], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Before Current Queue Size = 1
[2018-10-26 04:28:32.328], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, After Current Queue Size = 0

SEDAのキューの最大サイズを指定する

SEDAに格納できるメッセージの最大数をsizeで指定します。
今回は最大5メッセージで設定しています。

		<route id="mainRoute">
			<from uri="timer://run?delay=1000&amp;period=1000" />
			<setBody>
				<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
			</setBody>
			<to uri="seda:seda_test?size=5" />
			<log message="seda_test end" />
		</route>

実行すると、5件目の後(6件目)で「Queue full」というメッセージで例外が発生しています。

[2018-10-26 04:32:39.070], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:32:39.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start 
[2018-10-26 04:32:39.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Before Current Queue Size = 4
[2018-10-26 04:32:40.069], [INFO ], mainRoute, Camel (camel-1) thread #1 - timer://run, mainRoute, seda_test end
[2018-10-26 04:32:40.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, TestProcessor start 
[2018-10-26 04:32:40.103], [INFO ], s.TestProcessor, Camel (camel-1) thread #1 - timer://run, sample.TestProcessor, Before Current Queue Size = 5
[2018-10-26 04:32:41.076], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://run, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1540495954604-0-12 on ExchangeId: ID-mky-PC-1540495954604-0-11). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Queue full

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[mainRoute         ] [mainRoute         ] [timer://run?delay=1000&period=1000                                            ] [         4]
[mainRoute         ] [setBody1          ] [setBody[simple{18/10/26 4:32}]                              ] [         0]
[mainRoute         ] [to1               ] [seda:seda_test?size=5                                                         ] [         1]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98) ~[?:1.8.0_172]
~省略~

キューが最大(フル)になっている場合に例外を出さずに待機する場合は、「blockWhenFull=true」を指定します。

			<to uri="seda:seda_test?size=5&amp;blockWhenFull=true" />

「blockWhenFull=true」を指定するとキューが空くまで永遠と待ち続けます。
タイムアウトしたい場合は、offerTimeoutを使用します。
以下の例では、5000(ms)を指定しています。5秒経過してもキューが空かないと例外が発生します。

			<to uri="seda:seda_test?size=5&amp;blockWhenFull=true&amp;offerTimeout=5000" />

SEDAのconsumerのスレッド数を変更する

メッセージを取り出すconsumerのスレッド数を変更します。
デフォルトは"1"で、以下ではconcurrentConsumersで"5"(スレッド)を設定しています。

	<camelContext
		xmlns="http://camel.apache.org/schema/spring">

		<route id="mainRoute">
			<from uri="timer://run?delay=1000&amp;period=1000" />
			<setBody>
				<simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple>
			</setBody>
			<loop>
    		<constant>10</constant>
				<to uri="seda:seda_test" />
			</loop>
			<log message="seda_test end" />
		</route>

		<route id="consumerRoute">
			<from uri="seda:seda_test?concurrentConsumers=5" />
			<log message="body -> ${body}" loggingLevel="INFO" />
		</route>
	</camelContext>

実行すると、「thread #1~5」というスレッドが動いていることが確認できます。

[2018-10-26 04:52:29.653], [INFO ], mainRoute, Camel (camel-1) thread #6 - timer://run, mainRoute, seda_test end
[2018-10-26 04:52:29.654], [INFO ], consumerRoute, Camel (camel-1) thread #2 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.654], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #3 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #3 - seda://seda_test, consmerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.655], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.657], [INFO ], consumerRoute, Camel (camel-1) thread #2 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.657], [INFO ], consumerRoute, Camel (camel-1) thread #1 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:29.658], [INFO ], consumerRoute, Camel (camel-1) thread #3 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:29
[2018-10-26 04:52:30.612], [INFO ], consumerRoute, Camel (camel-1) thread #4 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:30
[2018-10-26 04:52:30.612], [INFO ], consumerRoute, Camel (camel-1) thread #5 - seda://seda_test, consumerRoute, body -> 2018-10-26 04:52:30

オプション一覧

使用できる主なオプションは下表のとおりです。

プロパティ名 producer/consumer 説明 デフォルト値
size 共通 SEDAキューが保持できる最大メッセージ数を設定する。デフォルト値が"1000"と小さい値であるため、大量のデータを扱う場合は変更が必要。 1000 int
concurrentConsumers consumer Exchangeを処理するスレッド数を設定する。 1 int
blockWhenFull producer デフォルトでは、送信先のキューに空きがない場合、例外がスローされる。このオプションを有効にすると、呼び出し側スレッドはキューが空くまで待機する。 false boolean
timeout producer プロデューサがメッセージ送信の完了を待つ時間(ミリ秒)を指定する。0以下を指定するとタイムアウトは無効になる。 30000 long

hawtioによるキューの確認

最後にhawtioでSEDAに格納されているメッセージを確認してみます。
hawtioはJavaアプリケーションのモニタリングツール?で、Camel用のプラグインが標準で同梱されており、ルートやルートを通ったエクスチェンジの数などの様々な情報がモニタリングできます。

hawtioの導入方法は以下を参照。

Apache Camelのルートをhawtioを使って可視化する - 導入編

SEDAに格納されているメッセージを確認すると、以下のようにメッセージが蓄積されていることが確認できます。

image.png

前画面のメッセージをクリックすると、メッセージの内容が表示されます。

image.png

hawtio便利すぎ!

参考

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?