LoginSignup
2
3

More than 5 years have passed since last update.

Spring Integration 勉強の備忘録 ~Spring Integration Sampleの理解 1.Hello World~

Last updated at Posted at 2018-06-30

目的

Spring Integrationを学ぶために、Spring Projectが提供しているサンプルを理解して、実装し直す + アレンジしてみた。本記事は、その備忘録として。

サンプルは以下のGitに登録してあるもの。
spring-integration-sample

まずはbasicのhelloworld。
ちなみにSpring Integrationのバージョンは4.3.10.RELEASEである。

概要

helloworldには2つの機能が実装されている。
1. Hello World
2. Poller Application

まずはサンプルの通りに実装してみたが、それだけだと理解が薄かったので3つ目として
2つの機能を結合したものも実装した。

実装

(1) hello world

全体像

READMEまんまだが、hello worldは以下のようにシンプルなメッセージングフローを実装したものである。

Message -> Channel -> ServiceActivator -> QueueChannel

これから実装の中を見ていく。

helloWorldDemo.xml

1~3は分かりやすく、ChannelとServiceAvtivatorのコンポーネントをbean定義しただけ。
4はフロー全体を定義したもので、service-activatorタグで定義する。

このように定義しておくと、inputChannel(MessageChannel)sendメソッドを呼び出すと、ServiceActivator(HelloWorldService.seyHelloメソッド)に処理が委譲される。 そして、sayHelloメソッドのreturn結果が、outputChannelに入る。


    <channel id="inputChannel"/> <!-- 1 -->
    <channel id="outputChannel"> <!-- 2 -->
        <queue capacity="10"/>
    </channel>


    <beans:bean id="sampleService" 
    class="org.ek.sample.helloworld.HelloWorldService"/> <!-- 3 -->

    <service-activator input-channel="inputChannel"
                       output-channel="outputChannel"
                       ref="sampleService"
                       method="sayHello"/> <!-- 4 -->

Mainクラス

Mainクラスのコードは以下の通り。

1でxmlファイルを読み込んで、ApplicationContextインスランスを作成する。あとはChannelgetBeanして(2,3)、前述の通り、inputChannel.sendServiceActivatorsayHelloメソッドへ処理を委譲する(4)。処理が終わったらoutputChannelに値が入るので、ouputChannel.receive.getPayloadreturn値を取得することができる。ここでは取得した内容をロガーに渡して標準出力している。(5)

なお、Channelによって値を取得できるようになっているものと送信専用のものがあるようで、PollableChannelMessageChannelを継承した上でreceiveメソッドを定義しているので値を取得でき、MessageChannelはsend専門のようである。

    public static void main(String[] args) {

        @SuppressWarnings("resource")
        AbstractApplicationContext context = new ClassPathXmlApplicationContext(
                "/META-INF/spring/integration/helloWorldDemo.xml", HelloWorldMain.class); // 1
        MessageChannel inputChannel = context.getBean("inputChannel", MessageChannel.class); // 2
        PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class); // 3

        inputChannel.send(new GenericMessage<String>("World")); // 4
        logger.info("==> HelloWorldDemo: " + outputChannel.receive(0).getPayload()); // 5

ServiceActivatorは割愛するが、sayHelloメソッドは引数Stringの少し文字列を足してreturnするだけの簡単な内容となっていた。ちなみにPOJOであり、何かを継承・実装しているわけではない。

(2) Polling

全体像

システム時間を20秒(20000ミリ秒)ごとに2回ポーリングして取得し、ロガーに渡して標準出力するというアプリ。

delay.xml

今回はinbound-channel-adapterタグが何ができるのか、また、executorタグがを何故定義する必要があるのか等パッと見ではよくわからなかったので少しxsdを拝見した。xsdは簡単なものしか読み方わからないのですが、ざっくり以下のような形でしょうか。(間違っていたら誰かご指摘いただきたいです)

  • xsdを見ると、inbound-channel-adapterタグは、内部にpollerを子要素として持つ必要がある。
    • Pollingの形式は2つ(interval-trigger, cron-trigger)あって、どちらか選べるよう。何も指定しないとinterval-triggerという方になるっぽい。
    • あとはその方式に沿うように属性等を定義していく。
  • expression属性では、SpELを使って記載する必要がある。ここで記載した内容をAOPのポイントカットのように取得してくる。Spring IntegrationではSpELを使った記法は結構出てくる模様。
  • task:executorタグは、設定可能なプールサイズ、キュー容量、キープアライブ、および拒否ポリシーの値を持つThreadPoolTaskExecutorインスタンスを定義する。 このタグは必ずしもbean定義する必要はなく(設定がデフォルトでよければ)、タグを消してもアプリケーションは動く。
<int:inbound-channel-adapter expression="T(java.lang.System).currentTimeMillis()" channel="logger">
        <int:poller fixed-delay="20000" max-messages-per-poll="2" />
    </int:inbound-channel-adapter> <!-- 1 -->

    <int:logging-channel-adapter id="logger" logger-name="org.springframework.integration.samples.helloworld"/> 

    <task:executor id="executor" queue-capacity="20" pool-size="5-20"/> <!-- 2 -->

Mainクラス

今回はポーリングなので、Mainクラスはコンテキストを読み込むだけでOK。あとはロガーがアプリを停止するまでログを出力し続ける。


    public static void main(String[] args) throws Exception{
        new ClassPathXmlApplicationContext("META-INF/spring/integration/delay.xml");
    }

(3) polling + hello world

全体像

これだけだとサンプルの実装を見ただけになってしまうので、ポーリングして得た内容をServiceActivatorに流して、最後ログ出力するという形(今までの2つを合わせる形)の実装も行った。 実装できたものの何だか無駄が多いように思えるが、今のところの知識だとこれが限界。もっと勉強したら作り変えよう。

delay_hello_world.xml

今までの2つを足す形にしているが、実はただ足すだけだと上手く行かなかったので少しアレンジする必要があった。まずは1のbeansタグ。PollingとHelloWorldはそれぞれ、ここの粒度(xmlnsの定義)が少しズレていたのでそこを合わせる必要がある。なお、それに伴って、HelloWorldアプリで使っていたchannel, service-activatorタグにint:を付与している。

次は2のbridgeChannelというQueueChannelを新たに定義した。inbound-channel-adapterタグは内部的にMessageChannel.sendを呼んでいるのかな?と思った(そこまでは調べていない)ので、outputChannelに新たにchannelを定義し、Mainクラスでそこから値をreceiveして、それをinputChannel.sendに渡すことにした。

<?xml version="1.0" encoding="UTF-8"?>
<!-- 1 -->
<beans 
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.3.xsd"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:task="http://www.springframework.org/schema/task">

    <int:inbound-channel-adapter expression="'World! time is ' + T(java.lang.System).currentTimeMillis()" channel="bridgeChannel">
        <int:poller fixed-delay="2000" max-messages-per-poll="2" />
    </int:inbound-channel-adapter>

    <!-- 2 -->
    <int:channel id="bridgeChannel">
        <int:queue capacity="2"/>
    </int:channel>

    <int:channel id="inputChannel"/>
    <int:channel id="outputChannel">
        <int:queue capacity="10"/>
    </int:channel>

    <int:service-activator input-channel="inputChannel"
                       output-channel="outputChannel"
                       ref="sampleService"
                       method="sayHello"/>

    <bean id="sampleService" class="org.ek.sample.helloworld.HelloWorldService"/>

</beans>

Mainクラス

Mainはこんな感じ。ポーリングし続けるためにwhile(true)でbrideChannelに受信してきた内容をずっと取得し続ける感じ。まぁ想定通りの動きをしてくれているのでSpring Integrationの最初の理解としてはこんなんでよいか。

    private static Logger logger  = Logger.getLogger(PollingHelloWorld.class);

    public static void main(String[] args) {

        @SuppressWarnings("resource")
        AbstractApplicationContext context = 
                new ClassPathXmlApplicationContext("/META-INF/spring/integration/delay_hello_world.xml");

        PollableChannel bridgeChannel = context.getBean("bridgeChannel", PollableChannel.class);
        MessageChannel inputChannel = context.getBean("inputChannel", MessageChannel.class);
        PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class);

        while(true) {
            Message<?> msg = bridgeChannel.receive(0);          
            if(msg != null) {
                String str = (String) msg.getPayload() ;
                inputChannel.send(new GenericMessage<String>(str));
                logger.info("==> Demo: " + outputChannel.receive(0).getPayload());
            }

        }
    }
2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3