4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring IntegrationのTCP機能でSocketへ直接アクセスする方法

Posted at

Spring IntegrationのTCP機能を使ってTCPクライアントを作る際に、Socketを直接触りたいケースが(たまに)あったので、Socketを直接触る方法もメモっておきます。

私がSocketを直接触りたいと思ったケースは・・・正常時の切断はFIN、異常時(何らかのエラーなどが発生した際)の切断はRSTで!という接続要件をみたす必要があった時でした。なお、Spring IntegrationのTCP機能では、ConnectionFactoryの設定で切断方法としてFINとRSTどちらを使うかを指定することはできるのですが、(調べた範囲では・・・)条件によって切り替えることはできませんでした。

TcpSocketSupportの利用

Spring Integrationは、フレームワーク側で生成したSocket(NIOの場合はSocketChannelに紐付くSocket)に対して任意の処理を適用することをサポートするために、TcpSocketSupport というインタフェースを提供しています。デフォルトでは、DefaultTcpSocketSupportというクラスが適用されていますが、Socketの状態を変えるような処理が実行されることはありません。

例外が発生したらRSTする実装の適用

Spring IntegrationのTCP機能には、Spring Integration内の処理で発生したイベント(接続、切断、エラー検知など)をハンドリングするための仕組みがサポートされており、今回紹介するコードではこれらのイベントをハンドリングして、例外検知後の切断をRSTで行うようなコードにしてみました。
なお、ここで紹介するコードは実際のアプリケーションに適用した(する予定の)コードではないので、そのまま使うことはできないという点は補足しておきます。(あくまでサンプルです)

package com.example.demo;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;

@SpringBootApplication
public class SprIntDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(SprIntDemoApplication.class, args);
  }

  // TCP Server
  @Bean
  public IntegrationFlow integrationInboundFlow() {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .get()))
        .transform(Transformers.objectToString()) // byte[] -> String
        .transform(m -> m) // 受け取ったメッセージをそのまま返信する
        .get();
  }

  // TCP Client
  @Bean
  public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
    AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .tcpSocketSupport(socketManager()) // 生成したSocketを管理するためのサポートクラスを適用
        .get();
    factory.setApplicationEventPublisher(publisher);
    return flow -> flow.handle(Tcp.outboundGateway(factory))
        .transform(Transformers.objectToString());  // byte[] -> String
  }

  @Bean
  public SocketManager socketManager() {
    return new SocketManager();
  }

  @Bean
  public MessagingTemplate messagingTemplate() {
    return new MessagingTemplate();
  }

  static class SocketManager extends DefaultTcpSocketSupport {
    private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();

    @Override
    public void postProcessSocket(Socket socket) {
      super.postProcessSocket(socket);
      sockets.put(socket.getLocalPort(), socket); // 異常検知時にSocketへアクセスできるように内部に保存しておく
    }

    @EventListener
    public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
      try {
        int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
        Socket socket = sockets.get(localPort);
        if (!socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
          sockets.get(localPort).setSoLinger(true, 0); // RSTによる切断が行われるように設定
        }
      } catch (SocketException e) {
        // ignore
      }
    }

    @EventListener
    public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
      sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); // 切断後に不要になるSocketをお掃除
    }

  }

}
package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {

  @Autowired
  MessagingTemplate template;

  @Test
  public void contextLoads() {
    Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
        MessageBuilder.withPayload("hello!").build());
    System.out.println("reply: " + reply);
  }

}

ちなみに・・・上記コードのまま実行すると処理中にエラー(例外)が発生することはないため、エラーを発生させるためにはサーバ側で処理中にSocketを切断したりする必要がありますが、本エントリーではそのあたりは割愛します。

まとめ

ドキュメントを読む感じだと TcpSocketSupport の役割を超えた使い方をしている気もしますが・・・Socketを触ることができる仕組みがサポートされていたのは助かりました。

参考ドキュメント

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?