Posted at

Rust Tokioを動かしてみた

More than 1 year has passed since last update.


Tokioとは

https://tokio.rs/

Rustでイベント駆動のネットワークI/Oを扱うフレームワークといったところでしょうか。


動かす


サンプルコード

公式トップページにあるものですが、一応転記しておきます。TCP Echoサーバです。

以下のコードが公式トップページに書いてあるものと違うなら、この記事はobsoleteです。


main.rs

// A tiny async echo server with tokio-core

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::{Future, Stream};
use tokio_io::{io, AsyncRead};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;

fn main() {
// Create the event loop that will drive this server
let mut core = Core::new().unwrap();
let handle = core.handle();

// Bind the server's socket
let addr = "127.0.0.1:12345".parse().unwrap();
let tcp = TcpListener::bind(&addr, &handle).unwrap();

// Iterate incoming connections
let server = tcp.incoming().for_each(|(tcp, _)| {
// Split up the read and write halves
let (reader, writer) = tcp.split();

// Future of the copy
let bytes_copied = io::copy(reader, writer);

// ... after which we'll print what happened
let handle_conn = bytes_copied.map(|(n, _, _)| {
println!("wrote {} bytes", n)
}).map_err(|err| {
println!("IO error {:?}", err)
});

// Spawn the future as a concurrent task
handle.spawn(handle_conn);

Ok(())
});

// Spin up the server on the event loop
core.run(server).unwrap();
}



プロジェクト設定

Cargo.tomlに以下の設定を追加します。


Cargo.toml

[dependencies]

futures = "0.1"
tokio-io = "0.1"
tokio-core = "0.1"


実行

cargo runすると127.0.0.1の12345/tcpでlistenします。

別ターミナルからnc等で接続するとEchoサービスが開始されます。接続を切ると、サーバを実行しているターミナルに、送出したデータサイズが表示されます。

もちろん、同時に複数接続することができ、並列にサービスすることが出来ます。


読解

内側から読んでみます。

        // Future of the copy

let bytes_copied = io::copy(reader, writer);

tokio_io::io::copy()関数は第1引数のAsyncReadから第2引数のAsyncWriteへデータをコピーします。これは読み出しがEOFに当たり、全てのデータが書き出されると完了します。

readerは接続しているソケットの読み出し、writerは書き込みなので、これがEchoサービスの実装となっています。

返り値はCopy<R, W>でこれはFutureトレイトを実装しています。Futureパターン自体はTokioに特有のものというわけではありません。簡単に説明すると、非同期処理を「もし完了したらこうなっている」という観点で表現したものです。

        // ... after which we'll print what happened

let handle_conn = bytes_copied.map(|(n, _, _)| {
println!("wrote {} bytes", n)
}).map_err(|err| {
println!("IO error {:?}", err)
});

Future.map()メソッドはMapを返します。MapもまたFutureトレイトを実装しているので、Futureの型を変換していると言えます。

引数のクロージャについて説明する前に、Futureトレイトについてもう少し知る必要があります。

FutureトレイトはItemErrorという2つの関連型を持っていて、これはそれぞれ、非同期処理が成功したときの値と、失敗したときのエラーを表す型です。

map()メソッドの引数のクロージャは、処理が成功したときに実行されます。そのためその引数の型はItemです。また、map()メソッドの返すMapItemはクロージャの返り値を表す型になります。これはクロージャが実行されるかどうかとは無関係です。

Copy<R, W>は特にItem = (u64, R, W)です。第1要素は読んだ(=書いた)データサイズを表しています。

以上から、map()メソッドの意味するところは次の2点です。


  • 非同期処理が成功したら読んだ(=書いた)データサイズを表示する


  • Item = ()であるFutureを返す

Future.map_err()メソッドはmap()メソッドの逆で、非同期処理が失敗したときの処理を受け取り、Errorの型を変更したFutureを返します。なお、Copy<R, W>は特にError = std::io::error::Errorです。つまり、以下のような働きになります。


  • 非同期処理が失敗したらそのエラーを表示する


  • Error = ()Futureを返す

最終的にhandle_connItem = (), Error = ()Futureとなります。これはとても重要です。

        // Spawn the future as a concurrent task

handle.spawn(handle_conn);

spawn()メソッドは受け取ったFuture非同期処理をイベントループ上で実行します。この時重要なのがItem = (), Error = ()であることを要求している点です。つまり、処理のハンドリングをちゃんと定義しないと走らせてくれないということです。

    let server = tcp.incoming().for_each(|(tcp, _)| {

// ...

Ok(())
});

for_each()メソッドによって、確立された接続に対して何をするかを定義します。何をするかは今まで見てきたとおりですが、クロージャはOk(())を返しています。これはクロージャの返り値がIntoFuture<Item=(), Error=Self::Error>を実装している必要があるためで、Result<T, E>は実装しています。更にItem=()なのでOk(0)などを返してはいけません。for_each()ForEachを返し、これもまたFutureです。

    // Spin up the server on the event loop

core.run(server).unwrap();

イベントループ上でFutureを実行します。handle.spawn()と違いItem = (), Error = ()である必要はありません。なぜならrun()の返り値こそがFutureの結果を表すからです。

つまり、メインの非同期処理はlisten portに対して接続を検知してdispatchすることであり、dispatch先で生じる非同期処理については、dispatch側で責任を持つということになります。