はじめに
以前からWeb開発案件で Spring MVC を使う機会は多かったのですが、Spring Integration の経験がなく、また、システム統合についても興味があったため、Spring Integration について学習した内容をまとめたいと思います。
技術ブログを書くのは初めてですし、Spring Integration についても認識違い等ある可能性がありますので、記載に間違い等ありましたらご指摘いただけると幸いです。
Spring Integrationとは
Spring Framework のプロジェクトの一つ。EIP(Enterprise Integration Pattern)をサポートしており、メッセージングをSpringベースのプロジェクトで実装することができます。
EIP(Enterprise Integration Pattern)とは
アプリケーションを統合するための標準化された65のパターンです。あくまでパターンのカタログであり、テクノロジーには依存しません。同名の書籍で詳しく説明しているようですが、日本語版が出版されていないようです。
各パターンの概要は下記URLから確認できます。
https://www.enterpriseintegrationpatterns.com/
概要にはわかりやすい図もある為、これだけでも十分理解はできると思いますので書籍は必読ではないと思いますが、英語に抵抗がない方や詳しく知りたい方は書籍を購入して読んでみるのもよいかと思います。(私は読めてません。。。)
メッセージング
EIPはトップレベルのパターン(統合スタイル)として、以下の4パターンを定義しています。
- ファイル転送
- 共有データベース
- リモートプロシージャ呼び出し
- メッセージング
先ほど説明した通り、Spring Integration ではメッセージングによりシステム統合を行います。
メッセージングを簡単に説明すると、システム間をメッセージバスを介し、非同期的にデータ連携を行い統合するパターンです。
アーキテクチャ
Spring Integration の統合フローでは、メッセージエンドポイント同士をメッセージチャネルで接続しメッセージを連携します。
メインコンポーネント
Spring Integration を理解するうえで重要となるコンポーネントが3つあります。
メッセージ
チャネルで送受信されるデータ。ヘッダーとペイロードで構成されます。
ヘッダーはkey, valueのペアになっており、例えばファイルが送信されてきた場合はファイル名などのメタ情報が保存されます。
ペイロードは送信されてきたデータを格納する任意のJavaオブジェクトです。型は指定されおらず、任意の型でオブジェクトを格納できます。
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
メッセージチャネル
メッセージエンドポイント同士を接続し、メッセージを送受信するための伝送路です。
メッセージチャネルのトップレベルインターフェイスとしてMessageChannel
が定義されていますが、チャネルに送信するためのsend
メソッドのみが定義されています。
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
メッセージチャネルからメッセージエンドポイントが受信方法のオプションとして、MessageChannel
のサブインターフェイスが2つ定義されています。
PollableChannelインターフェイス
送信されてきたメッセージをバッファリングし、メッセージエンドポイントからのポーリングにより能動的にメッセージを受信できます。
PollableChannel
インターフェイスの定義は以下になります。エンドポイントが受信するためのメソッドが定義されています。
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
SubscribableChannelインターフェイス
サブスクライブされたハンドラー(メッセージエンドポイント)に対しメッセージを送信します。バッファリングは行わないため、メッセージチャネルへの送信がトリガーとなりメッセージエンドポイントに受信されます。
SubscribableChannel
インターフェイスの定義は以下になります。受信の為のメソッドは定義されておらず、エンドポイントをサブスクライブするためのメソッドが定義されています。
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
PollableChannelインターフェイスの実装(一部)
-
QueueChannel
- キュー実装(先入れ先出し)
-
PriorityChannel
- 優先度に基づいて順序付けする。
デフォルトはメッセージのpriorityヘッダーによって決定する。
- 優先度に基づいて順序付けする。
SubscribableChannelインターフェイスの実装(一部)
-
PublishSubscribeChannel
- サブスクライブされたすべてのハンドラーに対しメッセージを送信する。
-
DirectChannel
- デフォルトのチャネル
- サブスクライブされた単一のハンドラーに対しメッセージを送信する。
メッセージエンドポイント
Spring Integration における処理の中核を担い、何らかの処理を行うコンポーネントです。Spring MVC ではコントローラーをURLとマッピングしますが、Spring Integration ではメッセージエンドポイントにメッセージチャネルをマッピングします。
メッセージエンドポイントは基本的に利用者が実装する必要はありません。Spring Integationから様々な役割を持つエンドポイントが複数提供されています。ここでは主要な一部のエンドポイントのみ紹介します。
チャネルアダプター
外部のシステムのデータ(例えばhttpやメール、ファイル、MongoDBなど)とSpring Integration管理のメッセージを相互に変換し、チャネルに接合するエンドポイント。受信用と送信用があり、それぞれ統合したいシステムに合わせたチャネルアダプターを選択できます。
Spring Integration から様々なシステム間の統合をサポートするための統合エンドポイントが提供されています。
https://spring.pleiades.io/spring-integration/reference/endpoint-summary.html
メッセージトランスフォーマー
メッセージフィルター
メッセージを次のチャネルに送信するかしないかを制御するエンドポイント。
メッセージルーター
スプリッター
メッセージを複数に分割して次のチャネルに送信するエンドポイント。
アグリゲーター
複数のメッセージを単一のメッセージとして統合するためのエンドポイント。スプリッターで分割したメッセージを統合する場合などに使用される。
サービスアクティベーター
独自のハンドラメソッドを定義し、実装することのできる汎用的なエンドポイント。システム固有のビジネスロジックを実装できる。
実装、動作確認
最後に簡単な統合フローを実装していきたいと思います。
Spring Integration で統合フローを実装する方法としてxml、Java、Java DSLなどの方法があるようですが、ここではXML Configで実装していきます。
- 受信ファイルアダプターで名前の一覧が記載されたテキストファイルをポーリングして取得。メッセージに変換してチャネルに送信。
- DirectChannelを使用。そのままファイルスプリッターが受信。
- ファイル用のスプリッター。ファイルを一行ごと(名前)に分割して、次のチャネルに送信。
- QueueChannelを使用。送信されてきた名前をキューにバッファリングし、サービスアクティベーターからのポーリングを待機します。
- 定期的にポーリングを行い、名前を取得。Handler(HelloService#hello)に処理を委譲します。Handlerは処理している内容をコンソール出力し、「"hello " + 名前」を返却します。サービスアクティベーターは返却された文字列をペイロードとして次のチャネルに送信します。
- DirectChannelを使用。そのまま送信チャネルアダプターが受信。
- 送信ファイルアダプターは送信されてきたメッセージをファイルとして出力する。ファイル名は「元ファイル名 + "_" + 行番号」としています。
Mavenの依存関係定義は以下。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.spring.integration.example</groupId>
<artifactId>names-integration</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>names-integration</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- ファイル統合用 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
統合フローはXML Configを使用して定義しています。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<!-- ① -->
<int-file:inbound-channel-adapter
id="filesIn" directory="file:c:\spring-integration-sample\tmp\input"
filename-pattern="*.txt">
<int:poller fixed-rate="5000" /> <!-- 5秒ごとに指定のフォルダをポーリング -->
</int-file:inbound-channel-adapter>
<!-- ② -->
<!-- inbound-channel-adapterのid属性のデフォルトのチャネルが生成されるっぽい(よくわかってない) -->
<!-- ③ -->
<int-file:splitter id="fileSplitter" charset="UTF-8"
input-channel="filesIn" output-channel="names" />
<!-- ④ -->
<int:channel id="names">
<int:queue capacity="10" /> <!-- 最大10個バッファリング -->
</int:channel>
<!-- ⑤ -->
<int:service-activator input-channel="names"
output-channel="filesOut" ref="helloService" method="hello">
<int:poller fixed-delay="5000" max-messages-per-poll="1" /> <!-- 5秒ごとに④のチャネルをポーリング -->
</int:service-activator>
<!-- ⑥ -->
<!-- outbound-channel-adapterのid属性のデフォルトのチャネルが生成されるっぽい(よくわかってない) -->
<!-- ⑦ -->
<int-file:outbound-channel-adapter
id="filesOut" directory="file:c:\spring-integration-sample\tmp\output"
filename-generator-expression="headers['file_name'].substring(0, headers['file_name'].lastIndexOf('.')) + '_' + headers['sequenceNumber'] + '.txt'" />
</beans>
@Service
をHelloService
に付与することでコンポーネントスキャンでBean登録できるようにしています。
package com.example;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class HelloService {
private static String FORMAT = "proccessing { filename : %s, name : %s }\n";
public String hello(Message<String> message) {
String name = message.getPayload();
String filename = (String)message.getHeaders().get(FileHeaders.FILENAME);
System.out.printf(FORMAT, filename, name);
// 戻り値の型でpayloadを構成
return "hello " + name;
}
}
以下はメインクラス。統合フロー用のXML Configをインポートしています。
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
@SpringBootApplication
@ImportResource("classpath:integration.xml")
public class SpringIntegrationSampleApplication {
public static void main(String[] args) {
SpringApplication.run(SpringIntegrationSampleApplication.class, args);
}
}
検証用の名前一覧ファイル。
Abbey
Alanna
Marty
Mike
Opal
Syd
Talon
動かしてみます。
アプリを実行して、C:\spring-integration-sample\tmp\input
にnames.txt
を配置します。すると数秒ごとに処理中のログが出力され、C:\spring-integration-sample\tmp\out
配下にファイルが出力されました。ポーリングがちゃんと機能しているようです。
終わりに
Spring Integrationについて概要、メインコンポーネントの説明をし、実装して動かしてみました。ほかのシステム統合フレームワークを知らないのでなんとも言い難いですが、非同期にシステム統合を実現できるため実用性は高いような気がしました。ただ、Spring MVC などに比べ技術ブログなどが少なく、イメージがしずらい面がありました。
初の技術ブログですが文章力がなく、わかりにくくなっている気がします。反省。。。