Help us understand the problem. What is going on with this article?

Spring IntegrationのTCP機能でSocketへ直接アクセスする方法

More than 1 year has passed since last update.

Spring IntegrationのTCP機能を使ってTCPクライアントを作る際に、Socketを直接触りたいケースが(たまに)あったので、Socketを直接触る方法もメモっておきます。

私がSocketを直接触りたいと思ったケースは・・・正常時の切断はFIN、異常時(何らかのエラーなどが発生した際)の切断はRSTで!という接続要件をみたす必要があった時でした。なお、Spring IntegrationのTCP機能では、ConnectionFactoryの設定で切断方法としてFINとRSTどちらを使うかを指定することはできるのですが、(調べた範囲では・・・)条件によって切り替えることはできませんでした。

TcpSocketSupportの利用

Spring Integrationは、フレームワーク側で生成したSocket(NIOの場合はSocketChannelに紐付くSocket)に対して任意の処理を適用することをサポートするために、TcpSocketSupport というインタフェースを提供しています。デフォルトでは、DefaultTcpSocketSupportというクラスが適用されていますが、Socketの状態を変えるような処理が実行されることはありません。

例外が発生したらRSTする実装の適用

Spring IntegrationのTCP機能には、Spring Integration内の処理で発生したイベント(接続、切断、エラー検知など)をハンドリングするための仕組みがサポートされており、今回紹介するコードではこれらのイベントをハンドリングして、例外検知後の切断をRSTで行うようなコードにしてみました。
なお、ここで紹介するコードは実際のアプリケーションに適用した(する予定の)コードではないので、そのまま使うことはできないという点は補足しておきます。(あくまでサンプルです)

package com.example.demo;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;

@SpringBootApplication
public class SprIntDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(SprIntDemoApplication.class, args);
  }

  // TCP Server
  @Bean
  public IntegrationFlow integrationInboundFlow() {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .get()))
        .transform(Transformers.objectToString()) // byte[] -> String
        .transform(m -> m) // 受け取ったメッセージをそのまま返信する
        .get();
  }

  // TCP Client
  @Bean
  public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
    AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .tcpSocketSupport(socketManager()) // 生成したSocketを管理するためのサポートクラスを適用
        .get();
    factory.setApplicationEventPublisher(publisher);
    return flow -> flow.handle(Tcp.outboundGateway(factory))
        .transform(Transformers.objectToString());  // byte[] -> String
  }

  @Bean
  public SocketManager socketManager() {
    return new SocketManager();
  }

  @Bean
  public MessagingTemplate messagingTemplate() {
    return new MessagingTemplate();
  }

  static class SocketManager extends DefaultTcpSocketSupport {
    private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();

    @Override
    public void postProcessSocket(Socket socket) {
      super.postProcessSocket(socket);
      sockets.put(socket.getLocalPort(), socket); // 異常検知時にSocketへアクセスできるように内部に保存しておく
    }

    @EventListener
    public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
      try {
        int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
        Socket socket = sockets.get(localPort);
        if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
          sockets.get(localPort).setSoLinger(true, 0); // RSTによる切断が行われるように設定
        }
      } catch (SocketException e) {
        // ignore
      }
    }

    @EventListener
    public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
      sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); // 切断後に不要になるSocketをお掃除
    }

  }

}
package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {

  @Autowired
  MessagingTemplate template;

  @Test
  public void contextLoads() {
    Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
        MessageBuilder.withPayload("hello!").build());
    System.out.println("reply: " + reply);
  }

}

ちなみに・・・上記コードのまま実行すると処理中にエラー(例外)が発生することはないため、エラーを発生させるためにはサーバ側で処理中にSocketを切断したりする必要がありますが、本エントリーではそのあたりは割愛します。

まとめ

ドキュメントを読む感じだと TcpSocketSupport の役割を超えた使い方をしている気もしますが・・・Socketを触ることができる仕組みがサポートされていたのは助かりました。

参考ドキュメント

kazuki43zoo
Javaエンジニアで、SpringやMyBatisらへんにそれなりに詳しいです。お仕事のつながりで「Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発」を共著させてもらいました!
https://kazuki43zoo.github.io
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした