Edited at

Rust Tokioでnetcatっぽいものを実装してみた

More than 1 year has passed since last update.

サンプルコードでEcho serverを実装してみたので、次はそれ用のクライアントを実装してみました。

クライアントはサーバに任意のデータを送れて、それに対するレスポンスが見られればいいので、ちょうどncコマンドのようになります。


要件

サーバに接続したら、以下の動作を行う。


  • 標準入力へ入力されたデータをサーバに送信する

  • サーバから受信したデータを標準出力へ出力する

  • 標準入力がEOFに達したらサーバへFINを送る

  • サーバからFINを受信したら終了する


実装

以下、モジュール名が省略されている場合は適宜補ってください。特にTcpStreamtokio_core::net::TcpStreamiotokio_io::iompscfutures::sync::mpscです。


問題点と解決方法


標準入出力はブロックする

特に入力を待ってブロックしてしまうと、その間にサーバからデータが着信したり、接続を切られたりしても対応できません。

解決方法の1つとして標準入出力をノンブロッキングにすることが考えられます。しかしそのような手段は標準ライブラリでは提供されていません。libc::fcntl()を使えば直接ファイルディスクリプタ0および1をノンブロッキングにできるかと思いますが、その状態でStdin, Stdoutが意図通りに動作するかは不明です。Stdin, StdoutAsRawFdを実装していないのも、ファイルディスクリプタを操作してほしくないからかもしれません。

今回は標準入力からの読み出しを別スレッドで行うことで対応します。標準出力への書き出しは通常通りに行います。


WriteHalf<TcpStream>はハーフクローズできない

要件の1つ「標準入力がEOFに達したらサーバへFINを送る」にはshutdown(SHUT_WR)を呼び出さなければなりません。AsyncWriteはそれらしいshutdown()を持っていますが、WriteHalf<TcpStream>の実装は何もしません。

なお、ハーフクローズでなく単なるクローズであっても、WriteHalf<TcpStream>, ReadHalf<TcpStream>の両方がドロップされたときになされるだけです。言い換えると、WriteHalf<TcpStream>だけの操作で接続を切ることは出来ません。

解決方法の1つは、TcpStream.shutdown()を使うことです。これはshutdown(2)を実施します。

しかしこれをやるには、TcpStream.split()を利用できません。なぜならTcpStreamの所有権が失われるからです。更にio::write_all()io::read()も使えなくなるので、自前で相当の、TcpStreamを借用するバージョンを書かなければなりません。

今回はファイルディスクリプタに対して直接libc::shutdown(libc::SHUT_WR)を呼び出すことにしました。1つ目の問題と逆のアプローチになりましたが、TcpStreamAsRawFdを実装しているので、こういうのもありかなと考えます。


コード


接続

fn main() {

let mut core = Core::new().unwrap();
let handle = core.handle();

let stream = TcpStream::connect(&"127.0.0.1:7".to_socket_addrs().unwrap().next().unwrap(), &handle);

これは特に不思議のない部分かと思います。


標準入力の待ち受け

    let nc = stream.and_then(|stream| {

let (tx, rx) = mpsc::channel::<String>(1);
thread::spawn(move || get_input(tx));

futuresでは別スレッドからStreamとしてデータを受け取ることが出来ます。それがmpsc::channel()です。Senderを別スレッドに渡し、Receiverから受け取ります。

get_input()は以下のとおりです。

fn get_input(mut writer: mpsc::Sender<String>) {

loop {
let mut buf = String::new();
let size = std::io::stdin().read_line(&mut buf).unwrap();
if 0 == size {
break;
}
match writer.send(buf).wait() {
Ok(t) => writer = t,
Err(e) => {
writeln!(std::io::stderr(), "failed to send input to main thread: error={}", e).unwrap();
break;
}
}
}
}

標準入力から読んではSenderに書き込む、の繰り返しです。読み出しの単位は、ターミナルからの入力を考え、行単位としています。終了条件としては、EOFの入力のほか、Senderへの書き込みエラーもあります。これが発生した場合はReceiverがドロップしているので、以降はやることがないということになります。


サーバへの送信

まず、これ及びサーバからの受信の際には「繰り返し送信」「繰り返し受信」という動作が必要になります。io::write_all()io::read()AsyncWrite, AsyncReadの所有権を奪ってしまうので、繰り返し動作がシンプルには書けません。

最初、これらioの関数が返すFutureは成功時、もとの引数のAsyncWriteまたはAsyncReadを渡してくるので、これを利用できないかと考えました。以下は繰り返し受信のコードです。


ダメだったコード

fn read_repeat(reader: ReadHalf<TcpStream>) -> BoxFuture<(), Error> {

tokio_io::io::read(reader, [0])
.and_then(|(reader, buf, sz)| {
if 0 == sz {
future::ok::<(), Error>(()).boxed()
} else {
std::io::stdout().write_all(&buf);
read_repeat(reader)
}
})
.boxed()
}

この関数が返すFutureは読み出しを試み、成功したら内容を書き出した後、同じことをするFutureを返します。これで繰り返しが表現できます。再帰ですが、再帰呼び出し自体はクロージャの中にあり、read_repeat()の実行中には実行されないので、問題ないかと思っていました。しかし、残念ながらスタックオーバーフローしてしまいました。

その後試行錯誤の末、繰り返し書き出すSinkと、繰り返し読み出すStreamを使うしかないという結論に至りました。

自分で実装するという手もありましたが、AsyncRead.framed()によりSink + Streamが得られるのでそれを使うことにしました。

このメソッドは、データ構造のペイロードへのエンコード・ペイロードからデータ構造へのデコードを行うEncoder + Decoderを引数に取ります1が、今回その機能は不要なので、何の変換もしないVoidCodec構造体で実装しました。

struct VoidCodec;

impl Encoder for VoidCodec {
type Item = String;
type Error = Error;

fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend(item.as_bytes());
Ok(())
}
}

impl Decoder for VoidCodec {
type Item = BytesMut;
type Error = Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, Error> {
Ok(if src.is_empty() {None} else {Some(src.take())})
}
}

見ての通りただコピーしているだけですが、Decoder.decode()でバッファが空のときだけNoneを返す必要があります。これは、たとえAsyncReadEOFを返しても、decode()Noneを返さなければ、Streamが終了しないからです。

では、サーバへデータを送信するコードに戻ります。

        let fd = stream.as_raw_fd();

let (writer, reader) = stream.framed(VoidCodec {}).split();

let send_out = writer.sink_map_err(|e| {
writeln!(std::io::stderr(), "failed to send input to server: error={}", e).unwrap();
})
.send_all(rx)
.map(move |_| {
let r;
let e;
unsafe {
r = libc::shutdown(fd, libc::SHUT_WR);
e = *libc::__errno_location();
}
if r < 0 {
writeln!(std::io::stderr(), "shutdown(2) failed: errno={}", e).unwrap();
}
});
handle.spawn(send_out);

framed()の返り値に対しsplit()を呼ぶことでSinkStreamに分割できます2

先程、標準入力読み出しスレッドからデータをStreamで読めるようにしました。これと書き出しのSinksend_all()で直結できるのですが、そのためにはSink::SinkErrorStream::Errorに互換性がなければいけません。Receiver::Error = ()なので、Sink.sink_map_err()によりSinkError = ()にしてから結合します。

Streamが底をつき書き出しが成功裡に完了したらshutdown(2)を呼び出します。errnoの取得がこれでいいのかは謎です。


サーバからの受信

        reader.for_each(|buf| {

if !buf.is_empty() {
std::io::stdout().write_all(&buf).unwrap();
}
Ok(())
})
});
core.run(nc).unwrap();
}

ごくシンプルです。標準出力はブロックするので、本来はスレッドを切るべきなのかもしれません。

細切れにしたためわかりにくいですが、ここまではTcpStreamNew.and_then()のクロージャであり、最後にStream.for_each()Futureを返したため、これが最終的に走るFutureとなります。つまりイベントループはEOFを読み出すまで回り続けます。


余談

一生懸命実装したら公式に実装例がありました。大枠で同じなので安心しました。大きな違いは、公式の例では、標準入力からのStreamとサーバからのStreamのどちらかが終了したらイベントループが終了するようになっていることです。これではこちらから接続を切るときに問題が発生しますが、サンプルなのでよいということでしょう。





  1. 公式サイトの例では構造体そのものを渡していますが、これはnightlyの機能なんでしょうか……構造体の名前だけ宣言すればいいだけでした 



  2. AsyncRead.split()とは読み出し(AsyncRead/Stream)、書き出し(AsyncWrite/Sink)の順序が逆になっているのが腹立たしい