2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

勉強メモ/Java SocketのInputStreamでread()待ちはThread.interrupt()では解放されない

Posted at

Java Socket の InputStream で read() 待ちしてるスレッドは、ブロッキングモードでI/O待ちになってます。
これを解放する、つまりread()待ちを解除して、例外をスローするなりして処理を進めるにはどうするか調べてみました。

以下、実験メモ。

まずクライアントソケットからの受信データを読みっぱなしで、データを返さないブラックホールのようなTCPサーバを作ります。
作例 : https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java

次にブロッキングモードで上記TCPサーバに接続し、read()待ちを行うようなタスクを Runnable インターフェイスを実装して作ります。その際、タスクを実行中のスレッドとは別のスレッド(=外部)からスレッドに割り込みを掛けたり、read()待ちしてるSocketをcloseするためのメソッドを追加しておきます。
作例:

class BlockingIOTask implements Runnable {
    final int no;
    final int remotePort;
    volatile Socket clientSocket = null;
    volatile Thread currentThread = null;
    volatile boolean isInterrupted = false;
    volatile boolean done = false;

    public BlockingIOTask(final int no, final int remotePort) {
        this.no = no;
        this.remotePort = remotePort;
    }

    /**
     * 外部からread()待ち中のスレッドに割り込みをかけるためのメソッド
     */
    public void interruptThread() {
        if (Objects.nonNull(currentThread)) {
            currentThread.interrupt();
        }
    }

    /**
     * 外部からread()待ち中のSocketをclose()するためのメソッド
     */
    public void closeSocket() {
        if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
            try {
                clientSocket.close();
            } catch (IOException ignore) {
            }
        }
    }

    @Override
    public void run() {
        // このタスクを実行中のスレッド参照を保存
        currentThread = Thread.currentThread();
        InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
        clientSocket = new Socket();
        try {
            clientSocket.connect(connectTo);
            // 一応、適当なデータを送信しておく。
            OutputStream out = clientSocket.getOutputStream();
            out.write(new byte[] { 0x00, 0x01, 0x02 });
            out.write(new byte[] { 0x03, 0x04, 0x05 });
            out.flush();
            InputStream in = clientSocket.getInputStream();
            // read()待ち開始
            in.read();
        } catch (IOException e) {
            // 後でテストコードに組み込むためのTestNG用のassertion。
            assertTrue(e instanceof SocketException);
            assertEquals(e.getMessage(), "Socket closed");
        } finally {
            if (clientSocket.isConnected()) {
                try {
                    clientSocket.close();
                } catch (IOException ignore) {
                }
            }
        }
        this.isInterrupted = currentThread.isInterrupted();
        this.done = true;
    }
}

実際の挙動をテストコード(TestNGを使ってます)に組み込んで確認します。
細かい解説はコード中のコメントを参照してください。

public class TestGracefulShutdownBlockingIOTaskDemo {

    BlackholeTcpServer blackholeTcpServer = null;
    int blackholeTcpServerPort = 0;

    @BeforeClass
    public void beforeClass() throws IOException {
        // 本テストクラスを実行する前にブラックホールTCPサーバを起動し、
        // ランダムに割り当てられたlisteningポート番号を控えておく。
        BlackholeTcpServer server = new BlackholeTcpServer();
        this.blackholeTcpServerPort = server.start();
    }

    @AfterClass
    public void afterClass() {
        if (Objects.nonNull(this.blackholeTcpServer)) {
            this.blackholeTcpServer.stop();
        }
    }

    @Test
    void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {

        // BlockingIOTask を4スレッド分起動
        final int NUM = 4;
        ExecutorService es = Executors.newFixedThreadPool(NUM);
        BlockingIOTask tasks[] = new BlockingIOTask[NUM];
        for (int i = 0; i < NUM; i++) {
            tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
            es.submit(tasks[i]);
        }

        // ExecutorService.shutdown()を呼び、まず新しいタスクのsubmit()を禁止する(=入り口を閉じた)。
        es.shutdown();

        // 50ms 待ってみて、タスクが終了したかチェックする。
        // もちろん、read()待ちでblockしているため、タスクは終了していない。
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // ExecutorService.shutdownNow()を呼ぶ。タスク実行中のスレッドに割り込みをかける。
        List<Runnable> l = es.shutdownNow();
        assertEquals(l.size(), 0); // 新たなタスクは投入していないため、実行待ちのタスクは0となる。

        // ExecutorService.awaitTermination() が false を返しているため、
        // read()待ちのblockingは、スレッド割り込みでは解除されないことが分かる。
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // 試しに Thread.interrupt() を手動で呼び出す。
        for (int i = 0; i < NUM; i++) {
            tasks[i].interruptThread();
        }

        // やっぱり、スレッド割り込みではread()待ち blocking は解除されない。
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // Socket.close() を外部から呼び出してみる。
        for (int i = 0; i < NUM; i++) {
            tasks[i].closeSocket();
        }
        // ExecutorService.awaitTermination()がtrueを返すので、
        // タスクが全て終了したことが分かる。
        assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // 実際にタスクのフラグ状況をチェックする。まずdoneフラグは全てtrueになっている。
        assertTrue(tasks[0].done);
        assertTrue(tasks[1].done);
        assertTrue(tasks[2].done);
        assertTrue(tasks[3].done);
        // さらに Thread.isInterrupted() を保存したフラグも全てtrueになっており、
        // interruptは受け付けていた、にも関わらずread()待ちは解放されなかったことが分かる。
        assertTrue(tasks[0].isInterrupted);
        assertTrue(tasks[1].isInterrupted);
        assertTrue(tasks[2].isInterrupted);
        assertTrue(tasks[3].isInterrupted);
    }
}

以上、簡単な実験メモでした。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?