まえがき
Java/Androidでのノンブロッキング通信に続いて,NIO2を使ったJavaでのノンブロッキング通信について解説します.
これも以外と記事が見当たらないので,そもそも需要が無さそうですが備忘録代わりに残しておきます.
Asynchronous系クラスについて
NIO2でのTCP通信にはAsynchronous系のクラスを使います.Asynchronous系のメソッドは引数に**「成功したときに呼び出されるコールバック」と「失敗したときに呼び出されるコールバック」**を指定するのが特徴です.Futureクラスを使って後からブロッキング処理する方法もありますが,今回はノンブロッキング通信を行うため割愛します.
サーバ側
このプログラムでは,サーバ側はAccept処理をした後,指定サイズだけ読み込み,それをそのままクライアントに送信します.とりあえず何も考えず一つのメソッドに全部押し込んだ例を使って解説します.無名インナクラスを多用するので階層が深くなるため,インデント等に注意してください.
ソース
サーバ側のソースを抜粋します.
public static void server(Consumer<Throwable> error) throws IOException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
server.bind(new InetSocketAddress("localhost", PORT));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
System.out.println("SERVER: Accept");
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
socket.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("SERVER: Read "+result);
if(result < 0){
error.accept(new IOException("Closed"));
try {
socket.close();
} catch(IOException e){
}
return;
}
if(buffer.hasRemaining()){
socket.read(attachment, attachment, this);
return;
}
buffer.flip();
socket.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("SERVER: Write "+result);
if(result == 0){
error.accept(new IOException("Write Error"));
return;
}
if (buffer.hasRemaining()) {
socket.write(buffer, buffer, this);
return;
}
System.out.println("SERVER: Done "+ Arrays.toString(buffer.array()));
try {
socket.close();
} catch(IOException e){
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
error.accept(exc);
}
});
}
以下に解説します.
Accept処理
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
server.bind(new InetSocketAddress("localhost", PORT));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
// Read処理
}
@Override
public void failed(Throwable exc, Void attachment) {
// Error処理
}
}
サーバ側ではサーバソケットを開きます.NIOと同様にOptionを指定し,ローカルアドレスにバインドします.
ポート番号に0を指定した場合,空いてるポートで待ち受けしますが,NIOと違ってgetLocalAddress()
でローカルのSocketAddressオブジェクトを取得できます.
accept()
メソッドでは第1引数に添付するオブジェクト,第2引数にコールバックされるCompletionHandler
オブジェクトを指定します.
CompletionHandler
オブジェクトはGenericsで2つのクラスを指定し,1つめのクラスが成功時に渡されるオブジェクトのクラス,2つめのクラスが添付するオブジェクトのクラスになります.特に渡すオブジェクトが無ければVoidクラスを指定しておけば良いです.
実装するメソッドは成功時に呼び出されるcompleted()
と失敗時に呼び出されるfailed()
の2つで,accept()
メソッドの場合は成功時に接続したクライアントとのAsynchronousSocketChannelオブジェクトが渡されます.
accept()
メソッドでCompletionHandlerの登録が終わると処理はブロッキングされずに次の処理に移ります.実際にクライアントから接続があった場合は登録しておいたCompletionHandlerに従って処理が行われます.
注意点としてacceptメソッドはタイムアウトしません.タイムアウト処理が必要な場合は別途WatchDogスレッドを立てるなどしてタイムアウトさせるなりしてください.
Read処理
Accept処理に成功すると,引数で受け取ったAsynchronousSocketオブジェクトを使用して指定サイズだけ読み込みを行います.
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
socket.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(result < 0){
// Close処理
return;
}
if(attachment.hasRemaining()){
socket.read(attachment, attachment, this);
return;
}
// Write処理
}
@Override
public void failed(Throwable exc, Void attachment){
// Error処理
}
}
指定サイズでByteBufferを作成し,それを指定して読み込みを指示します.
read
メソッドは使用するByteBuffer, 引数に添付するオブジェクト,CompletionHandlerを指定します.
この例ではByteBufferを添付するオブジェクトに指定していますが,無名インナクラスを使用する場合は指定せずに元のオブジェクトを使用できます.Java 7以前であればByteBufferの宣言時にfinal修飾子を付けてください.
CompletionHandler#completed()
メソッドの第1引数には読み取ったバイト数が渡されます.**ソケットが閉じられた場合はfailedが呼ばれるのではなく,読み取ったバイト数として-1が返されます.**if文でresultをチェックして適宜Close処理をしてください.
また,この例ではByteBufferに余りがある,つまり全て読み込んでいない場合は残りの読み込みを試みますが,第3引数のCompletionHandler
には無名インナクラスとして定義されている自分自身としてthis(CompletionHandler.this
でも良い)を指定しています.
よく忘れがちなのが,その後にreturn
を書くのを忘れないでください.ノンブロッキングなので普通に次の処理を開始します.
また,readメソッドではタイムアウトを指定することができます.
タイムアウトを指定するには下記のようにread
メソッドを呼び出してください.
socket.read(buffer, 10, TimeUnit.SECONDS, buffer, new CompletionHandler<Integer, ByteBuffer>() {
....
}
第3引数に単位,第2引数に値を指定します.この場合だとタイムアウトとして10秒が設定されます.タイムアウトした場合はCompletionHandler#failed()
が呼ばれ,InterruptedByTimeoutException
が引数に渡されます.
Write処理
ここまで来るとほとんど解説も必要ないと思いますが,最後はWrite処理です.
buffer.flip();
socket.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result == 0){
// Error処理
return;
}
if (buffer.hasRemaining()) {
socket.write(buffer, buffer, this);
return;
}
System.out.println("SERVER: Done "+ Arrays.toString(buffer.array()));
try {
socket.close();
} catch(IOException e){
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
// Error処理
}
});
write
メソッドは第1引数に送信するByteBuffer,添付するオブジェクト,CompletionHandlerを指定します.
CompletionHandler#completed()
メソッドの第1引数には書き込まれたバイト数が渡されます.遭遇したことはないのですが,書き込みに失敗すると0が返されるらしいです.
この例ではByteBufferをチェックし,全てのデータが送信できていなければ再度送信,送信が完了していればソケットを閉じて終了します.
再度読み込みを行う場合はread
メソッドを呼び出せば良いのですが,そのときに指定するCompletionHandlerはread
メソッドに用いるCompletionHandlerでなければならないことに注意してください.(thisで渡されるのはwrite
に用いられるCompletionHandler)
Error処理
最後にエラー処理ですが,この例ではJava 8から追加されているConsumerクラスを用いて実装しています.Consumerクラスはaccept(T)のメソッドを1つだけ持つクラスです.TにThrowableクラスを指定してエラーの時は全てそこに投げています.ちゃんと実装する場合はサーバソケットを閉じるなりの処理を加えてください.
エラー処理としてスタックトレースするだけの場合は下記のようにLambda式を使って呼び出します.
server( (t)-> t.printStackTrace() );
クライアント側
クライアント側ではサーバ側でAcceptした代わりにConnect処理を行うだけです.
ソース
public static void client(Consumer<Throwable> error) throws IOException {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress("localhost", PORT), null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
System.out.println("CLIENT: Connected");
ByteBuffer wBuf = ByteBuffer.allocate(SIZE);
Random rand = new Random();
while(wBuf.hasRemaining()){
wBuf.put((byte) (rand.nextInt() % 128));
}
wBuf.flip();
client.write(wBuf, wBuf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("CLIENT: write "+result);
if(result == 0){
error.accept(new IOException("Write Error"));
return;
}
if(wBuf.hasRemaining()){
client.write(wBuf, wBuf, this);
return;
}
ByteBuffer rBuf = ByteBuffer.allocate(SIZE);
client.read(rBuf, rBuf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("CLIENT: read "+result);
if(result < 0){
error.accept(new IOException("Closed"));
try {
client.close();
} catch(IOException e){
}
return;
}
if(rBuf.hasRemaining()){
client.read(rBuf, rBuf, this);
return;
}
System.out.println("CLIENT: done "+Arrays.toString(rBuf.array()));
try {
client.close();
} catch(IOException e){
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
error.accept(exc);
}
});
}
Connect処理
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress("localhost", PORT), null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
// Write処理
}
public void failed(Throwable exc, Void attachment){
// Error処理
}
}
クライアント側ではAsynchronousSocketChannelを直接オープンし,connectメソッドを呼び出してサーバに接続を試みます.
connectメソッドでは第1引数に接続先にInetSocketAddressオブジェクト,第2引数に添付するオブジェクト,第3引数にCompletionHandlerオブジェクトを指定します.
CompletionHandler#completedメソッドは第1引数としてVoid型が指定されているため,何も受け取りません.接続に成功したときに呼び出されるだけになります.
注意点として,connectメソッドにはタイムアウトが指定できません.
Socket#connectメソッドやAsynchronousSocket#read/writeメソッドにはタイムアウトが指定できるのに何故指定できないのか理解不能ですが,Java 8になっても追加されていないのでタイムアウトさせる気はないようです.WatchDogスレッドを別途立てる,Futureオブジェクトを取得してgetを呼び出すなど行ってください.
Read/Write/Errorの処理に関してはサーバ側とほぼ同様です.
ソース(サーバ・クライアント)
最後に全てのソースを掲載しておきます.mainの最後でwhileループに入っていますが,こうしておかないとプログラム中にスレッドが一つも無くなるためプログラムが終了してしまいます.
実際に実装する際も監視用のスレッド等が残るように注意してください.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.Random;
import java.util.function.Consumer;
public class Main {
public static final int PORT = 12345;
public static final int SIZE = 20;
public static void main(String[] args) throws IOException, InterruptedException {
Consumer<Throwable> error = (t) -> t.printStackTrace();
server(error);
client(error);
while(true){
Thread.sleep(1);
}
}
public static void server(Consumer<Throwable> error) throws IOException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
server.bind(new InetSocketAddress("localhost", PORT));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
System.out.println("SERVER: Accept");
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
socket.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("SERVER: Read "+result);
if(result < 0){
error.accept(new IOException("Closed"));
try {
socket.close();
} catch(IOException e){
}
return;
}
if(buffer.hasRemaining()){
socket.read(attachment, attachment, this);
return;
}
buffer.flip();
socket.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("SERVER: Write "+result);
if(result == 0){
error.accept(new IOException("Write Error"));
return;
}
if (buffer.hasRemaining()) {
socket.write(buffer, buffer, this);
return;
}
System.out.println("SERVER: Done "+ Arrays.toString(buffer.array()));
try {
socket.close();
} catch(IOException e){
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
error.accept(exc);
}
});
}
public static void client(Consumer<Throwable> error) throws IOException {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress("localhost", PORT), null, new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
System.out.println("CLIENT: Connected");
ByteBuffer wBuf = ByteBuffer.allocate(SIZE);
Random rand = new Random();
while(wBuf.hasRemaining()){
wBuf.put((byte) (rand.nextInt() % 128));
}
wBuf.flip();
client.write(wBuf, wBuf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("CLIENT: write "+result);
if(result == 0){
error.accept(new IOException("Write Error"));
return;
}
if(wBuf.hasRemaining()){
client.write(wBuf, wBuf, this);
return;
}
ByteBuffer rBuf = ByteBuffer.allocate(SIZE);
client.read(rBuf, rBuf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("CLIENT: read "+result);
if(result < 0){
error.accept(new IOException("Closed"));
try {
client.close();
} catch(IOException e){
}
return;
}
if(rBuf.hasRemaining()){
client.read(rBuf, rBuf, this);
return;
}
System.out.println("CLIENT: done "+Arrays.toString(rBuf.array()));
try {
client.close();
} catch(IOException e){
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
error.accept(exc);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
error.accept(exc);
}
});
}
}
おまけ
この例のように無名インナクラスを多用すると階層が深くなって可読性が著しく下がります.また,Server/Clientの両方でRead/Writeの処理はほぼ共通なのに再利用できていません.
とりあえずメソッドの宣言部分だけでも省略するためにlambda式を使いたいところですが,CompletionHandlerは2つのメソッドがあるため短縮することができません.苦肉の策としてCompletionHandlerを作成する中間メソッドを定義することで短縮した例を下記に示します.
ソース
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.Random;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class Main2 {
public static final int PORT = 12345;
public static final int SIZE = 20;
public static <V, A> CompletionHandler<V, A> completion(BiConsumer<V, A> complete, BiConsumer<Throwable, A> error){
return new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
complete.accept(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
error.accept(exc, attachment);
}
};
}
public static void read(AsynchronousSocketChannel socket, ByteBuffer buffer, Consumer<Void> next, BiConsumer<Throwable, Void> error){
InetSocketAddress addr;
try {
addr = (InetSocketAddress) socket.getLocalAddress();
} catch(IOException e){
error.accept(e, null);
return;
}
socket.read(buffer, null, completion((r, v) ->{
System.out.println("Read "+addr+" "+r);
if(r < 0){
error.accept(new IOException("Closed"), null);
return;
}
if(buffer.hasRemaining()){
read(socket, buffer, next, error);
return;
}
buffer.flip();
next.accept(null);
}, error));
}
public static void write(AsynchronousSocketChannel socket, ByteBuffer buffer, Consumer<Void> next, BiConsumer<Throwable, Void> error){
InetSocketAddress addr;
try {
addr = (InetSocketAddress) socket.getLocalAddress();
} catch(IOException e){
error.accept(e, null);
return;
}
socket.write(buffer, null, completion((w, v)->{
System.out.println("Write "+addr+" "+w);
if (buffer.hasRemaining()) {
write(socket, buffer, next, error);
return;
}
buffer.clear();
next.accept(null);
}, error));
}
public static void server(BiConsumer<Throwable, Void> error) {
try {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
server.bind(new InetSocketAddress("localhost", PORT));
server.accept(null, completion((s, v) -> {
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
read(s, buffer, (vv) -> write(s, buffer, (vvv) -> System.out.println("SERVER: Done "+Arrays.toString(buffer.array())), error), error);
}, error));
} catch(IOException e){
error.accept(e, null);
}
}
public static void client(BiConsumer<Throwable, Void> error){
try {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress("localhost", PORT), null, completion((v, vv)->{
ByteBuffer wBuf = ByteBuffer.allocate(SIZE);
Random rand = new Random();
while(wBuf.hasRemaining()){
wBuf.put((byte) (rand.nextInt() % 128));
}
wBuf.flip();
ByteBuffer rBuf = ByteBuffer.allocate(SIZE);
write(client, wBuf, (vvv) -> read(client, rBuf, (vvvv)-> System.out.println("CLIENT: Done "+Arrays.toString(rBuf.array())), error), error);
}, error));
} catch(IOException e){
error.accept(e, null);
}
}
public static void main(String[] args) throws IOException, InterruptedException {
BiConsumer<Throwable, Void> error = (t, v) -> t.printStackTrace();
server(error);
client(error);
while(true){
Thread.sleep(1);
}
}
}
以下,簡単に解説します.
Completionメソッド
public static <V, A> CompletionHandler<V, A> completion(BiConsumer<V, A> complete, BiConsumer<Throwable, A> error){
return new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
complete.accept(result, attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
error.accept(exc, attachment);
}
};
}
このメソッドで,completed, failedのそれぞれについてBiConsumerクラスを受け取り,CompletedHandlerを作成します.BiConsumerクラスは引数を2つとるacceptメソッドだけのクラスになります.
こうした中間メソッドを作成することで,readのメソッドは下記のように書けます.
socket.read(buffer, null, completion((r, v) -> { /* Read処理 */ }, (t, o) -> { /* Error処理 */ }));
宣言部分が減るだけでだいぶスマートになります.
Read/Writeメソッド
Read/Writeメソッドでは引数にソケット,バッファと次に実行するConsumerオブジェクト,エラー用のBiConsumerオブジェクトを受け取り増す.
読み込み・書き込みが完了したら次に実行するConsumerオブジェクトを実行します.
public static void read(AsynchronousSocketChannel socket, ByteBuffer buffer, Consumer<Void> next, BiConsumer<Throwable, Void> error){
socket.read(buffer, null, completion((r, v) ->{
if(r < 0){
error.accept(new IOException("Closed"), null);
return;
}
if(buffer.hasRemaining()){
read(socket, buffer, next, error);
return;
}
buffer.flip();
next.accept(null);
}, error));
}
こうすることで,Server/Clientの両方でRead/Writeの処理を共通化しています.まぁ,こういう共通化ができるのはこの例だけですが・・・.
Server例
ということで,サーバの実装は凄く簡単になります.
server.accept(null, completion((s, v) -> {
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
read(s, buffer, (vv) -> write(s, buffer, (vvv) -> System.out.println("SERVER: Done "+Arrays.toString(buffer.array())), error), error);
}, error));
accept処理が成功した場合はreadを行い,readの成功時にwriteを呼び出すようにConsumerをlambda式で定義しておきます.
writeの作成時はSystem.out.printlnして終わりです.
補足
AsynchronousSocket等の実行はThreadPoolで管理されています.スレッドプールをプログラム全体で管理して性能を上げたい場合などは下記のように指定することができます.
ExecutorService service = Executors.newFixedThreadPool(4);
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(service);
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
当たり前ですがスレッド数はいくらでも増やせばいいというわけではなく,現実的にはコア数-2ぐらいが適正とどこかで聞きましたが評価したことはありません.適宜試してみてください.