Help us understand the problem. What is going on with this article?

勉強メモ/Java SocketのInputStreamでread()待ちはThread.interrupt()では解放されない

More than 1 year has passed since last update.

Java Socket の InputStream で read() 待ちしてるスレッドは、ブロッキングモードでI/O待ちになってます。
これを解放する、つまりread()待ちを解除して、例外をスローするなりして処理を進めるにはどうするか調べてみました。

以下、実験メモ。

まずクライアントソケットからの受信データを読みっぱなしで、データを返さないブラックホールのようなTCPサーバを作ります。
作例 : https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java

次にブロッキングモードで上記TCPサーバに接続し、read()待ちを行うようなタスクを Runnable インターフェイスを実装して作ります。その際、タスクを実行中のスレッドとは別のスレッド(=外部)からスレッドに割り込みを掛けたり、read()待ちしてるSocketをcloseするためのメソッドを追加しておきます。
作例:

class BlockingIOTask implements Runnable {
    final int no;
    final int remotePort;
    volatile Socket clientSocket = null;
    volatile Thread currentThread = null;
    volatile boolean isInterrupted = false;
    volatile boolean done = false;

    public BlockingIOTask(final int no, final int remotePort) {
        this.no = no;
        this.remotePort = remotePort;
    }

    /**
     * 外部からread()待ち中のスレッドに割り込みをかけるためのメソッド
     */
    public void interruptThread() {
        if (Objects.nonNull(currentThread)) {
            currentThread.interrupt();
        }
    }

    /**
     * 外部からread()待ち中のSocketをclose()するためのメソッド
     */
    public void closeSocket() {
        if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
            try {
                clientSocket.close();
            } catch (IOException ignore) {
            }
        }
    }

    @Override
    public void run() {
        // このタスクを実行中のスレッド参照を保存
        currentThread = Thread.currentThread();
        InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
        clientSocket = new Socket();
        try {
            clientSocket.connect(connectTo);
            // 一応、適当なデータを送信しておく。
            OutputStream out = clientSocket.getOutputStream();
            out.write(new byte[] { 0x00, 0x01, 0x02 });
            out.write(new byte[] { 0x03, 0x04, 0x05 });
            out.flush();
            InputStream in = clientSocket.getInputStream();
            // read()待ち開始
            in.read();
        } catch (IOException e) {
            // 後でテストコードに組み込むためのTestNG用のassertion。
            assertTrue(e instanceof SocketException);
            assertEquals(e.getMessage(), "Socket closed");
        } finally {
            if (clientSocket.isConnected()) {
                try {
                    clientSocket.close();
                } catch (IOException ignore) {
                }
            }
        }
        this.isInterrupted = currentThread.isInterrupted();
        this.done = true;
    }
}

実際の挙動をテストコード(TestNGを使ってます)に組み込んで確認します。
細かい解説はコード中のコメントを参照してください。

public class TestGracefulShutdownBlockingIOTaskDemo {

    BlackholeTcpServer blackholeTcpServer = null;
    int blackholeTcpServerPort = 0;

    @BeforeClass
    public void beforeClass() throws IOException {
        // 本テストクラスを実行する前にブラックホールTCPサーバを起動し、
        // ランダムに割り当てられたlisteningポート番号を控えておく。
        BlackholeTcpServer server = new BlackholeTcpServer();
        this.blackholeTcpServerPort = server.start();
    }

    @AfterClass
    public void afterClass() {
        if (Objects.nonNull(this.blackholeTcpServer)) {
            this.blackholeTcpServer.stop();
        }
    }

    @Test
    void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {

        // BlockingIOTask を4スレッド分起動
        final int NUM = 4;
        ExecutorService es = Executors.newFixedThreadPool(NUM);
        BlockingIOTask tasks[] = new BlockingIOTask[NUM];
        for (int i = 0; i < NUM; i++) {
            tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
            es.submit(tasks[i]);
        }

        // ExecutorService.shutdown()を呼び、まず新しいタスクのsubmit()を禁止する(=入り口を閉じた)。
        es.shutdown();

        // 50ms 待ってみて、タスクが終了したかチェックする。
        // もちろん、read()待ちでblockしているため、タスクは終了していない。
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // ExecutorService.shutdownNow()を呼ぶ。タスク実行中のスレッドに割り込みをかける。
        List<Runnable> l = es.shutdownNow();
        assertEquals(l.size(), 0); // 新たなタスクは投入していないため、実行待ちのタスクは0となる。

        // ExecutorService.awaitTermination() が false を返しているため、
        // read()待ちのblockingは、スレッド割り込みでは解除されないことが分かる。
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // 試しに Thread.interrupt() を手動で呼び出す。
        for (int i = 0; i < NUM; i++) {
            tasks[i].interruptThread();
        }

        // やっぱり、スレッド割り込みではread()待ち blocking は解除されない。
        assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // Socket.close() を外部から呼び出してみる。
        for (int i = 0; i < NUM; i++) {
            tasks[i].closeSocket();
        }
        // ExecutorService.awaitTermination()がtrueを返すので、
        // タスクが全て終了したことが分かる。
        assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));

        // 実際にタスクのフラグ状況をチェックする。まずdoneフラグは全てtrueになっている。
        assertTrue(tasks[0].done);
        assertTrue(tasks[1].done);
        assertTrue(tasks[2].done);
        assertTrue(tasks[3].done);
        // さらに Thread.isInterrupted() を保存したフラグも全てtrueになっており、
        // interruptは受け付けていた、にも関わらずread()待ちは解放されなかったことが分かる。
        assertTrue(tasks[0].isInterrupted);
        assertTrue(tasks[1].isInterrupted);
        assertTrue(tasks[2].isInterrupted);
        assertTrue(tasks[3].isInterrupted);
    }
}

以上、簡単な実験メモでした。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away