はじめに
futures1 とは、rustにおいて非同期処理を実現するためのライブラリです。非常に強力ですが、結構難易度高いので、使い方のコツとかを書きます。
何が難しいのか
-
型の追い方
-
型推論を把握している必要がある
対象者
[required]
- Result列挙型が実務レベルで使える人。(combinator周りの知識は必要です)
[optional]
-
低レベルな非同期処理をrustで使いたい人。
-
futures crate触ってみたけど、型に押しつぶされて
圧死した挫折した人。
version
[dependencies]
futures = "0.1.17"
大幅な仕様変更は無いと思いますが、将来deprecatedになる可能性はゼロではないので、念の為。
準備
associated type
associated typeについては、https://doc.rust-lang.org/book/first-edition/associated-types.html#associated-types を参照して下さい。
使う側としては、associated typeを用いない場合だと、TraitSample<T1,T2>
とかけるものが、associated typeの導入により、TraitSample<Item1=T1, Item2=T2>
とかけるみたいなふうにざっくり捉えれば、オッケーです。
associated typeを用いることで、より複雑なgenerics型を定義できます。
Future, Streamとは
公式のdocsの冒頭の説明を読んでください
Future: https://docs.rs/futures/0.1.17/futures/future/trait.Future.html
Stream: https://docs.rs/futures/0.1.17/futures/stream/trait.Stream.html
後、https://tokio.rs/docs/getting-started/streams-and-sinks/ も参考に。
Result, IteratorとFuture, Streamの対比
ResultとFutureの対比
-
Result
は列挙型です。対して、Future
,Stream
はトレイト。 -
あるオブジェクトの型を
Result<T, E>
とします。T
が成功したときの型、E
が失敗したときの型であることに注意すると、主に使うメソッドはだいたいこんな感じになります。
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 備考 |
---|---|---|---|---|
map |
Result<T,E> |
FnOnce(T)->U |
Result<U,E> |
T からU への写像 |
map_err |
Result<T,E1> |
FnOnce(E1)->E2 |
Result<T,E2> |
E1 からE2 への写像 |
and_then |
Result<T,E> |
FnOnce(T)->Result<_U,E> |
Result<_U, E> |
T からU への写像(失敗可能性をResult<_U,E> で表現)。 |
型に関して_*
(上の表の場合だと,and_thenの_U
)は任意の型です。(_*
は本記事にのみ有効な独自記法とします。)
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
map |
Self :Future<T,E>
|
F :FnOnce(T)->U
|
Map<Self, F> |
Future<U,E> |
T からU への写像 |
map_err |
Self :Future<T,E1>
|
F :FnOnce(E1)->E2
|
MapErr<Self, E2> |
Future<T,E2> |
E1 からE2 への写像 |
and_then |
Self :Future<T,E>
|
F :FnOnce(T)->IntoFuture<_U, E>
|
AndThen<Self,IntoFuture<_U,E>,F> |
Future<T,U> |
Tから_U への写像(失敗可能性をIntoFuture<_U, E> で表現) |
then |
Self :Future<T,E1>
|
F :FnOnce(std::result::Result<T,E1>)->IntoFuture<_U,_E2>
|
Then<Self,IntoFuture<_U,_E2>,F> |
Future<_U,_E2> |
(T ,E1 )から(_U , _E2 )への写像(失敗可能性をResult<_U,_E2> で表現) |
ここで、IntoFuture
とはFuture
トレイトにconvertできるようなトレイトです。このような型を新たに導入している最大の理由は、std::result::Result<T,E>
型を取り扱いたいから2です。なので、ざっくり、Future
を拡張したトレイトと理解していれば大丈夫です。
また、同様に、型に関して_*
(上の表の場合だと,and_thenの_U
)[^arbitrary_type]は任意の型で、省略した場合は、型推論され、型一致を確認します。 この型推論の部分については、第二の難関なので、型推論の章で後述します。
上の表の備考から基本的には、Result
のように使えば大丈夫なんですが、Futureの場合は、戻り値型のトレイトが何なのかを把握することが非常に重要です。(戻り値型自体は表面上の型)
Futureトレイトのdocsからこの表をどのように作ったかを、Futureトレイトのmap, and_then, thenメソッドを例に解説していきたいです。(割りとハウツーみたいな感じで説明します。型システムとかの理論的なお話は抜きで)
map
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
map |
Self :Future<T,E>
|
F :FnOnce(T)->U
|
Map<Self, F> |
Future<U,E> |
T からU への写像 |
Future
トレイトの中でも一番わかり易いmap
メソッドから。
レシーバーの型をFuture<Item=T,Error=E>
とします。Self::Item=T
, Self::Error=E
であることに注意して下さい。
https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.map
Map<Self, F>
はFuture
トレイトを実装しています:
https://docs.rs/futures/0.1.17/futures/future/struct.Map.html#impl-Future
-
引数のクロージャーのシグネチャは
FnOnce(Self::Item) -> U
ですが、Self::Item=T
なので、FnOnce(T)->U
です。 -
戻り値型は定義から
Map<Self, F>
です。 -
戻り値型のトレイトが一番重要です。なぜなら、この戻り値が次のメソッドのレシーバーになれるからです。(メソッドチェーンできる)
先ず、リンクのように、Map<Self, F>
はFuture
を実装しています。impl<U, A, F>
のU
は引数のクロージャーの戻り値型であることに注意すると、実装されたトレイトはFuture<Item=U, Error=Self::Error>
となります。ここで、Self::Error=E
なので、Future<Item=U, Error=E>
です。以上より、最終的には、Future<Item=U,Error=E>
です。
つまり、mapを使用することで、reciverの型がFuture<Item=T,Error=E>
からFuture<Item=U,Error=E>
に変化することを表しています。(T
からU
への写像)
and_then
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
and_then |
Self :Future<T,E>
|
F :FnOnce(T)->IntoFuture<_U, E>
|
AndThen<Self,IntoFuture<_U,E>,F> |
Future<T,U> |
Tから_U への写像(失敗可能性をIntoFuture<_U, E> で表現) |
mapと同様に見ていきましょう。レシーバーの型をFuture<Item=T,Error=E>
とします。Self::Item=T
, Self::Error=E
であることに注意して下さい。
https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.and_then
AndThen<Self, B, F>
はFutureトレイトを実装しています(docsではSelf
がA
に置き換わっています):
https://docs.rs/futures/0.1.17/futures/future/struct.AndThen.html#impl-Future
-
引数のクロージャーのシグネチャは
FnOnce(Self::Item) -> B
(ただし、B: IntoFuture<Error=Self::Error>
)ですが、Self::Error=E
なので、FnOnce(T)->IntoFuture<Error=E>
です。
ここで、IntoFuture<Error=E>
はItem
が任意の型で、型を明示するか、省略時は前後の文脈から型推論した上で、型一致を確認します(型推論については後述)。型推論できなければコンパイルエラーです。本記事では、IntoFuture<_U, E>
と表現しているのでした。 -
戻り値型は定義から
AndThen<Self,IntoFuture<Error=Self::Error>,F>
です。 -
戻り値型のトレイトについて。
先ず、リンクのように、AndThen<Self, B, F>
はFuture<Item=B::Item, Error=B::Error>
を実装しています。ここで、B: IntoFuture<Error=Self::Error>
、B::Item
は任意(本記事では_U
と表現します)、Self::Error=E
であることに注意すると、最終的なトレイトは、Future<Item=_U, Error=E>
です。
つまり、and_then
を使用することで、receiver
の型がFuture<Item=T,Error=E>
からFuture<Item=_U, Error=E>
に変化したということを表しています。(T
から_U
への写像)。ただし、mapとは異なり、引数のクロージャーの戻り値型をIntoFuture<Error=E>
とすることで、失敗可能性を表現しています。
then
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
then |
Self :Future<T,E1>
|
F :FnOnce(std::result::Result<T,E1>)->IntoFuture<_U,_E2>
|
Then<Self,IntoFuture<_U,_E2>,F> |
Future<_U,_E2> |
(T ,E1 )から(_U , _E2 )への写像(失敗可能性をResult<_U,_E2> で表現) |
then
はResult
にはないメソッドですが、これは、Future<T,E1>
の終了後、成功失敗にかかわらず、必ず実行してほしいときに用いるメソッドです。おんなじようにやっていきます。レシーバーの型をFuture<Item=T,Error=E1>
とします。Self::Item=T
, Self::Error=E1
であることに注意して下さい。
https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.then
Then<Self, B, F>
はFutureトレイトを実装しています(docsではSelf
がA
に置き換わっています):
https://docs.rs/futures/0.1.17/futures/future/struct.Then.html#impl-Future
-
引数のクロージャーのシグネチャは
FnOnce(std::result::Result<Self::Item, Self::Error>)-> B
(ただし、B: IntoFuture
)ですが、Self::Item=T
,Self::Error=E1
なので、FnOnce(Result<T,E1>)->IntoFuture
です。
ここで、IntoFuture
はItem
&Error
の両方が任意の型です。型を明示するか、省略時は前後の文脈から型推論されます。型推論できなければコンパイルエラーです。本記事では、IntoFuture<_U, _E2>
と表現しているのでした。(_U, _E2
)は型を明示するか、型推論されて、型が決定される) -
戻り値型は定義から
Then<Self,IntoFuture<_U,_E2>,F>
です。 -
戻り値型のトレイトについて。
先ず、2つめのリンクのように、Map<Self, F>
はFuture
を実装しています。B: IntoFuture
であることに注意すると、トレイトはFuture<Item=B::Item, Error=B::Error>
となります。ここで、B: IntoFuture
なので、最終的な型は、Future<Item=_U, Error=_E2>
となります。
本記事では、Future<_U, _E2>
と表現しているのでした。
つまり、thenを使用することで、receiverの型がFuture<Item=T,Error=E1>
からFuture<Item=_U, Error=_E2>
に変化したということを表しています。((T
,E1
)から(_U
, _E2
への写像)
(and_then
との違いは、and_then
はError typeが引き継がれることです。対して、then
はError時にも実行されるので、Error typeは引き継がれません)
入門にはふさわしくないですが、Futureについて補足。
- Futureトレイトの
wait
メソッドは処理が終わるまでブロッキングするので、実用では使わないです。非同期処理をさせたい場合は、代わりにfutures_cpupool::CpuPool::spawn
とかtokio_core::reactor::Handle::spawn
で処理することが多いです。前者は、事前に用意されたスレッドプールの中で、非同期処理が走ります。後者はScheduledTask に登録されて、mio::event::Event単位で所要の非同期処理が走ります^mio
-
Future
トレイトのpoll
メソッドについては、futuresを使う側はあんまり意識しなくて良いメソッドです。Future
トレイトを実装したいような構造体に対して、poll
メソッドを実装しておくことで、Future
トレイトのmap
だったり、and_then
だったり、その他のソッドを新規に実装せずに済みます。
IteratorとStreamの対比
Stream
はIterator<Item=T>
と対比できます:
まずはIterator<Item=T>
から3:
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
map |
Self :Iterator<T>
|
FnMut(T)->B |
Map<Self,F> |
Iterator<B> |
T からB への写像 |
for_each |
Self :Iterator<T>
|
FnMut(T)->() |
- | - |
T 達を評価する |
続いて、Streamトレイト。同様にStream<Item=T,Error=E>
をStream<T,E>
と略記します:
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
map |
Self :Stream<T,E>
|
F :FnMut(T)->U
|
Map<Self,F> |
Stream<U,E> |
TからUへの写像 |
for_each |
Self :Stream<T,E>
|
F :FnMut(T)->IntoFuture<(), E>
|
ForEach<Self,F,IntoFuture<(),E>> |
Future<(),E> |
Stream からFuture への変換(評価されないことに注意) |
and_then |
Self :Stream<T,E>
|
F :FnMut(T)->IntoFuture<_U,E>
|
AndThen<Self,F,IntoFuture<_U,E>> |
Stream<_U,E> |
Tから_U への写像(IntoFuture<_U,E> で失敗可能性を表現) |
then |
Self :Stream<T,E1>
|
F :FnMut(Result<T,E1>)->IntoFuture<_U, _E2>
|
Then<Self,F,_U> |
Stream<_U, _E2> |
(T,E1) から(_U,_E2) への写像(IntoFuture<_U,_E2> で失敗可能性を表現) |
注意すべき点としては、for_each
はIterator
の場合だと、それぞれの要素が評価されますが、Stream<Item=T,Error=E>
の場合だと、Futureが返されます。つまり依然として、非同期処理として保持されていて、ブロッキングされるわけではありません。(この点wait
メソッドと異なります。wait
は処理が終わるまでブロッキングし、最後にResult
型を返します。)
なお、Iteratorでよく使うcollectメソッドについては、非同期処理の文脈ではwaitメソッドに相当します。しかしながら、ブロッキング処理に相当するため、waitメソッドはあまり用いません。4
Streamの型も前節のFutureとおんなじような感じで見ればいいですが、一応、and_then
メソッドだけ確認しておきましょう。
and_then
メソッド | レシーバーの型 | 引数のシグネチャ | 戻り値型 | 戻り値型のトレイト | 備考 |
---|---|---|---|---|---|
and_then |
Self :Stream<T,E>
|
F :FnMut(T)->IntoFuture<_U,E>
|
AndThen<Self,F,IntoFuture<_U,E>> |
Stream<_U,E> |
T から_U への写像(IntoFuture<_U,E> で失敗可能性を表現) |
レシーバーの型をStream<Item=T,Error=E>
とします。Self::Item=T
, Self::Error=E
であることに注意して下さい。
https://docs.rs/futures/0.1.17/futures/stream/trait.Stream.html#method.and_then
AndThen<Self, F, U>
はStream
トレイトを実装しています(docsではSelf
がS
に置き換わっています):
https://docs.rs/futures/0.1.17/futures/stream/struct.AndThen.html#impl-Stream
-
引数のクロージャーのシグネチャは
F
:FnMut(Self::Item) -> B
(ただし、B: IntoFuture<Error=Self::Error>
ですが、Self::Error=E
なので、FnMut(T)->IntoFuture<Error=E>
です。
ここで、IntoFuture<Error=E>
はItem
が任意の型で、型を明示するか、省略時は前後の文脈から型推論されます。型推論できなければコンパイルエラーです。本記事では、IntoFuture<_U, E>
と表現しているのでした。 -
戻り値型は定義から
AndThen<Self,F,IntoFuture<Error=Self::Error>>
です。 -
戻り値型のトレイトについて。
先ず、リンクのように、AndThen<Self, F, B>
はStream
を実装しています。B: IntoFuture<Error=Self::Error>
であることに注意すると、トレイトはFuture<Item=_U, Error=Self::Error>
となります。ここで、Self::Error=E
なので、最終的な型は、Future<Item=_U, Error=E>
です。
つまり、and_then
を使用することで、receiverの型がFuture<Item=T,Error=E>
からFuture<Item=_U, Error=E>
に変化したということを表しています。(T
から_U
への写像)。ただし、mapとは異なり、引数のクロージャーの戻り値型をIntoFuture<Error=E>
とすることで、失敗可能性を表現しています。
Note)
使い方的には、Stream
トレイトをmap
メソッドやand_then
メソッドで処理していき、最後にfor_each
メソッドを作用させ、Future
型に変換し、Futureを引数に取る関数(tokio
だとtokio_core::reactor::Core::run
とか)に突っ込みます。Streamが無限の長さ(e.g:TcpListener::bind(&addr, &handle).unwrap().incoming()
とか)。こうして変換されたFutureは、waitメソッドとかで処理してやると、無限時間実行されます(エラーとならない限り)。(すぐ次の章でも例を通じて言及します)
型推論
第二の難関、型推論ですが、これは例をやったほうがわかりやすいと思うので、やっていきます。
最も簡易なノンブロッキングのエコーサーバーを考えます:
[dependencies]
futures = "0.1.17"
tokio-core = "0.1"
tokio-io = "0.1"
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
// non-blocking compared with std::net::TcpListener
use tokio_core::net::TcpListener;
// reactor is a common synonym for event loop
use tokio_core::reactor::Core;
use futures::{Future, Stream};
use tokio_io::AsyncRead;
fn main() {
let addr = "127.0.0.1:8081".parse().unwrap();
let mut core = Core::new().unwrap();
// handler for event loop
let handle = core.handle();
let listener =TcpListener::bind(&addr, &handle).unwrap();
let server = listener.incoming()
.for_each(move |(socket, _addr)| {
let (reader, writer) = socket.split();
let msg = tokio_io::io::copy(reader, writer)
.then(|_| Ok(()));
// spawn the future as a concurrent task. This method Executes concurrently on the event loop thread
handle.spawn(msg);
Ok(())
});
core.run(server).unwrap();
}
Core::new().unwrap();
でevent loopのインスタンスを立てて、core.run(server)
で起動します。https://tokio-rs.github.io/tokio-core/tokio_core/reactor/struct.Core.html#method.run にもある通り、runメソッドの引数にはFutureトレイトを実装している型のオブジェクトを突っ込みます。
TcpListenerは標準のstd::net::TcpListener
を使用していませんが、標準のライブラリはブロッキング処理なので、ノンブロッキングに対応したtokio_core::net::TcpListener;
を用いています(tokio_core
はmio
というノンブロッキングに対応したライブラリを内部で使用しています)
結論から言うと、for_each
とthen
でそれぞれ型推論されています。(Ok(())
の部分で型が省略されている)。
では、server
変数の処理について、一行づつ見ていきましょう。
let server = listener.incoming()
.for_each(move |(socket, _addr)| {
let (reader, writer) = socket.split();
let msg = tokio_io::io::copy(reader, writer)
.then(|_| Ok(()));
// spawn the future as a concurrent task. This method Executes concurrently on the event loop thread
handle.spawn(msg);
Ok(())
});
let server = listener.incoming()
listener
はtokio_core::net::TcpListener
型です。incoming
メソッドは、表面上はIncoming
型なのですが、これは、Stream<Item=(TcpStream, SocketAddr), Error=std::io::Error>
を実装しています。なので、本記事で言うT=(TcpStream, SocketAddr)
, E=std::io::Error
となります。(TcpStream
自体はFuture, Streamを実装しているわけではありません。)
https://tokio-rs.github.io/tokio-core/tokio_core/net/struct.Incoming.html#impl-Stream
.for_each(move |(socket, _addr)| {
後にhandle
変数を用いるので、moveしています。(handle
とはイベントループのハンドラーです)
for_eachに入れる引数のシグネチャはF:FnMut(T)->IntoFuture<Item=(), Error=E>
なのでした。
注意すべき点としては、戻り値型がItem=()
となるようなクロージャーを引数に入れる必要があるということです。(先走りますが、最後にOk(())
としているのはそのためです。)
T=(TcpStream, SocketAddr)
に注意すれば、socket
はtokio_core::net::TcpStream
型、_addr
はstd::net::SocketAddr
型です。。(_addr
はクロージャー内で使用しないので、_
を先頭につけています)
let (reader, writer) = socket.split();
split関数の戻り値型自体は、(ReadHalf<TcpStream>, WriteHalf<TcpStream>)
です。次のcopy関数に突っ込むためのコードです。
let msg = tokio_io::io::copy(reader, writer);
さて、このコードで、Future
トレイトを実装する型を作成しています。copy関数の戻り値型は、Copy<ReadHalf<TcpStream>, WriteHalf<TcpStream>>
ですが、この型はFuture
トレイトを実装しています。(Future<Item=(u64, ReadHalf<TcpStream>, WriteHalf<TcpStream>), Error=std::io::Error>
)です。
https://docs.rs/tokio-io/0.1.4/tokio_io/io/struct.Copy.html#impl-Future
-
.then(|_| Ok(()));
,
ここの部分で、やっと型推論の話に入ります。
次の行のhandle.spawn
の引数は、以下のようにFuture<Item = (), Error = ()>
を要求しています。
https://tokio-rs.github.io/tokio-core/tokio_core/reactor/struct.Handle.html#method.spawn
Future<Item=(u64, ReadHalf<TcpStream>, WriteHalf<TcpStream>), Error=io::Error>
からIntoFuture<Item=(), Error=_>
へと変換しているわけです。(Ok(())
はstd::result::Result<(), _>
型ですが、これはIntoFutureを実装している)
さて、すでにhandle.spawn
の引数はError = ()
を要求しています。一方、Ok(())
をクロージャーの戻り値とすると、Error typeは任意です。この時、型推論が働き、IntoFuture<Item=(), Error=()>
と型が確定し、型一致します。
Note) 今回のケースではthen
の代わりにand_then
は使用できません。なぜなら、and_thenはErrorの型を引き継ぐからです。(つまり、出口でError = ()
を要求されているのに、and_then
でError=io::Error
をそのまま引き継いでしまうので、型不一致でコンパイルエラーとなります。
handle.spawn(msg);
msgをイベントループのハンドラーに登録しています5。spawn
の引数は前述の通り、Future<Item = (), Error = ()>
を要求するので、msg
の型と一致します。戻り値はないです。
Ok(())
最後の行です。for_eachに入れる引数のシグネチャはF:FnMut(T)->IntoFuture<Item=(), Error=std::io::Error>
です。一方、Ok(())
とすると、IntoFuture<Item=(), Error=_>
(Errorは任意の型)なので型一致します。(ここも型推論が働いている)
こんな感じで型が推移していきます。結構型を追うの大変なので、型推論が働くところは結構コンパイラーに助けてもらうことが多いかもです。
Note) 普通のWebアプリを制作する際は、こんなプリミティブな実装をする必要はなく、WebフレームワークだったらIron
, rocket
とか使います。Pythonでいう、requestsライブラリみたいなものをお探しの方は、reqwest
crateを覗きましょう。
そこら辺興味ある方は、http://www.arewewebyet.org/ に一通り書いてあるので、目を通すと良いかもしれません。今回の例は、futures
を扱うためのサンプルコードということで。
Note) その他、こういった低水準の実装に興味があれば、https://github.com/tokio-rs/tokio-core/tree/master/tests とか https://github.com/tokio-rs/tokio-line にいろいろあります。
まとめ
-
型の追うときには、そのメソッドの戻り値型がどのトレイトを実装しているのか(FutureトレイトかStreamトレイトか, ItemとErrorのtypeは何か?)を把握しました。
-
Future, Streamのときに型が省略されたときの型推論をざっくり紹介しました。
最後に
futures
の使い方について解説しました。頑張って解説したは良いものの、やっぱり複雑で難しい。
ですが、一度使い方さえ身に着けていれば、抽象的かつ、保守性の高いコードを構築できると思います。実務的には、本記事のようなことを常に考えているわけではなく、いくつか自分の中でスニペットを持っていて、応用する感じで使っています(英作文でいう「英借文」みたいな感じでしょうか)。
Sink
については、説明できなかった... (Sink
トレイトはSinkはデータを「沈める」先6です。詳しくはコメントに書きました。)
本記事がきっかけで、futuresを使用できるようになれば幸いです
-
future自体については、増補改訂版 Java言語で学ぶデザインパターン入門の第9章を読めば良いでしょう。 ↩ ↩2
-
https://tokio.rs/docs/going-deeper-futures/futures-mechanics/#intofuture ↩
-
Iterator<Item=T>
をIterator<T>
と略記しています。 ↩ -
Streamには
collect
メソッドもありますが、これは、StreamからFutureに変換するメソッドです。なので、std::iter::Iteratorで使うcollectとちょっとニュアンスが違う) ↩ -
詳しいことは、https://tokio.rs/docs/getting-started/reactor/#handle を見ましょう。 ↩
-
https://tokio.rs/docs/getting-started/streams-and-sinks/#sinks を参照。 ↩