目的
Spring Integrationを学ぶために、Spring Projectが提供しているサンプルを理解して、実装し直す + アレンジしてみた。本記事は、その備忘録として。
サンプルは以下のGitに登録してあるもの。
spring-integration-sample
まずはbasicのhelloworld。
ちなみにSpring Integrationのバージョンは4.3.10.RELEASE
である。
概要
helloworldには2つの機能が実装されている。
- Hello World
- Poller Application
まずはサンプルの通りに実装してみたが、それだけだと理解が薄かったので3つ目として
2つの機能を結合したものも実装した。
実装
(1) hello world
全体像
READMEまんまだが、hello worldは以下のようにシンプルなメッセージングフローを実装したものである。
Message -> Channel -> ServiceActivator -> QueueChannel
これから実装の中を見ていく。
helloWorldDemo.xml
1~3は分かりやすく、ChannelとServiceAvtivatorのコンポーネントをbean定義しただけ。
4はフロー全体を定義したもので、service-activator
タグで定義する。
このように定義しておくと、inputChannel(MessageChannel)
のsend
メソッドを呼び出すと、ServiceActivator(HelloWorldService.seyHelloメソッド)
に処理が委譲される。 そして、sayHello
メソッドのreturn結果が、outputChannelに入る。
<channel id="inputChannel"/> <!-- 1 -->
<channel id="outputChannel"> <!-- 2 -->
<queue capacity="10"/>
</channel>
<beans:bean id="sampleService"
class="org.ek.sample.helloworld.HelloWorldService"/> <!-- 3 -->
<service-activator input-channel="inputChannel"
output-channel="outputChannel"
ref="sampleService"
method="sayHello"/> <!-- 4 -->
Mainクラス
_Main_クラスのコードは以下の通り。
1でxmlファイルを読み込んで、ApplicationContext
インスランスを作成する。あとは_Channel_をgetBean
して(2,3)、前述の通り、inputChannel.send
でServiceActivator
のsayHello
メソッドへ処理を委譲する(4)。処理が終わったらoutputChannelに値が入るので、ouputChannel.receive.getPayload
で_return_値を取得することができる。ここでは取得した内容をロガーに渡して標準出力している。(5)
なお、Channelによって値を取得できるようになっているものと送信専用のものがあるようで、PollableChannel
はMessageChannel
を継承した上でreceive
メソッドを定義しているので値を取得でき、MessageChannel
はsend専門のようである。
public static void main(String[] args) {
@SuppressWarnings("resource")
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"/META-INF/spring/integration/helloWorldDemo.xml", HelloWorldMain.class); // 1
MessageChannel inputChannel = context.getBean("inputChannel", MessageChannel.class); // 2
PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class); // 3
inputChannel.send(new GenericMessage<String>("World")); // 4
logger.info("==> HelloWorldDemo: " + outputChannel.receive(0).getPayload()); // 5
ServiceActivatorは割愛するが、sayHello
メソッドは引数Stringの少し文字列を足してreturnするだけの簡単な内容となっていた。ちなみに_POJO_であり、何かを継承・実装しているわけではない。
(2) Polling
全体像
システム時間を20秒(20000ミリ秒)ごとに2回ポーリングして取得し、ロガーに渡して標準出力するというアプリ。
delay.xml
今回はinbound-channel-adapter
タグが何ができるのか、また、executor
タグがを何故定義する必要があるのか等パッと見ではよくわからなかったので少し_xsd_を拝見した。_xsd_は簡単なものしか読み方わからないのですが、ざっくり以下のような形でしょうか。(間違っていたら誰かご指摘いただきたいです)
- _xsd_を見ると、
inbound-channel-adapter
タグは、内部に_poller_を子要素として持つ必要がある。- Pollingの形式は2つ(
interval-trigger
,cron-trigger
)あって、どちらか選べるよう。何も指定しないとinterval-trigger
という方になるっぽい。 - あとはその方式に沿うように属性等を定義していく。
- Pollingの形式は2つ(
- _expression_属性では、SpELを使って記載する必要がある。ここで記載した内容をAOPのポイントカットのように取得してくる。_Spring Integration_では_SpEL_を使った記法は結構出てくる模様。
-
task:executor
タグは、設定可能なプールサイズ、キュー容量、キープアライブ、および拒否ポリシーの値を持つThreadPoolTaskExecutor
インスタンスを定義する。 このタグは必ずしもbean定義する必要はなく(設定がデフォルトでよければ)、タグを消してもアプリケーションは動く。
<int:inbound-channel-adapter expression="T(java.lang.System).currentTimeMillis()" channel="logger">
<int:poller fixed-delay="20000" max-messages-per-poll="2" />
</int:inbound-channel-adapter> <!-- 1 -->
<int:logging-channel-adapter id="logger" logger-name="org.springframework.integration.samples.helloworld"/>
<task:executor id="executor" queue-capacity="20" pool-size="5-20"/> <!-- 2 -->
Mainクラス
今回はポーリングなので、Mainクラスはコンテキストを読み込むだけでOK。あとはロガーがアプリを停止するまでログを出力し続ける。
public static void main(String[] args) throws Exception{
new ClassPathXmlApplicationContext("META-INF/spring/integration/delay.xml");
}
(3) polling + hello world
全体像
これだけだとサンプルの実装を見ただけになってしまうので、ポーリングして得た内容をServiceActivatorに流して、最後ログ出力するという形(今までの2つを合わせる形)の実装も行った。 実装できたものの何だか無駄が多いように思えるが、今のところの知識だとこれが限界。もっと勉強したら作り変えよう。
delay_hello_world.xml
今までの2つを足す形にしているが、実はただ足すだけだと上手く行かなかったので少しアレンジする必要があった。まずは1のbeans
タグ。PollingとHelloWorldはそれぞれ、ここの粒度(xmlnsの定義)が少しズレていたのでそこを合わせる必要がある。なお、それに伴って、HelloWorldアプリで使っていたchannel
, service-activator
タグにint:
を付与している。
次は2のbridgeChannelというQueueChannel
を新たに定義した。inbound-channel-adapter
タグは内部的にMessageChannel.send
を呼んでいるのかな?と思った(そこまでは調べていない)ので、outputChannelに新たにchannelを定義し、Main
クラスでそこから値をreceiveして、それをinputChannel.send
に渡すことにした。
<?xml version="1.0" encoding="UTF-8"?>
<!-- 1 -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.3.xsd"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:task="http://www.springframework.org/schema/task">
<int:inbound-channel-adapter expression="'World! time is ' + T(java.lang.System).currentTimeMillis()" channel="bridgeChannel">
<int:poller fixed-delay="2000" max-messages-per-poll="2" />
</int:inbound-channel-adapter>
<!-- 2 -->
<int:channel id="bridgeChannel">
<int:queue capacity="2"/>
</int:channel>
<int:channel id="inputChannel"/>
<int:channel id="outputChannel">
<int:queue capacity="10"/>
</int:channel>
<int:service-activator input-channel="inputChannel"
output-channel="outputChannel"
ref="sampleService"
method="sayHello"/>
<bean id="sampleService" class="org.ek.sample.helloworld.HelloWorldService"/>
</beans>
Mainクラス
Mainはこんな感じ。ポーリングし続けるためにwhile(true)
でbrideChannelに受信してきた内容をずっと取得し続ける感じ。まぁ想定通りの動きをしてくれているのでSpring Integrationの最初の理解としてはこんなんでよいか。
private static Logger logger = Logger.getLogger(PollingHelloWorld.class);
public static void main(String[] args) {
@SuppressWarnings("resource")
AbstractApplicationContext context =
new ClassPathXmlApplicationContext("/META-INF/spring/integration/delay_hello_world.xml");
PollableChannel bridgeChannel = context.getBean("bridgeChannel", PollableChannel.class);
MessageChannel inputChannel = context.getBean("inputChannel", MessageChannel.class);
PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class);
while(true) {
Message<?> msg = bridgeChannel.receive(0);
if(msg != null) {
String str = (String) msg.getPayload() ;
inputChannel.send(new GenericMessage<String>(str));
logger.info("==> Demo: " + outputChannel.receive(0).getPayload());
}
}
}