Posted at

Spring IntegrationのTCP機能でSocketへ直接アクセスする方法

Spring IntegrationのTCP機能を使ってTCPクライアントを作る際に、Socketを直接触りたいケースが(たまに)あったので、Socketを直接触る方法もメモっておきます。

私がSocketを直接触りたいと思ったケースは・・・正常時の切断はFIN、異常時(何らかのエラーなどが発生した際)の切断はRSTで!という接続要件をみたす必要があった時でした。なお、Spring IntegrationのTCP機能では、ConnectionFactoryの設定で切断方法としてFINとRSTどちらを使うかを指定することはできるのですが、(調べた範囲では・・・)条件によって切り替えることはできませんでした。


TcpSocketSupportの利用

Spring Integrationは、フレームワーク側で生成したSocket(NIOの場合はSocketChannelに紐付くSocket)に対して任意の処理を適用することをサポートするために、TcpSocketSupport というインタフェースを提供しています。デフォルトでは、DefaultTcpSocketSupportというクラスが適用されていますが、Socketの状態を変えるような処理が実行されることはありません。


例外が発生したらRSTする実装の適用

Spring IntegrationのTCP機能には、Spring Integration内の処理で発生したイベント(接続、切断、エラー検知など)をハンドリングするための仕組みがサポートされており、今回紹介するコードではこれらのイベントをハンドリングして、例外検知後の切断をRSTで行うようなコードにしてみました。

なお、ここで紹介するコードは実際のアプリケーションに適用した(する予定の)コードではないので、そのまま使うことはできないという点は補足しておきます。(あくまでサンプルです)

package com.example.demo;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;

@SpringBootApplication
public class SprIntDemoApplication {

public static void main(String[] args) {
SpringApplication.run(SprIntDemoApplication.class, args);
}

// TCP Server
@Bean
public IntegrationFlow integrationInboundFlow() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.get()))
.transform(Transformers.objectToString()) // byte[] -> String
.transform(m -> m) // 受け取ったメッセージをそのまま返信する
.get();
}

// TCP Client
@Bean
public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.crlf())
.tcpSocketSupport(socketManager()) // 生成したSocketを管理するためのサポートクラスを適用
.get();
factory.setApplicationEventPublisher(publisher);
return flow -> flow.handle(Tcp.outboundGateway(factory))
.transform(Transformers.objectToString()); // byte[] -> String
}

@Bean
public SocketManager socketManager() {
return new SocketManager();
}

@Bean
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}

static class SocketManager extends DefaultTcpSocketSupport {
private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();

@Override
public void postProcessSocket(Socket socket) {
super.postProcessSocket(socket);
sockets.put(socket.getLocalPort(), socket); // 異常検知時にSocketへアクセスできるように内部に保存しておく
}

@EventListener
public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
try {
int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
Socket socket = sockets.get(localPort);
if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
sockets.get(localPort).setSoLinger(true, 0); // RSTによる切断が行われるように設定
}
} catch (SocketException e) {
// ignore
}
}

@EventListener
public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); // 切断後に不要になるSocketをお掃除
}

}

}

package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {

@Autowired
MessagingTemplate template;

@Test
public void contextLoads() {
Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
MessageBuilder.withPayload("hello!").build());
System.out.println("reply: " + reply);
}

}

ちなみに・・・上記コードのまま実行すると処理中にエラー(例外)が発生することはないため、エラーを発生させるためにはサーバ側で処理中にSocketを切断したりする必要がありますが、本エントリーではそのあたりは割愛します。


まとめ

ドキュメントを読む感じだと TcpSocketSupport の役割を超えた使い方をしている気もしますが・・・Socketを触ることができる仕組みがサポートされていたのは助かりました。


参考ドキュメント