Java

同期・非同期・ノンブロッキング・並列・リアクティブを区別する

Play Framework や Akka, RxJava, Spring5 での WebFlux などここ数年は非同期処理だとかリアクティブだとかが流行っていますが、

いまいち各処理ごとの特性とかが整理できなかったり、整理しても忘れてしまうことがあるので

自分向けのメモとして書いておこうと思います。

これらが指すものは OS・プログラム・IO などの文脈によっても変わりますので、おかしい部分があるかもしれません。間違いがあれば突っ込んでください。

IO を行うにはファイルやネットワークの通信先の準備ができるまで待つ必要があります。

また、準備ができて実際にデータの入出力を行う際も待つ必要があります。

前者の待ちがブロッキング(IOが開始できるまで待機する)、後者の待ちが同期(IOが終わるまで待機する)です。(多分)


同期・ブロッキング

普通の IO 処理を書く場合は、同期かつブロッキングになります。

HTTP で適当なホストのレスポンスを取得するコードは次の通りです。

public static String httpGet(String url) throws Exception {

URL u = new URL(url);
HttpURLConnection con = (HttpURLConnection)u.openConnection();
con.connect();
try(InputStream is = con.getInputStream()) {
BufferedReader br = new BufferedReader(new InputStreamReader(is, "utf-8"));

StringBuilder sb = new StringBuilder();
String line;
while((line = br.readLine()) != null) {
sb.append(line + "\n");
}
return sb.toString();
}
}

相手との通信に時間がかかったり取得するデータ量が多いと、同期・ブロッキング処理だと今の IO 処理が終わるまで次の処理を行えません。

これを解消していくのが非同期処理やノンブロッキング IO です。


マルチスレッド

同期・ブロッキング処理を別のスレッドで処理します。

これにより、IO 処理を割り当てられたスレッドは同期・ブロッキングですが、

IO 処理を呼び出した元のスレッドは IO 処理中に別の処理ができるので非同期処理になります。

次は前述の同期・ブロッキング処理をスレッドプール上で実行するサンプルです。

(Javaには非同期 IO を扱う AsynchronousSocketChannel もありますが割愛します)

ExecutorService pool = Executors.newCachedThreadPool();

Future<String> result = pool.submit(() -> httpGet("sample"));

submit で記述したタスクが別スレッドで開始されますが、結果を待たず Future が呼び出し元に返されます。

呼び出し元は後で Future から結果を取得できます。

非同期処理の結果は Future で将来の計算結果を取得する方法や、コールバックを設定して今の IO 処理が終わった後に続けて行う処理を追加する方法があります。

前者は Future#get などがあり、これは get した時点で同期になります。

後者の方式は Java だと CompletableFuture という API を使用します。こちらはコールバックをラムダ式で設定します。JavaScript の Promise、 async/await の特殊な文法もこれに相当します。この方法の場合、現在の非同期計算上にさらに処理を足していくので、最終的な結果を取得するときまでは同期しなくなります。


ノンブロッキング IO

ノンブロッキング IO では IO の準備ができない無い場合は準備できるまで待たずに、呼び出し元に制御を戻します。そして開始できる状態かは自分から後で確認しに行きます。

そのため IO がまだ開始できる状態でないなら、それを置いておいて別の IO を実行できます。

ここに、イベントループという無限ループを導入することでシングルスレッドでも複数の IO を受け付けできます。ループ中に複数の IO の状態を監視し、入出力の準備ができたものはループ中で処理を行います。その時だけ同期します。

ノンブロッキング IO は、OS 命令(O_NONBLOCKフラグを設定した read/write システムコールや select, pool, epollなど) やライブラリ(libuvなど) で実現します。

// ノンブロッキングIOで、nums回 url にアクセスする。

public static void nonBlockingIO(int nums, String url) throws IOException {

long start = System.currentTimeMillis();

// セレクターは登録したリソースに状況の変化があったら知らせてくれるもの。
Selector selector = SelectorProvider.provider().openSelector();

// 件数分、HTTP接続とGETリクエストの書き込みを行った後、セレクタに登録して、応答を待つ。
for (int i = 0; i < nums; i++) {

String host = url.substring(url.startsWith("https") ? 8 : 7);
SocketChannel chan = SocketChannel.open(new InetSocketAddress(host, 80));
chan.configureBlocking(false); // Non-Blocking

chan.write(ByteBuffer.wrap(("GET " + "/" + " HTTP/1.1\r\n").getBytes()));
chan.write(ByteBuffer.wrap(("HOST: " + host + "\r\n").getBytes()));
chan.write(ByteBuffer.wrap("Connection: close\r\n".getBytes()));
chan.write(ByteBuffer.wrap("\r\n".getBytes()));
chan.write(ByteBuffer.wrap("\r\n".getBytes()));

chan.register(selector, SelectionKey.OP_READ);
}

List<String> list = new ArrayList<>();
int selectCount = 0;

// 準備ができたものから受け取リ続ける。。
while(selector.select() > 0) {
Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
while (itr.hasNext()) {
SelectionKey key = itr.next();
itr.remove(); // 取り出したキーは削除
if (key.isReadable()) {
System.out.println(key);
SocketChannel chan = (SocketChannel)key.channel();

ByteBuffer buf = ByteBuffer.allocate(4096);
int read = chan.read(buf);
StringBuilder sb = new StringBuilder();
while(read >= 0) {
buf.flip();
sb.append(StandardCharsets.UTF_8.decode(buf).toString());
buf.clear();
read = chan.read(buf);
}
list.add(sb.toString());
selectCount++;
chan.close();
key.cancel();
if (selectCount == nums) {
//全てのチャネルを取得したら、終了。
selector.wakeup();
}
}
}
}

System.out.println(list.get(0));
System.out.println(list.get(3));
System.out.println(list.get(9));
}

ノンブロッキング IO をコンビニのレジに例えてみます。

コンビニのレジを一人で担当する場合、一人目のお客さんが弁当を温めてと言ったら弁当を温めている間お客さんを横に立たせて、次のお客さんの対応すると思います。そして弁当が温め終わったら、最初のお客さんの対応に戻り、弁当を渡します。

一方マルチスレッドはお客さん一人ずつに店員を一人割り当てていますので、対応時間が長いと大量の店員が必要になります。

ノンブロッキングIO は Java では Netty という Webサーバーが採用しています。少ないスレッドで大量のアクセスをさばけます。 Java 以外では node.js が libuv によるノンブロッキングIO を全面的に採用してシングルスレッドで IO を多重化しています。


マルチスレッドとノンブロッキング IO どっちがいいの?

冒頭の Play Framework, Akka, WebFlux を見ているとノンブロッキング IO を基本として、マルチスレッドを組み合わせるが現代の標準っぽく感じます。

Java並行処理プログラミング

を読み返したら、ノンブロッキング IO のプログラムは難しいしマルチスレッドでも性能は十分だからマルチスレッドにしたら的なことが書いてありましたが、すでに 10 年以上前の本です。

今でも並行処理の基本を知るための最適な一冊(ただし絶版)ですが、

その時にはスマートフォンや IoT の登場により、爆発的にネットワークアクセスが増えることは予想されていなかったのだろうなと。

マルチスレッドにして、同期処理を別スレッドにしたら呼び出し側から見れば同期するコードは無くなりますが、アプリケーション全体で見れば結局どこかが同期していることに変わりはありません。

よってアプリケーションへの同時アクセス数が増大し時間のかかる同期処理があるとスレッドプール内のスレッドが足りなくなり、パフォーマンス低下を招きます。

ノンブロッキング だと 1 スレッドで IO を多重化できますが、時間のかかる IO 処理があったり、CPU バウンドの処理がある場合は、そこがボトルネックで全体のパフォーマンスが落ちます。

うっかり無限ループを書こうものなら、イベントループスレッドが完全に停止するので全ての処理が止まります。

そこで、ノンブロッキング IO をマルチスレッドやマルチプロセスにしたり、どうしても非同期にできない処理を切り離して行う場所としてスレッドプールを併用します。

どうしても非同期にできない処理の代表は JDBC です。

Play Framework は非同期・ノンブロッキングなフレームワークと言っていますが、DBアクセスを行う処理についてはスレッドプールを別にして実行せよという指針があります。

いつになるのかわかりませんが async-JDBC (java.sql2) という提案があるので、それが登場したら全てがノンブロッキングになるアプリケーションが実現できそうです。

https://events.rainfocus.com/catalog/oracle/oow17/catalogjavaone17?search=CON1491&showEnrolled=false


並行処理 (Concurrency)

非同期処理との区別があってないような感じですが、非同期処理は単発の IO 処理を別スレッドで行う程度のニュアンスで用いることが多い気がします。

並行処理は異なるタスクを協調させながら同時に動かし1つのアプリケーションを作るものを指すことがあります。

例えば GUI アプリケーションで、長い処理の実行中にキャンセルで処理を打ち切ったりするとか、

Producer/Consumerパターンのような、処理対象データの生成と処理を別々のスレッドで行うとかがあります。

並行処理では複数タスクの協調が大事になるので、並行処理のためのライブラリが用意されています。

Java では、 java.util.concurrent パッケージに多数のライブラリが用意されています。

例えば次のようなものがあります。


  • Executor などスレッドプールや Future といった並行処理の実行基盤を提供するもの

  • AtomicInteger などのアトミックな処理ができるデータ型

  • ConcurrentHashMap などのマルチスレッド環境でも効率的に扱えるコレクション

  • Semaphore, Lock, Mutex, CountdownLatch など並行タスクの協調を行うもの

こういったライブラリは専門家の経験と知見の塊ですので積極的に使用し、低レベルなスレッドの API を使うことはデバッグやテストが難しくなるので避けます。


並列 (Parallel)

非同期処理や並行は主に IO バウンドな処理の効率化に用います。

一方、長大な配列の各要素に1億回計算を行うというような CPU バウンドの処理に対して、

非同期処理ではUXの改善はできても処理時間を短縮することはできません。

並列処理はタスクを複数のタスクに分割しそれをマルチコアで同時に処理します。

そのため、並行処理はIO待ちの時に別スレッドを割り当てたりすることで シングルコアでも実現できますが、

並列処理を実現するためにはマルチコアが必須になります。

百枚のはがきのあて名書きを何人かで手分けしてやる感じですね。

Java では並列処理は、ForkJoinPool や、 Stream API の parallel メソッドを使用します(実際 Stream API の内部実装でも ForkJoinPool を使っています)。

ForkJoinPool は、work steal というアルゴリズムで実装したスレッドプールで、大量のタスクに対して各スレッドが自分でタスクを取りに行く動きをすることで、複数のスレッドが常に働き続けるような社畜根性にあふれた振る舞いをしています。


リアクティブ

リアクティブ宣言、リアクティブシステム、リアクティブプログラムと色々ありますが、

ここではリアクティブプログラムについて触れます。

つまりは、 ReactiveExtention や RxJava などです。

リアクティブプログラムは 今までに出てきた技術要素の上に成り立つライブラリ・API という位置づけです。

大量・無限に発生するデータを、データの発生都度処理していくというプログラムスタイルです。

例えば、ファイルの中身を取得して別のファイルに書き出すというプログラムを書くことを考えてみます。

ファイルの中身をすべてメモリにロードしてファイルに書き出すというプログラムは簡単ですが、ファイルサイズが 1G とかだったりするとファイルを開くまでに時間がかかるし、ヒープサイズが足りずにアプリケーションが死んだりします。

そこで、読み込みと書き出しのファイルを2つのも開き、片方から1行読みだしたら片方に1行書き出すという風にしてみます。

そうするとヒープを消費せずにファイルが書きだせます。


余談ですが私はバッチ処理でDBからCSVを書きだすという内容の処理で同様の改善を行ったことがあり、その時はヒープ使用量が 6GB から 200MB まで減りました。


リアクティブプログラムは上記のような、データの送信元(Publisher)から、データの送信先( Subscriber) にデータが発生する度に送る処理に対する汎用的な API です。

データの送信元・送信先をファイルやネットワークにしたり、処理をマルチスレッドにしたり、複数の送信元・送信先を混在させたり、データの加工をラムダ式で設定出来たりといった機能があります。

また、送信元・送信先の処理能力に合わせてデータの送信量を調整する バックプレッシャー という機能もあります。

非同期処理を使ってデータを後から取得するのは簡単です。しかし、取得するデータが大量である場合、データが揃うまでの処理時間が長くなったり、あるいはリソースが枯渇する可能性があります。

リアクティブでは、大量データの要求であっても、データの一部が用意できたらその時点で相手にデータを送ります。

無限に発生するデータにも対応できますし、大量アクセスでも安定して対応できます。

このような特徴は例えばログ送信のような大量・無限に発生し続けるデータを逐次処理するのに有効です。

リアクティブシステムが目差すのは、リアクティブプログラムによる非同期かつノンブロッキングと分散処理で、多くのクライアントに素早く応答することだと言えそうです。

主にサーバーサイドの文脈で述べましたが、リアクティブの適用範囲は広く GUI アプリで発生するイベントをうまく扱うための手段としても用います。