Edited at

ブロッキングとかノンブロッキングを理解したい

More than 1 year has passed since last update.

この記事は、Spring WebFluxの前提である、ブロッキングやノンブロッキンクとは何か、Servlet3.0の Async Servletや Servlet3.1の NonblockingI/Oとは何か、を理解することが目的です。


検証バージョン

> java -version

java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

> ver
Microsoft Windows [Version 10.0.17134.165]


ブロッキングI/O

I/Oをする際(read,write)に、処理がブロックされる。例えばサーバがソケットをreadしたら、リクエストが届くまでスレッドをブロックして待つ。


BlockingAndSingleEchoServer.java

@Slf4j

public class BlockingAndSingleEchoServer implements EchoServer {
@Override
public void start() {
try (ServerSocketChannel ssc = ServerSocketChannel.open();) {
ssc.socket().bind(new InetSocketAddress(PORT));
log.info("サーバを起動しました");
while (true) {
// accept()したらクライアントが来るまでブロックされる
try (SocketChannel sc = ssc.accept();
InputStream in = sc.socket().getInputStream()
) {
// read()したら次が届くまでブロックされる
int c = 0;
while ((c = in.read()) != -1) {
System.out.print((char)c);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

TELNETでTCP接続してゆっくりと「hello world」と入力する。

サーバにメッセージが届くまで処理がブロックされていることがわかる。


コード例(シングルスレッド)

8080ポートに届いたリクエストをそのまま返すエコーサーバを作る。


BlockingAndSingleEchoServer.java

@Slf4j

public class BlockingAndSingleEchoServer {
public void start() {
try (ServerSocketChannel ssc = ServerSocketChannel.open();) {
ssc.socket().bind(new InetSocketAddress(8080));
log.info("サーバを起動しました");
while (true) {
try (SocketChannel sc = ssc.accept();//ブロックされる
BufferedReader in = new BufferedReader(new InputStreamReader(sc.socket().getInputStream()));
PrintWriter out = new PrintWriter(sc.socket().getOutputStream(), true);
) {
String line;
while ((line = in.readLine()) != null) {
log.info("recieved " + line + " from " + sc.socket().getRemoteSocketAddress());
log.info("echo " + line + " to " + sc.socket().getRemoteSocketAddress());
if ("goodbye".equals(line)) {
sc.socket().close();
break;
}
out.println(line);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}


実行例

Socketをaccept()したスレッドで処理を行うと、処理中は別のリクエストが受け付けられない。そのため、最大でも1つのリクエストしか同時に処理できない。

19:30:14.686 [main] INFO server.blocking.BlockingAndSingleEchoServer - サーバを起動しました

19:30:40.933 [main] INFO server.blocking.BlockingAndSingleEchoServer - recieved hello world from /192.168.11.116:52537
19:30:40.933 [main] INFO server.blocking.BlockingAndSingleEchoServer - echo hello world to /192.168.11.116:52537
19:30:50.535 [main] INFO server.blocking.BlockingAndSingleEchoServer - recieved goodbye from /192.168.11.116:52537
19:30:50.535 [main] INFO server.blocking.BlockingAndSingleEchoServer - echo goodbye to /192.168.11.116:52537
19:30:50.537 [main] INFO server.blocking.BlockingAndSingleEchoServer - recieved hello world from /192.168.11.116:52538
19:30:50.537 [main] INFO server.blocking.BlockingAndSingleEchoServer - echo hello world to /192.168.11.116:52538


コード例(マルチスレッド)

具体的な処理は別スレッドで非同期に実行すると、あるリクエストの処理中にも平行して別のリクエストを処理することができる。


BlockingAndMultiEchoServer.java

@Slf4j

public class BlockingAndMultiEchoServer implements EchoServer {
@Override
public void start() {
try (ServerSocketChannel ssc = ServerSocketChannel.open();) {
ssc.socket().bind(new InetSocketAddress(PORT));
log.info("サーバを起動しました " + ssc.socket().getLocalSocketAddress());
while (true) {
Socket socket = ssc.socket().accept();
log.info("別スレッドを起動します from " + socket.getRemoteSocketAddress());
// Socketがcloseされるとレスポンスが書き込めないので
// レスポンスを返却するスレッドでcloseする
new Thread(new EchoTask(socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}

レスポンスを返却する処理で忘れずにsocketをクローズする。


EchoTask.java

  class EchoTask implements Runnable {

private final Socket socket;

EchoTask(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);) {
String line;
while ((line = in.readLine()) != null) {
log.info("recieved " + line + " from " + socket.getRemoteSocketAddress());
log.info("echo " + line + " to " + socket.getRemoteSocketAddress());
if ("goodbye".equals(line)) {
socket.close();
break;
}
out.println(line);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}



実行例(マルチスレッド)

マルチスレッドにすると、複数のリクエストを同時にさばくことができる。

ログからもスレッドが複数使われていることがわかる。

19:35:21.915 [main] INFO server.blocking.BlockingAndMultiEchoServer - サーバを起動しました /0:0:0:0:0:0:0:0:8080

19:35:44.794 [main] INFO server.blocking.BlockingAndMultiEchoServer - 別スレッドを起動します from /192.168.11.116:52539
19:35:47.724 [Thread-0] INFO server.blocking.BlockingAndMultiEchoServer - recieved hello world from /192.168.11.116:52539
19:35:47.725 [Thread-0] INFO server.blocking.BlockingAndMultiEchoServer - echo hello world to /192.168.11.116:52539
19:35:49.605 [main] INFO server.blocking.BlockingAndMultiEchoServer - 別スレッドを起動します from /192.168.11.116:52540
19:35:52.124 [Thread-1] INFO server.blocking.BlockingAndMultiEchoServer - recieved hello world from /192.168.11.116:52540
19:35:52.124 [Thread-1] INFO server.blocking.BlockingAndMultiEchoServer - echo hello world to /192.168.11.116:52540
19:35:55.573 [Thread-0] INFO server.blocking.BlockingAndMultiEchoServer - recieved goodbye from /192.168.11.116:52539
19:35:55.574 [Thread-0] INFO server.blocking.BlockingAndMultiEchoServer - echo goodbye to /192.168.11.116:52539


ブロッキングI/Oで何がいいか


  • read/writeすればブロックして処理が完了するのを待つため、シンプルにコーディングできる


ブロッキングI/Oで何が困るか


  • 複数リクエストを同時に処理するためには、同時リクエスト分のスレッドが必要になる

  • スレッドを生成するためには、メモリを割り当てる必要がある(参考 Spring WebFlux の概要を理解する)

  • ソケットの読み書きが終わるまでスレッドがブロックされるため、低速なネットワークの場合に非効率

  • ブロックされるスレッドが増えると、CPU効率が悪くなる


ノンブロッキングI/O

I/Oをする際(read,write)に、処理がブロックされない。

処理を呼び出したら、すぐに何かしら返ってくる。


コード例

ノンブロッキングI/Oは読み書きができない状態ならば、即座にリターンされる。そのため、読み書き可能かどうかをSelectorを使って監視して、読み書きできるときだけ処理する。


NonBlockingEchoServer.java

@Slf4j

public class NonBlockingEchoServer {
public void start() {
try (ServerSocketChannel ssc = ServerSocketChannel.open();
Selector selector = Selector.open();) {
// ノンブロッキングモードにしてSelectorに受付チャネルを登録する
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(8080));
ssc.register(selector, SelectionKey.OP_ACCEPT);
log.info("サーバが起動しました " + ssc.socket().getLocalSocketAddress());

// チャネルにイベントが登録されるまで待つ
while (selector.select() > 0) {
for (Iterator it = selector.selectedKeys().iterator(); it.hasNext(); ) {
SelectionKey key = (SelectionKey) it.next();
it.remove();

if (key.isAcceptable()) {
doAccept((ServerSocketChannel) key.channel(), selector);
} else if (key.isReadable()) {
doRead((SocketChannel) key.channel(), selector);
} else if (key.isWritable()) {
byte[] message = (byte[]) key.attachment();
doWrite((SocketChannel) key.channel(), selector, message);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}


ソケットを受け付けたときのメソッド。

読み込み可能かどうかを監視するためにSelectorにchannelを登録する。

  private void doAccept(ServerSocketChannel ssc, Selector selector) {

try {
SocketChannel channel = ssc.accept();
log.info("connected " + channel.socket().getRemoteSocketAddress());
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}

読み込み可能になったときのメソッド。

読み込みにはByteBufferを使う。つらい。

読み込み終わったらレスポンスに書き込むので、Selectorにchannelを登録する。

  public void doRead(SocketChannel channel, Selector selector) {

try {
ByteBuffer buffer = ByteBuffer.allocate(1024);

// ソケットから入力を読み込む
// コネクションが切れていればチャネルをクローズし、読み込めなければリターンする
int readBytes = channel.read(buffer);
if (readBytes == -1) {
log.info("disconnected " + channel.socket().getRemoteSocketAddress());
channel.close();
return;
}
if (readBytes == 0) {
return;
}

// 入力されたメッセージを取り出し、チャネルに登録する
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);

String line = new String(buffer.array(), "UTF-8").replaceAll(System.getProperty("line.separator"), "");
log.info("recieved " + line + " from " + channel.socket().getRemoteSocketAddress());

log.info("register " + channel.socket().getRemoteSocketAddress());
channel.register(selector, SelectionKey.OP_WRITE, bytes);
} catch (IOException e) {
e.printStackTrace();
}
}

書き込み可能になったときのメソッド。

書き込みにもByteBufferを使う。ほんとつらい。

全てを書き込めるとは限らないのでhasRemaining()で残りがあれば再度書き込む。

全部レスポンスに書き出したらまた読み込みを受け付ける。

  public void doWrite(SocketChannel channel, Selector selector, byte[] message) {

try {
ByteBuffer byteBuffer = ByteBuffer.wrap(message);
channel.write(byteBuffer);
ByteBuffer restByteBuffer = byteBuffer.slice();

// ログに送信したメッセージを表示する
byteBuffer.flip();
byte[] sendBytes = new byte[byteBuffer.limit()];
byteBuffer.get(sendBytes);
String line = new String(sendBytes, "UTF-8").replaceAll(System.getProperty("line.separator"), "");
log.info("echo " + line + " to " + channel.socket().getRemoteSocketAddress());

// メッセージを最後まで出力したら入力を受け付ける
if (restByteBuffer.hasRemaining()) {
byte[] restBytes = new byte[restByteBuffer.limit()];
restByteBuffer.get(restBytes);
channel.register(selector, SelectionKey.OP_WRITE, restBytes);
} else {
channel.register(selector, SelectionKey.OP_READ);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}


実行例

1スレッドで複数クライアントの接続を同時に処理できることがわかる。

ログからも、1スレッドで処理できていることがわかる。

19:55:37.077 [main] INFO server.nonblocking.NonBlockingEchoServer - サーバが起動しました /0:0:0:0:0:0:0:0:8080

19:55:51.765 [main] INFO server.nonblocking.NonBlockingEchoServer - connected /192.168.11.116:52545
19:55:54.751 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved hello world from /192.168.11.116:52545
19:55:54.751 [main] INFO server.nonblocking.NonBlockingEchoServer - register /192.168.11.116:52545
19:55:54.751 [main] INFO server.nonblocking.NonBlockingEchoServer - echo hello world to /192.168.11.116:52545
19:55:56.683 [main] INFO server.nonblocking.NonBlockingEchoServer - connected /192.168.11.116:52546
19:55:59.350 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved hello world from /192.168.11.116:52546
19:55:59.350 [main] INFO server.nonblocking.NonBlockingEchoServer - register /192.168.11.116:52546
19:55:59.350 [main] INFO server.nonblocking.NonBlockingEchoServer - echo hello world to /192.168.11.116:52546


ノンブロッキングI/Oで何がいいか


  • 1スレッドで複数のリクエストを処理することが可能

  • 低速なネットワークでも効率的に処理できる

  • CPU効率がいい

1スレッドで複数のリクエストを処理することが"可能"と書いたのは、実際には、リクエストごとにスレッドを割り当てるアーキテクチャ(Tomcatなど)と本当に1つのスレッドで複数のリクエストを処理するアーキテクチャ(Node.jsなど)の2通りあるため。

シングルスレッドのイベントループモデルの場合、あるリクエストを処理している間は別のリクエストが処理されないため、重たい処理をする用途には向かない(と思う)。


ノンブロッキングI/Oで何が困るか


  • 実装が難しい


Tomcatのコネクタ実装

TomcatはブロッキングI/OやノンブロッキングI/Oをどう利用しているのか調べた。

さらに詳しく知りたいかたは『詳解 Tomcat』を参照してください。

種類
検証バージョン

BIO
Tomcat 8.0.53

NIO
Tomcat 8.5.32


ブロッキングI/O

Tomcat7以前のデフォルトのHTTP1.1のコネクタであるHttp11Protocol(JIoEndpoint+Http11ConnectionHandler)は、ブロッキングI/Oを利用している。

JIoEndpointクラスのAcceptorでsocketをaccept()し、workerスレッドでリクエスト処理を実行している。

参考 Github Tomcat 8.0.53 JIoEndpoint.java

抜粋すると、以下のようになる。


JIoEndpoint.java

protected class Acceptor extends AbstractEndpoint.Acceptor {

@Override
public void run() {
// Loop until we receive a shutdown command
while (running) {
...
socket = serverSocketFactory.acceptSocket(serverSocket);
...
}

protected boolean processSocket(Socket socket) {
...
//別スレッドで実行する
getExecutor().execute(new SocketProcessor(wrapper));
...
return true;
}



HTTP Keep-alive

HTTP Keep-aliveは、一度接続したコネクションを再利用する仕組み。コネクションを切断せずに次のリクエストを待たないといけないため、BIOの場合は次のリクエストを待つ間もずっとworkerスレッドを占有することになる。

実際に動かしてみると、リクエスト処理が完了したあともHTTP Keep-aliveのためにスレッドがRUNNINGのままになっていることがわかる。またHTTPパケットが送られていない時も、常にスレッドを占有していることがわかる。


今までありがとうBIOコネクタ

Tomcat8.5.0からはBIOコネクタが無くなり、デフォルトはNIOコネクタになった。


ノンブロッキングI/O

TomcatのHttp11NioProtocol(+NioEndpoint+Http11ConnectionHandler)はノンブロッキングI/O。ソースコードの抜粋が難しいので図だけ。

参考 Github Tomcat 8.5.32


SocketProcessorの実行タイミング

Telnetで1行ずつデータを送信すると、HTTPヘッダーを送信するたびにworkerがRUNNINGになり、通信がないときはWAITになっていることがわかる。


HTTP Keep-alive

読み込み可能になったタイミングでSocketProcessorを割り当てることができるため、HTTP Keep-aliveのためにworkerを占有する必要がない。


BIOとNIOを比べる


  • BIO


    • Socket#accept()以降、スレッドを占有する

    • ネットワーク遅延に弱い

    • HTTP Keep-alive のためにスレッドを占有する



  • NIO


    • 読み書き可能になったタイミングのみ、スレッドを占有する

    • ネットワーク遅延に強い

    • HTTP Keep-alive のためにスレッドを占有しない



  • 共通


    • HttpServlet#serviceの処理中は、リクエストごとにworkerスレッドを占有する



詳しくは以下を参照してください。

参考 Connector Comparison

参考 Connecting Tomcat to the World


その他の仕組み

Spring WebFluxを実現するベースになっている以下の機能についても調べた。

種類
検証バージョン

Servlet3.0 Async Servlet
Tomcat 8.5.32

Servlet3.1 NonBlocking I/O
Tomcat 8.5.32


Servlet3.0 Async Servlet

処理中にworkerスレッドを解放するための仕組み。

例えばロングポーリングやSSEなどでHTTPのコネクションを維持していると、workerスレッドを長時間占有することになる。

そのため、上記のような処理をworkerスレッドから切り離して実行する。

レスポンスをまとめて書き込むか、

chunckedで送ることができる。


コード例

chunckedで送る方法を試す。

別スレッドを起動してAsyncContextを利用し、レスポンスを書き込む。

@Slf4j

@WebServlet(urlPatterns = "/", asyncSupported = true)
public class EchoServlet extends HttpServlet {
Executor executor = new ScheduledThreadPoolExecutor(10);

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
log.info("begin doGet");
AsyncContext ac = req.startAsync();
executor.execute(new SlowTask(ac));
log.info("end doGet");
}
}

@Slf4j
class SlowTask implements Runnable {
private AsyncContext ac;

SlowTask(AsyncContext ac) {
this.ac = ac;
}

@Override
public void run() {
log.info("begin AsyncContext#start");

try {
PrintWriter writer = ac.getResponse().getWriter();
for (int i = 0; i < 5; i++) {
writer.println("task :" + i);
writer.flush();
log.info("send chuncked data");
TimeUnit.SECONDS.sleep(2);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
ac.complete();
log.info("end AsyncContext#start");
}
}


動作確認

curlでリクエストを送るとchunckedで受信されるのがわかる。

ログから、SlowTaskが別スレッドで実行されていることがわかる。

15:03:45.079 [http-nio-8080-exec-1] INFO example.com.echo.EchoServlet - begin doGet

15:03:45.083 [http-nio-8080-exec-1] INFO example.com.echo.EchoServlet - end doGet
15:03:45.083 [pool-1-thread-1] INFO example.com.echo.SlowTask - begin AsyncContext#start
15:03:45.088 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
15:03:47.088 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
15:03:49.090 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
15:03:51.091 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
15:03:53.092 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
15:03:55.093 [pool-1-thread-1] INFO example.com.echo.SlowTask - end AsyncContext#start


疑問

結局はアプリケーション内でスレッドを生成するんだから、APサーバ全体で見たときのスレッド生成コストは変わらない気がする。

アプリケーション内でスレッドを生成せずに、workerスレッド数の上限を上げればいいと思った。いまいち理解できてない。

外部通信といった待ち時間が大部分を占める処理は、ReactiveX系のライブラリを利用すれば、少ないスレッドで同時に多くの処理を実行できる。

Servlet3.0 Async Servlet 以前はリクエストごとにworkerスレッドを割り当てていたが、Servletコンテナ外のスレッドプールを利用できるようになった。そのため、下記のような構成にすることができる。

workerスレッドはすぐに外部スレッドに処理を委譲するため少ないスレッドで済み、アプリケーション全体のスレッド数を減らせる。

逆に処理にブロッキングな箇所がなければ多くのスレッドが必要になるため、外部スレッドプールを利用する意味は薄い。


Servlet APIはブロッキングなAPI

HTTPリクエストのボディをreadするためのAPIであるHttpServletRequest#getInputStream()の実装クラスはInputStream#read()を実装する必要がある。


This is an abstract class that a servlet container implements. Subclasses of this class must implement the java.io.InputStream.read() method.

参考 ServletInputStream Javadoc


そして、InputStreamのAPIは、


入力ストリームからデータの次のバイトを読み込みます。バイト値は、0 - 255の範囲のintとして返されます。ストリームの終わりに達したために読み込むバイトがない場合は、-1が返されます。入力データが読み込めるようになるか、ストリームの終わりが検出されるか、または例外が発生するまで、このメソッドはブロックされます。

参考 InputStream


つまりブロッキングなAPI。

HTTPレスポンスをwriteするHttpServletResponse#OutputStream()も同様にブロッキングなAPI。

Tomcatのコネクタのマニュアルからも、HTTPリクエストのボディを読み込む処理、および、HTTPレスポンスを書き込む処理がブロッキングな処理になっていることがわかる。

参考 Connector Comparison


コード例

検証のために、Tomcatのworkerスレッドを1に制限する。


Main.java

    public static void main(String[] args) throws ServletException, LifecycleException, MalformedURLException {

Tomcat tomcat = new Tomcat();
Connector connector = tomcat.getConnector();
connector.setAttribute("maxThreads", "1");

// アプリケーションのルートとWEB資材配置場所の指定
StandardContext ctx = (StandardContext) tomcat.addWebapp("/", new File("src/main/webapp").getAbsolutePath());

File additionWebInfClasses = new File("target/classes");
WebResourceRoot resources = new StandardRoot(ctx);
resources.addPreResources(new DirResourceSet(resources, "/WEB-INF/classes", additionWebInfClasses.getAbsolutePath(), "/"));
ctx.setResources(resources);

tomcat.start();
tomcat.getServer().await();

}


以下を実行する。

@Slf4j

@WebServlet(urlPatterns = "/")
public class SampleServlet extends HttpServlet {

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
log.info("begin doPost");
InputStream in = req.getInputStream();
int c = 0;
while ((c = in.read()) != -1) {
log.info(Character.toString((char) c));
}
log.info("end doPost");
}

}


動作確認

以下のことがわかる。


  • HTTPのボディを読み書きする処理はブロッキング


  • InputStream#read()で処理が止まり、スレッドを占有している

  • HTTPボディを読み込む前に(ヘッダーを読み込んだタイミングで)、HttpServlet#service()が実行される


  • InputStream#read()している間はWAITになっている


スレッド状態がWAITになっている理由


  • RUNNING = JVM上で処理が実行中

  • WAIT = 別のスレッドのアクション実行を待機中

Socket#read()を呼び出した場合、スレッドはRUNNINGになる。

しかし今回はWAITになっている。なぜか?

今回利用したNIOコネクタは、内部でByteBufferなどのAPIを利用する。しかしServletのInputStream#read()は、ブロッキングなAPI。そのため、ノンブロッキングなAPIをブロッキングなAPIのようにふるまわせる必要がある。

そこで、Tomcatは到着したリクエストボディをバッファしておき、InputStream#read()が呼び出されたときにバッファを返す。バッファが空になったときはNioBlockingSelector#read()を呼び出して、次にリクエストボディのデータが届くまでCoundDownLatch#await()を利用してスレッドを停止する。リクエストボディのデータが届くとCoundDownLatch.countDown()を呼びだし、スレッドを再開する。

これによって、ノンブロッキングなAPIをブロッキングなAPIのように扱っている。

参考 Github Tomcat8.5.32 NioBlockingSelector

CoundDownLatchで止められたスレッドのステータスはWAITになるため、RUNNINGではなくWAITになっている。(WAITだからといって別のリクエストは受け付けない点に注意)

参考 CoundDownLatch Javadoc


Servlet3.1 Nonblocking I/O

HTTPのボディに対する読み書きをノンブロッキングにするための仕組み。

Java Servlet 3.1の新機能――クラウド対応のJava EE 7でどう変わるのか?【Java EEエキスパート・シリーズ】


コード例

@Slf4j

@WebServlet(urlPatterns = "/non-blocking", asyncSupported = true)
public class NonBlockingIOServlet extends HttpServlet {

@Override
public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
log.info("begin doGet");
AsyncContext ctx = req.startAsync();
ServletInputStream input = req.getInputStream();
input.setReadListener(new NonBlockingReadListener(input, ctx));
log.info("end doGet");
}
}

@Slf4j
class NonBlockingReadListener implements ReadListener {
private final ServletInputStream inputStream;
private final AsyncContext ctx;

NonBlockingReadListener(ServletInputStream inputStream, AsyncContext ctx) {
log.info("ReadListener is initialized");
this.inputStream = inputStream;
this.ctx = ctx;
}

@Override
public void onDataAvailable() throws IOException {
log.info("onDataAvaliable");

StringBuilder sb = new StringBuilder();
int len = -1;
byte b[] = new byte[1024];

while (!inputStream.isFinished() && inputStream.isReady() && (len = inputStream.read(b)) != -1) {
String data = new String(b, 0, len);
log.info("recieved : " + data);
}
}

@Override
public void onAllDataRead() throws IOException {
log.info("onAllDataRead");
ctx.complete();
}

@Override
public void onError(Throwable throwable) {
log.info("onError : " + throwable);
ctx.complete();
}
}

検証のために、Tomcatのworkerスレッドを1に制限して実行する。


実行例

実行例からデータ到着のたびにonDataAvailable()が実行されていることがわかる。

また、1スレッドで複数リクエストをさばけることがわかる。

09:05:25.073 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - begin doGet

09:05:25.073 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - ReadListener is initialized
09:05:25.073 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - end doGet
09:05:27.293 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - onDataAvaliable
09:05:27.294 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - recieved : hello
09:05:33.100 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - begin doGet
09:05:33.100 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - ReadListener is initialized
09:05:33.101 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - end doGet
09:05:36.465 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - onDataAvaliable
09:05:36.465 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - recieved : goodbye

09:05:39.679 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - onDataAvaliable
09:05:39.679 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - recieved : hi
09:05:39.679 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - onAllDataRead


何がいいのか


  • HTTPのボディも効率的に読み書きできるようになるため、でかいファイルアップロードや低速なネットワークからのアクセスを効率的に処理できる。


何が困るか


  • 実装が難しい


そして時代はSpring5へ

Controllerの引数や戻り値をReactorのAPIにしとくと、Servlet3.1のノンブロッキングI/Oを利用して読み書きしてくれるようになる。

参考 Spring 5に備えるリアクティブプログラミング入門

参考 Spring WebFlux の概要を理解する


結論

IOTやモバイル端末の普及により、低速なネットワークからの大量のアクセスが増えてきた昨今、CPUを効率的に利用しつつ大量のリクエストを処理できるノンブロッキングな処理というのが注目を集めている。

ただし、ノンブロッキング処理はたいがい難易度が上がるので、必要になったら採用を検討するくらいでちょうど良さそう。

ということで俺はギョウミーアプリに戻るぞー!ジョジョー!


サンプルコード

https://github.com/kimullamen/echo-server


参考