Java Socket の InputStream で read() 待ちしてるスレッドは、ブロッキングモードでI/O待ちになってます。
これを解放する、つまりread()待ちを解除して、例外をスローするなりして処理を進めるにはどうするか調べてみました。
- 結論:Thread.interrupt() を使ったread()待ちスレッドへの割り込みは効果がなくて、Socket.close()を呼ぶことで解放できました。
- 他にも read() 待ちを解放する方法はあります、詳細は Socket.getInputStream() のJavaDocを参照してください:
- https://docs.oracle.com/javase/jp/8/docs/api/java/net/Socket.html#getInputStream--
以下、実験メモ。
まずクライアントソケットからの受信データを読みっぱなしで、データを返さないブラックホールのようなTCPサーバを作ります。
作例 : https://github.com/msakamoto-sf/javasnack/blob/master/src/main/java/javasnack/tool/BlackholeTcpServer.java
次にブロッキングモードで上記TCPサーバに接続し、read()待ちを行うようなタスクを Runnable
インターフェイスを実装して作ります。その際、タスクを実行中のスレッドとは別のスレッド(=外部)からスレッドに割り込みを掛けたり、read()待ちしてるSocketをcloseするためのメソッドを追加しておきます。
作例:
class BlockingIOTask implements Runnable {
final int no;
final int remotePort;
volatile Socket clientSocket = null;
volatile Thread currentThread = null;
volatile boolean isInterrupted = false;
volatile boolean done = false;
public BlockingIOTask(final int no, final int remotePort) {
this.no = no;
this.remotePort = remotePort;
}
/**
* 外部からread()待ち中のスレッドに割り込みをかけるためのメソッド
*/
public void interruptThread() {
if (Objects.nonNull(currentThread)) {
currentThread.interrupt();
}
}
/**
* 外部からread()待ち中のSocketをclose()するためのメソッド
*/
public void closeSocket() {
if (Objects.nonNull(clientSocket) && clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
@Override
public void run() {
// このタスクを実行中のスレッド参照を保存
currentThread = Thread.currentThread();
InetSocketAddress connectTo = new InetSocketAddress("127.0.0.1", this.remotePort);
clientSocket = new Socket();
try {
clientSocket.connect(connectTo);
// 一応、適当なデータを送信しておく。
OutputStream out = clientSocket.getOutputStream();
out.write(new byte[] { 0x00, 0x01, 0x02 });
out.write(new byte[] { 0x03, 0x04, 0x05 });
out.flush();
InputStream in = clientSocket.getInputStream();
// read()待ち開始
in.read();
} catch (IOException e) {
// 後でテストコードに組み込むためのTestNG用のassertion。
assertTrue(e instanceof SocketException);
assertEquals(e.getMessage(), "Socket closed");
} finally {
if (clientSocket.isConnected()) {
try {
clientSocket.close();
} catch (IOException ignore) {
}
}
}
this.isInterrupted = currentThread.isInterrupted();
this.done = true;
}
}
実際の挙動をテストコード(TestNGを使ってます)に組み込んで確認します。
細かい解説はコード中のコメントを参照してください。
public class TestGracefulShutdownBlockingIOTaskDemo {
BlackholeTcpServer blackholeTcpServer = null;
int blackholeTcpServerPort = 0;
@BeforeClass
public void beforeClass() throws IOException {
// 本テストクラスを実行する前にブラックホールTCPサーバを起動し、
// ランダムに割り当てられたlisteningポート番号を控えておく。
BlackholeTcpServer server = new BlackholeTcpServer();
this.blackholeTcpServerPort = server.start();
}
@AfterClass
public void afterClass() {
if (Objects.nonNull(this.blackholeTcpServer)) {
this.blackholeTcpServer.stop();
}
}
@Test
void testGracefulShutdownBlockingIOTaskDemo() throws InterruptedException {
// BlockingIOTask を4スレッド分起動
final int NUM = 4;
ExecutorService es = Executors.newFixedThreadPool(NUM);
BlockingIOTask tasks[] = new BlockingIOTask[NUM];
for (int i = 0; i < NUM; i++) {
tasks[i] = new BlockingIOTask(i, this.blackholeTcpServerPort);
es.submit(tasks[i]);
}
// ExecutorService.shutdown()を呼び、まず新しいタスクのsubmit()を禁止する(=入り口を閉じた)。
es.shutdown();
// 50ms 待ってみて、タスクが終了したかチェックする。
// もちろん、read()待ちでblockしているため、タスクは終了していない。
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// ExecutorService.shutdownNow()を呼ぶ。タスク実行中のスレッドに割り込みをかける。
List<Runnable> l = es.shutdownNow();
assertEquals(l.size(), 0); // 新たなタスクは投入していないため、実行待ちのタスクは0となる。
// ExecutorService.awaitTermination() が false を返しているため、
// read()待ちのblockingは、スレッド割り込みでは解除されないことが分かる。
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// 試しに Thread.interrupt() を手動で呼び出す。
for (int i = 0; i < NUM; i++) {
tasks[i].interruptThread();
}
// やっぱり、スレッド割り込みではread()待ち blocking は解除されない。
assertFalse(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// Socket.close() を外部から呼び出してみる。
for (int i = 0; i < NUM; i++) {
tasks[i].closeSocket();
}
// ExecutorService.awaitTermination()がtrueを返すので、
// タスクが全て終了したことが分かる。
assertTrue(es.awaitTermination(50, TimeUnit.MILLISECONDS));
// 実際にタスクのフラグ状況をチェックする。まずdoneフラグは全てtrueになっている。
assertTrue(tasks[0].done);
assertTrue(tasks[1].done);
assertTrue(tasks[2].done);
assertTrue(tasks[3].done);
// さらに Thread.isInterrupted() を保存したフラグも全てtrueになっており、
// interruptは受け付けていた、にも関わらずread()待ちは解放されなかったことが分かる。
assertTrue(tasks[0].isInterrupted);
assertTrue(tasks[1].isInterrupted);
assertTrue(tasks[2].isInterrupted);
assertTrue(tasks[3].isInterrupted);
}
}
以上、簡単な実験メモでした。