JavaでのノンブロッキングIO(NIO)を用いたTCP通信のサンプルです.
Java7以降ではNIO2が導入されたのでNIOを用いた実装はほとんど見ないのですが,AndroidではNIO2が導入されていないのでノンブロッキング実装をする場合はNIOを利用するしかありません.
いろんなライブラリがあるので実際にTCP通信を実装する人は少ないのかも知れませんが,いざ実装しようとすると意外とちゃんとしたサンプルが落ちていないのでまとめておきます.
ソースはGitHubを参照してください.
クライアントから受け取ったバイト列をそのままクライアントに返すプログラムになります.
以下,解説します.
サーバ側
まずはサーバ側から.
基本的な手順は
1.サーバソケットの作成
2.セレクタを利用したイベントの取得
3.Accept処理(ソケットの作成)
4.ソケット毎のRead/Write処理
という流れになります.
サーバソケットの作成
Selector selector = Selector.open();
ServerSocketChannel serverCh = ServerSocketChannel.open();
serverCh.configureBlocking(false);
serverCh.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverCh.bind(new InetSocketAddress("localhost", 12345));
serverCh.register(selector, SelectionKey.OP_ACCEPT);
注意点としては3行目のconfigureBlocking(false)を呼ぶこと,4行目オプションの設定方法です.
configureBlocking(false)を呼ばないとブロッキング処理になります.
4行目のオプションの設定方法は第1引数にオプション名を指定し,第2引数は第1引数のGenericsに応じた型のオブジェクトを指定します.
SO_REUSEADDRはBoolean型が指定されているのでtrue/falseを指定します.
例えばSO_SNDBUFはInteger型が指定されているのでint値を入れることになります.
5行目でローカルのアドレスにバインドします.
ポートがどこでも良い場合は0を指定します.
その場合,ServerSocketChannel.socket()メソッドからServerSocketオブジェクトを取得し,更にgetLocalPort()メソッドを呼び出してポート番号を取得する必要があります.(なぜgetLocalPort()メソッドがServerSocketChannelに用意されていないのか・・・)
最後にセレクタに登録します.
OP_ACCEPTのキーを指定してセレクタに登録することで,ServerSocketChannelのacceptメソッドが呼び出し可能になった場合にセレクタから通知されます.
セレクタを利用したイベントの取得
セレクタを利用してNIOを利用するために,セレクタから通知のあったチャネルを取り出す必要があります.
while(true){
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
keys.remove();
...
}
}
2行目のselector.select()でクライアントから接続されるまで待機します.
待機させずに別の処理を指せる場合はselectNow()メソッド,タイムアウト値を設定する場合はselect(timeout)を呼び出してください.
(JavaでTCPのタイムアウト処理を行うにはこのSelectorを使う方法がスマートですが,また別途書きます)
Selector#select()メソッドは返り値としてイベント通知のあったチャネルの数を返しますが,今回は使っていません.
クライアントから接続されて2行目のブロッキング処理を抜けると,3行目でイテレータを取得します.
セレクタには複数のチャネルが登録されている可能性があるため,イベント通知のあったチャネルがイテレータとして提供されます.
このとき,SelectionKeyのオブジェクトのイテレータとして提供されますが,SelectionKeyの中にチャネルへの参照が保持されています.
ちなみに,Selector.selectedKeys()はSetオブジェクトを返すので,Java 5から導入されているforeach構文を使いたくなりますが(というかJava 1.4時代を知らない人の方が多そうですが・・・),
for(SelectionKey key: selector.selectedKeys()){
...
}
といった形で実装すると正常に動作しません.
6行目のSelectionKey#remove()メソッドを呼びださなければSelectionKeyが削除されず,次のSelector#select()メソッドでも再度通知されてしまって常に通知され続けます.
ということで,上記のようなIteratorを使った実装を行ってください.
Accept処理
if(key.isAcceptable()){
SocketChannel ch = serverCh.accept();
ch.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
ch.register(selector, SelectionKey.OP_READ, buffer);
continue;
}
ServerSocketChannel#accept()メソッドを呼び出して,クライアントと通信するためのSocketChannelを取得します.
まず,SelectionKey#isAcceptable()を使用して取得したチャネルがAccept可能かどうかを確認してください.
その後,acceptメソッドを呼び出すのですが,今回の例ではServerSocketChannelが1つしかないので直接呼び出しています.
複数ある場合はSelectionKey#channel()を呼び出して,Selectorから指定されているチャネルを取得してください.
その後,SocketChannel#configureBlocking()メソッドを呼び出してノンブロッキングを指定し,セレクタに登録します.
セレクタへの登録は任意のオブジェクトを添付できるので,この例では送受信に使用するByteBufferを第3引数に指定して添付しています.
実際の利用ではここにクライアント毎に必要な情報をクラスにまとめてオブジェクトとして添付したりします.
セレクタへの登録はOP_READを指定して,読み取り可能になった場合にセレクタから通知されるようにします.
接続されたらすぐに書きたい場合はOP_WRITEを,ReadもWriteも両方行う場合はorの演算子を使用して
ch.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, buffer);
と指定してください.
このようにacceptされたチャネルも同じセレクタに登録することで,同一スレッド内でサーバソケットと複数のクライアントソケットを捌くことができます.
ソケット毎のRead/Write処理
if(key.isReadable()){
SocketChannel ch = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int readByte = ch.read(buffer);
if(readByte < 0){
try {
ch.close();
} catch(IOException e){
}
break loop;
}
if(!buffer.hasRemaining()){
buffer.flip();
ch.register(selector, SelectionKey.OP_WRITE, buffer);
}
}
if(key.isWritable()){
SocketChannel ch = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int writeByte = ch.write(buffer);
if(!buffer.hasRemaining()){
buffer.clear();
ch.register(selector, SelectionKey.OP_READ, buffer);
}
}
最後にRead/Writeの処理ですが,Accept同様にSelectionKey#isReadable()/isWriteable()を呼び出して読み書きが可能かどうか確認し,それぞれの処理を行います.
Readが可能であった場合,SelectionKey#channel()から該当チャネルを取得し,SocketChannelへキャストします.
使用するByteBufferはSelectionKey#attachment()から取得します.
後はSocketChannel#read()メソッドを使って読み込みます.返り値には読み込まれたバイト数が格納されます.
仮にクライアント側のソケットがCloseされていても,例外は発生しません.-1が返り値となるので,そこで判断してください.
クライアントから受け取ったバイト列がバッファ一杯になると,readの作業を終了し,writeの処理に移るため,セレクタへ再度登録を行います.
OP_WRITEを指定して登録することで,書込みが可能になるとセレクタから通知されます.
基本的に書込みはほとんどの場合で可能なので,書き込むデータが無いのにOP_WRITEの状態で登録したままにすると,セレクタから頻繁に通知されることになります.
クライアントとの間でRead/Writeを常時行うような場合であっても,常にOP_READ | OP_WRITEとして登録するのではなく,書き込むデータがある場合の時だけOP_WRITEを登録するようにしてください.
Writeが可能であった場合はReadの場合と同様にチャネルとByteBufferを取得し,Writeしてください.
ByteBuffer内の全てのバイト列が書き込まれるとは限らないので,ByteBufferに残りがある場合は再度セレクタからの通知を待って書き込みます.
書込みが全て終わるとバッファをクリアして,再度OP_READで登録します.
クライアント側
クライアント側の動作は下記の通りとなります.
1.ソケットの作成
2.セレクタを利用したイベントの取得
3.Connect処理
4.Read/Write処理
基本的にはサーバ側と似ているのですが,3.のConnect処理が少し違います.
ソケットの作成
Selector selector = Selector.open();
SocketChannel socketCh = SocketChannel.open();
socketCh.configureBlocking(false);
socketCh.connect(new InetSocketAddress("localhost", PORT));
socketCh.register(selector, SelectionKey.OP_CONNECT);
まずSocketChannelを作成します.ServerSocketChannelとほとんど同じです.
違う点はconnectメソッドを呼び出して,サーバ側に接続を指示することと,OP_CONNECTを指定してセレクタに登録する点です.
ノンブロッキング処理を行うので,SocketChannel#connect()を呼び出してもTCPのスリーウェイハンドシェイクが終わって接続が確立するまで待機するようなことはありません.また,指定したアドレスに誤りがあった場合やサーバのポートが空いていないような場合でも例外が返ることはありません.
それらは全てセレクタからOP_CONNECTのイベントとして通知される形で受け取ることになります.
Connect処理
if(key.isConnectable()){
if(!socketCh.finishConnect()){
throw new IOException("Connect Fail");
}
socketCh.register(selector, SelectionKey.OP_WRITE);
}
接続処理の完了したチャネルはSelectionKey#isConnectable()で判別します.
該当するチャネルのSocketChannel#finishConnect()を呼び出すことで,接続が成功したか失敗したかを取得できます.
成功した場合はtrue,失敗した場合は例外がスローされます.
(基本的にセレクタから指示があって呼び出した場合,falseが返ることはありません)
無事に接続が成功した場合,OP_WRITEかOP_READで登録してRead/Write処理を行います.
まとめ
以上でNIOの簡単なTCP通信サンプルの説明は終わりです.
完全なソースはGitHubにもありますが,下記に全文を載せておきます.
package jp.shinido.qiita.simple_nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
public class Main {
public static final int SIZE = 20;
public static final int PORT = 12345;
public static void main(String[] args) throws IOException{
Thread th = new Thread(()->{
try {
server();
} catch(IOException e){
e.printStackTrace();
}
});
th.start();
client();
}
public static void server() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverCh = ServerSocketChannel.open();
serverCh.configureBlocking(false);
serverCh.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverCh.bind(new InetSocketAddress("localhost", PORT));
serverCh.register(selector, SelectionKey.OP_ACCEPT);
loop:while(true){
selector.select(); // blocking
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
keys.remove(); // DON'T FORGET!
if(key.isAcceptable()){
// new connection
System.out.println("SERVER: New Connection");
SocketChannel ch = serverCh.accept();
ch.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
ch.register(selector, SelectionKey.OP_READ, buffer);
continue;
}
if(key.isReadable()){
SocketChannel ch = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int readByte = ch.read(buffer);
System.out.println("SERVER: Read "+readByte);
if(readByte < 0){
System.out.println("SERVER: Closed");
try {
ch.close();
} catch(IOException e){
}
break loop;
}
if(!buffer.hasRemaining()){
buffer.flip();
ch.register(selector, SelectionKey.OP_WRITE, buffer);
}
}
if(key.isWritable()){
SocketChannel ch = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int writeByte = ch.write(buffer);
System.out.println("SERVER: Write "+writeByte);
if(!buffer.hasRemaining()){
buffer.clear();
ch.register(selector, SelectionKey.OP_READ, buffer);
}
}
}
}
selector.close();
}
public static void client() throws IOException {
Selector selector = Selector.open();
SocketChannel socketCh = SocketChannel.open();
socketCh.configureBlocking(false);
socketCh.connect(new InetSocketAddress("localhost", PORT));
socketCh.register(selector, SelectionKey.OP_CONNECT);
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
Random rand = new Random();
while(buffer.hasRemaining()){
buffer.put((byte) (rand.nextInt() % 128));
}
buffer.flip();
loop:while(true){
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
keys.remove();
if(key.isConnectable()){
if(!socketCh.finishConnect()){
throw new IOException("Connect Fail");
}
System.out.println("CLIENT: Connected");
socketCh.register(selector, SelectionKey.OP_WRITE);
}
if(key.isWritable()){
socketCh.write(buffer);
if(!buffer.hasRemaining()){
System.out.println("CLIENT: Write "+ Arrays.toString(buffer.array()));
ByteBuffer readBuf = ByteBuffer.allocate(SIZE);
socketCh.register(selector, SelectionKey.OP_READ, readBuf);
}
}
if(key.isReadable()){
ByteBuffer readBuf = (ByteBuffer) key.attachment();
socketCh.read(readBuf);
if(!readBuf.hasRemaining()){
System.out.println("CLIENT: Read "+ Arrays.toString(readBuf.array()));
}
socketCh.close();
break loop;
}
}
}
selector.close();
System.out.println("CLIENT: Done");
}
}
クライアントの数を増やしてもサーバ側はシングルスレッドで処理することができます.
実際にこのまま使うわけでは無いと思うので,適宜クラス・メソッド化して実装してください.