57
42

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RustAdvent Calendar 2017

Day 22

できる!futures入門

Last updated at Posted at 2017-12-22

はじめに

futures1 とは、rustにおいて非同期処理を実現するためのライブラリです。非常に強力ですが、結構難易度高いので、使い方のコツとかを書きます。

何が難しいのか

  • 型の追い方

  • 型推論を把握している必要がある

対象者

[required]

  • Result列挙型が実務レベルで使える人。(combinator周りの知識は必要です)

[optional]

  • 低レベルな非同期処理をrustで使いたい人。

  • futures crate触ってみたけど、型に押しつぶされて圧死した挫折した人。

version

[dependencies]
futures = "0.1.17"

大幅な仕様変更は無いと思いますが、将来deprecatedになる可能性はゼロではないので、念の為。

準備

associated type

associated typeについては、https://doc.rust-lang.org/book/first-edition/associated-types.html#associated-types を参照して下さい。
使う側としては、associated typeを用いない場合だと、TraitSample<T1,T2>とかけるものが、associated typeの導入により、TraitSample<Item1=T1, Item2=T2>とかけるみたいなふうにざっくり捉えれば、オッケーです。
associated typeを用いることで、より複雑なgenerics型を定義できます。

Future, Streamとは

公式のdocsの冒頭の説明を読んでください:smiley:

Future: https://docs.rs/futures/0.1.17/futures/future/trait.Future.html

Stream: https://docs.rs/futures/0.1.17/futures/stream/trait.Stream.html

後、https://tokio.rs/docs/getting-started/streams-and-sinks/ も参考に。

Result, IteratorとFuture, Streamの対比

ResultとFutureの対比

  • Resultは列挙型です。対して、Future, Streamはトレイト。

  • あるオブジェクトの型をResult<T, E>とします。Tが成功したときの型、Eが失敗したときの型であることに注意すると、主に使うメソッドはだいたいこんな感じになります。

メソッド レシーバーの型 引数のシグネチャ 戻り値型 備考
map Result<T,E> FnOnce(T)->U Result<U,E> TからUへの写像
map_err Result<T,E1> FnOnce(E1)->E2 Result<T,E2> E1からE2への写像
and_then Result<T,E> FnOnce(T)->Result<_U,E> Result<_U, E> TからUへの写像(失敗可能性をResult<_U,E>で表現)。

型に関して_*(上の表の場合だと,and_thenの_U)は任意の型です。(_*は本記事にのみ有効な独自記法とします。)

一方、Futureの主なメソッドはこんな感じ。1

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
map Self:Future<T,E> F:FnOnce(T)->U Map<Self, F> Future<U,E> TからUへの写像
map_err Self:Future<T,E1> F:FnOnce(E1)->E2 MapErr<Self, E2> Future<T,E2> E1からE2への写像
and_then Self:Future<T,E> F:FnOnce(T)->IntoFuture<_U, E> AndThen<Self,IntoFuture<_U,E>,F> Future<T,U> Tから_Uへの写像(失敗可能性をIntoFuture<_U, E>で表現)
then Self:Future<T,E1> F:FnOnce(std::result::Result<T,E1>)->IntoFuture<_U,_E2> Then<Self,IntoFuture<_U,_E2>,F> Future<_U,_E2> (T,E1)から(_U, _E2)への写像(失敗可能性をResult<_U,_E2>で表現)

ここで、IntoFutureとはFutureトレイトにconvertできるようなトレイトです。このような型を新たに導入している最大の理由は、std::result::Result<T,E>型を取り扱いたいから2です。なので、ざっくり、Futureを拡張したトレイトと理解していれば大丈夫です。
また、同様に、型に関して_*(上の表の場合だと,and_thenの_U)[^arbitrary_type]は任意の型で、省略した場合は、型推論され、型一致を確認します。 この型推論の部分については、第二の難関なので、型推論の章で後述します。

上の表の備考から基本的には、Resultのように使えば大丈夫なんですが、Futureの場合は、戻り値型のトレイトが何なのかを把握することが非常に重要です。(戻り値型自体は表面上の型)
Futureトレイトのdocsからこの表をどのように作ったかを、Futureトレイトのmap, and_then, thenメソッドを例に解説していきたいです。(割りとハウツーみたいな感じで説明します。型システムとかの理論的なお話は抜きで)

map

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
map Self:Future<T,E> F:FnOnce(T)->U Map<Self, F> Future<U,E> TからUへの写像

Futureトレイトの中でも一番わかり易いmapメソッドから。
レシーバーの型をFuture<Item=T,Error=E>とします。Self::Item=T, Self::Error=Eであることに注意して下さい。

https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.map
スクリーンショット 2017-12-21 14.57.47 1.png

Map<Self, F>Futureトレイトを実装しています:

https://docs.rs/futures/0.1.17/futures/future/struct.Map.html#impl-Future
スクリーンショット 2017-12-21 14.59.15.png

  • 引数のクロージャーのシグネチャはFnOnce(Self::Item) -> Uですが、Self::Item=Tなので、FnOnce(T)->Uです。

  • 戻り値型は定義からMap<Self, F>です。

  • 戻り値型のトレイトが一番重要です。なぜなら、この戻り値が次のメソッドのレシーバーになれるからです。(メソッドチェーンできる)
    先ず、リンクのように、Map<Self, F>Futureを実装しています。impl<U, A, F>Uは引数のクロージャーの戻り値型であることに注意すると、実装されたトレイトはFuture<Item=U, Error=Self::Error>となります。ここで、Self::Error=Eなので、Future<Item=U, Error=E>です。以上より、最終的には、Future<Item=U,Error=E>です。

つまり、mapを使用することで、reciverの型がFuture<Item=T,Error=E>からFuture<Item=U,Error=E>に変化することを表しています。(TからUへの写像)

and_then

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
and_then Self:Future<T,E> F:FnOnce(T)->IntoFuture<_U, E> AndThen<Self,IntoFuture<_U,E>,F> Future<T,U> Tから_Uへの写像(失敗可能性をIntoFuture<_U, E>で表現)

mapと同様に見ていきましょう。レシーバーの型をFuture<Item=T,Error=E>とします。Self::Item=T, Self::Error=Eであることに注意して下さい。

https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.and_then
スクリーンショット 2017-12-21 15.11.59 1.png

AndThen<Self, B, F>はFutureトレイトを実装しています(docsではSelfAに置き換わっています):

https://docs.rs/futures/0.1.17/futures/future/struct.AndThen.html#impl-Future
スクリーンショット 2017-12-21 15.12.36.png

  • 引数のクロージャーのシグネチャはFnOnce(Self::Item) -> B(ただし、B: IntoFuture<Error=Self::Error>)ですが、Self::Error=Eなので、FnOnce(T)->IntoFuture<Error=E>です。
    ここで、IntoFuture<Error=E>Itemが任意の型で、型を明示するか、省略時は前後の文脈から型推論した上で、型一致を確認します(型推論については後述)。型推論できなければコンパイルエラーです。本記事では、IntoFuture<_U, E>と表現しているのでした。

  • 戻り値型は定義からAndThen<Self,IntoFuture<Error=Self::Error>,F>です。

  • 戻り値型のトレイトについて。
    先ず、リンクのように、AndThen<Self, B, F>Future<Item=B::Item, Error=B::Error>を実装しています。ここで、B: IntoFuture<Error=Self::Error>B::Itemは任意(本記事では_Uと表現します)、Self::Error=Eであることに注意すると、最終的なトレイトは、Future<Item=_U, Error=E>です。

つまり、and_thenを使用することで、receiverの型がFuture<Item=T,Error=E>からFuture<Item=_U, Error=E>に変化したということを表しています。(Tから_Uへの写像)。ただし、mapとは異なり、引数のクロージャーの戻り値型をIntoFuture<Error=E>とすることで、失敗可能性を表現しています。

then

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
then Self:Future<T,E1> F:FnOnce(std::result::Result<T,E1>)->IntoFuture<_U,_E2> Then<Self,IntoFuture<_U,_E2>,F> Future<_U,_E2> (T,E1)から(_U, _E2)への写像(失敗可能性をResult<_U,_E2>で表現)

thenResultにはないメソッドですが、これは、Future<T,E1>の終了後、成功失敗にかかわらず、必ず実行してほしいときに用いるメソッドです。おんなじようにやっていきます。レシーバーの型をFuture<Item=T,Error=E1>とします。Self::Item=T, Self::Error=E1であることに注意して下さい。

https://docs.rs/futures/0.1.17/futures/future/trait.Future.html#method.then
スクリーンショット 2017-12-21 15.31.39.png

Then<Self, B, F>はFutureトレイトを実装しています(docsではSelfAに置き換わっています):

https://docs.rs/futures/0.1.17/futures/future/struct.Then.html#impl-Future
スクリーンショット 2017-12-21 15.32.25.png

  • 引数のクロージャーのシグネチャはFnOnce(std::result::Result<Self::Item, Self::Error>)-> B(ただし、B: IntoFuture)ですが、Self::Item=T, Self::Error=E1なので、FnOnce(Result<T,E1>)->IntoFutureです。
    ここで、IntoFutureItem&Errorの両方が任意の型です。型を明示するか、省略時は前後の文脈から型推論されます。型推論できなければコンパイルエラーです。本記事では、IntoFuture<_U, _E2>と表現しているのでした。(_U, _E2)は型を明示するか、型推論されて、型が決定される)

  • 戻り値型は定義からThen<Self,IntoFuture<_U,_E2>,F>です。

  • 戻り値型のトレイトについて。
    先ず、2つめのリンクのように、Map<Self, F>Futureを実装しています。B: IntoFutureであることに注意すると、トレイトはFuture<Item=B::Item, Error=B::Error>となります。ここで、B: IntoFutureなので、最終的な型は、Future<Item=_U, Error=_E2>となります。
    本記事では、Future<_U, _E2>と表現しているのでした。

つまり、thenを使用することで、receiverの型がFuture<Item=T,Error=E1>からFuture<Item=_U, Error=_E2>に変化したということを表しています。((T,E1)から(_U, _E2への写像)
(and_thenとの違いは、and_thenはError typeが引き継がれることです。対して、thenはError時にも実行されるので、Error typeは引き継がれません)


入門にはふさわしくないですが、Futureについて補足。

  • Futureトレイトのwaitメソッドは処理が終わるまでブロッキングするので、実用では使わないです。非同期処理をさせたい場合は、代わりにfutures_cpupool::CpuPool::spawnとかtokio_core::reactor::Handle::spawnで処理することが多いです。前者は、事前に用意されたスレッドプールの中で、非同期処理が走ります。後者はScheduledTask に登録されて、mio::event::Event単位で所要の非同期処理が走ります^mio
  • Futureトレイトのpollメソッドについては、futuresを使う側はあんまり意識しなくて良いメソッドです。Futureトレイトを実装したいような構造体に対して、pollメソッドを実装しておくことで、Futureトレイトのmapだったり、and_thenだったり、その他のソッドを新規に実装せずに済みます。

IteratorとStreamの対比

StreamIterator<Item=T>と対比できます:

まずはIterator<Item=T>から3

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
map Self:Iterator<T> FnMut(T)->B Map<Self,F> Iterator<B> TからBへの写像
for_each Self:Iterator<T> FnMut(T)->() - - T達を評価する

続いて、Streamトレイト。同様にStream<Item=T,Error=E>Stream<T,E>と略記します:

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
map Self:Stream<T,E> F:FnMut(T)->U Map<Self,F> Stream<U,E> TからUへの写像
for_each Self:Stream<T,E> F:FnMut(T)->IntoFuture<(), E> ForEach<Self,F,IntoFuture<(),E>> Future<(),E> StreamからFutureへの変換(評価されないことに注意)
and_then Self:Stream<T,E> F:FnMut(T)->IntoFuture<_U,E> AndThen<Self,F,IntoFuture<_U,E>> Stream<_U,E> Tから_Uへの写像(IntoFuture<_U,E>で失敗可能性を表現)
then Self:Stream<T,E1> F:FnMut(Result<T,E1>)->IntoFuture<_U, _E2> Then<Self,F,_U> Stream<_U, _E2> (T,E1)から(_U,_E2)への写像(IntoFuture<_U,_E2>で失敗可能性を表現)

注意すべき点としては、for_eachIteratorの場合だと、それぞれの要素が評価されますが、Stream<Item=T,Error=E>の場合だと、Futureが返されます。つまり依然として、非同期処理として保持されていて、ブロッキングされるわけではありません。(この点waitメソッドと異なります。waitは処理が終わるまでブロッキングし、最後にResult型を返します。)

なお、Iteratorでよく使うcollectメソッドについては、非同期処理の文脈ではwaitメソッドに相当します。しかしながら、ブロッキング処理に相当するため、waitメソッドはあまり用いません。4

Streamの型も前節のFutureとおんなじような感じで見ればいいですが、一応、and_thenメソッドだけ確認しておきましょう。

and_then

メソッド レシーバーの型 引数のシグネチャ 戻り値型 戻り値型のトレイト 備考
and_then Self:Stream<T,E> F:FnMut(T)->IntoFuture<_U,E> AndThen<Self,F,IntoFuture<_U,E>> Stream<_U,E> Tから_Uへの写像(IntoFuture<_U,E>で失敗可能性を表現)

レシーバーの型をStream<Item=T,Error=E>とします。Self::Item=T, Self::Error=Eであることに注意して下さい。

https://docs.rs/futures/0.1.17/futures/stream/trait.Stream.html#method.and_then
スクリーンショット 2017-12-21 17.04.14.png

AndThen<Self, F, U>Streamトレイトを実装しています(docsではSelfSに置き換わっています):

https://docs.rs/futures/0.1.17/futures/stream/struct.AndThen.html#impl-Stream
スクリーンショット 2017-12-21 17.05.47.png

  • 引数のクロージャーのシグネチャはF:FnMut(Self::Item) -> B(ただし、B: IntoFuture<Error=Self::Error>ですが、Self::Error=Eなので、FnMut(T)->IntoFuture<Error=E>です。
    ここで、IntoFuture<Error=E>Itemが任意の型で、型を明示するか、省略時は前後の文脈から型推論されます。型推論できなければコンパイルエラーです。本記事では、IntoFuture<_U, E>と表現しているのでした。

  • 戻り値型は定義からAndThen<Self,F,IntoFuture<Error=Self::Error>>です。

  • 戻り値型のトレイトについて。
    先ず、リンクのように、AndThen<Self, F, B>Streamを実装しています。B: IntoFuture<Error=Self::Error>であることに注意すると、トレイトはFuture<Item=_U, Error=Self::Error>となります。ここで、Self::Error=Eなので、最終的な型は、Future<Item=_U, Error=E>です。

つまり、and_thenを使用することで、receiverの型がFuture<Item=T,Error=E>からFuture<Item=_U, Error=E>に変化したということを表しています。(Tから_Uへの写像)。ただし、mapとは異なり、引数のクロージャーの戻り値型をIntoFuture<Error=E>とすることで、失敗可能性を表現しています。


Note)
使い方的には、Streamトレイトをmapメソッドやand_thenメソッドで処理していき、最後にfor_eachメソッドを作用させ、Future型に変換し、Futureを引数に取る関数(tokioだとtokio_core::reactor::Core::runとか)に突っ込みます。Streamが無限の長さ(e.g:TcpListener::bind(&addr, &handle).unwrap().incoming()とか)。こうして変換されたFutureは、waitメソッドとかで処理してやると、無限時間実行されます(エラーとならない限り)。(すぐ次の章でも例を通じて言及します)

型推論

第二の難関、型推論ですが、これは例をやったほうがわかりやすいと思うので、やっていきます。

最も簡易なノンブロッキングのエコーサーバーを考えます:

cargo.toml
[dependencies]
futures = "0.1.17"
tokio-core = "0.1"
tokio-io = "0.1"
main.rs
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

// non-blocking compared with std::net::TcpListener
use tokio_core::net::TcpListener;
// reactor is a common synonym for event loop
use tokio_core::reactor::Core;
use futures::{Future, Stream};

use tokio_io::AsyncRead;


fn main() {
    let addr = "127.0.0.1:8081".parse().unwrap();
    let mut core = Core::new().unwrap();

    // handler for event loop
    let handle = core.handle();

    let listener =TcpListener::bind(&addr, &handle).unwrap();

    let server = listener.incoming()
        .for_each(move |(socket, _addr)| {
            let (reader, writer) = socket.split();

            let msg = tokio_io::io::copy(reader, writer)
                .then(|_| Ok(()));

            // spawn the future as a concurrent task. This method Executes concurrently on the event loop thread
            handle.spawn(msg);
            Ok(())
    });
    core.run(server).unwrap();
}

Core::new().unwrap();でevent loopのインスタンスを立てて、core.run(server)で起動します。https://tokio-rs.github.io/tokio-core/tokio_core/reactor/struct.Core.html#method.run にもある通り、runメソッドの引数にはFutureトレイトを実装している型のオブジェクトを突っ込みます。

TcpListenerは標準のstd::net::TcpListenerを使用していませんが、標準のライブラリはブロッキング処理なので、ノンブロッキングに対応したtokio_core::net::TcpListener;を用いています(tokio_coremioというノンブロッキングに対応したライブラリを内部で使用しています)

結論から言うと、for_eachthenでそれぞれ型推論されています。(Ok(())の部分で型が省略されている)。

では、server変数の処理について、一行づつ見ていきましょう。

    let server = listener.incoming()
        .for_each(move |(socket, _addr)| {
            let (reader, writer) = socket.split();

            let msg = tokio_io::io::copy(reader, writer)
                .then(|_| Ok(()));

            // spawn the future as a concurrent task. This method Executes concurrently on the event loop thread
            handle.spawn(msg);
            Ok(())
    });

  • let server = listener.incoming()

listenertokio_core::net::TcpListener型です。incomingメソッドは、表面上はIncoming型なのですが、これは、Stream<Item=(TcpStream, SocketAddr), Error=std::io::Error>を実装しています。なので、本記事で言うT=(TcpStream, SocketAddr), E=std::io::Errorとなります。(TcpStream自体はFuture, Streamを実装しているわけではありません。)

https://tokio-rs.github.io/tokio-core/tokio_core/net/struct.Incoming.html#impl-Stream
スクリーンショット 2017-12-22 8.47.32.png


  • .for_each(move |(socket, _addr)| {

後にhandle変数を用いるので、moveしています。(handleとはイベントループのハンドラーです)
for_eachに入れる引数のシグネチャはF:FnMut(T)->IntoFuture<Item=(), Error=E>なのでした。
注意すべき点としては、戻り値型がItem=()となるようなクロージャーを引数に入れる必要があるということです。(先走りますが、最後にOk(())としているのはそのためです。)

T=(TcpStream, SocketAddr)に注意すれば、sockettokio_core::net::TcpStream型、_addrstd::net::SocketAddr型です。。(_addrはクロージャー内で使用しないので、_を先頭につけています)


  • let (reader, writer) = socket.split();

split関数の戻り値型自体は、(ReadHalf<TcpStream>, WriteHalf<TcpStream>)です。次のcopy関数に突っ込むためのコードです。


  • let msg = tokio_io::io::copy(reader, writer);

さて、このコードで、Futureトレイトを実装する型を作成しています。copy関数の戻り値型は、Copy<ReadHalf<TcpStream>, WriteHalf<TcpStream>>ですが、この型はFutureトレイトを実装しています。(Future<Item=(u64, ReadHalf<TcpStream>, WriteHalf<TcpStream>), Error=std::io::Error>)です。

https://docs.rs/tokio-io/0.1.4/tokio_io/io/struct.Copy.html#impl-Future
スクリーンショット 2017-12-22 9.02.17.png


  • .then(|_| Ok(()));,

ここの部分で、やっと型推論の話に入ります。
次の行のhandle.spawnの引数は、以下のようにFuture<Item = (), Error = ()>を要求しています。

https://tokio-rs.github.io/tokio-core/tokio_core/reactor/struct.Handle.html#method.spawn
スクリーンショット 2017-12-22 9.23.50.png

Future<Item=(u64, ReadHalf<TcpStream>, WriteHalf<TcpStream>), Error=io::Error>からIntoFuture<Item=(), Error=_>へと変換しているわけです。(Ok(())std::result::Result<(), _>型ですが、これはIntoFutureを実装している)

さて、すでにhandle.spawnの引数はError = ()を要求しています。一方、Ok(())をクロージャーの戻り値とすると、Error typeは任意です。この時、型推論が働き、IntoFuture<Item=(), Error=()>と型が確定し、型一致します。

Note) 今回のケースではthenの代わりにand_thenは使用できません。なぜなら、and_thenはErrorの型を引き継ぐからです。(つまり、出口でError = ()を要求されているのに、and_thenError=io::Errorをそのまま引き継いでしまうので、型不一致でコンパイルエラーとなります。


  • handle.spawn(msg);

msgをイベントループのハンドラーに登録しています5spawnの引数は前述の通り、Future<Item = (), Error = ()>を要求するので、msgの型と一致します。戻り値はないです。


  • Ok(())

最後の行です。for_eachに入れる引数のシグネチャはF:FnMut(T)->IntoFuture<Item=(), Error=std::io::Error>です。一方、Ok(())とすると、IntoFuture<Item=(), Error=_>(Errorは任意の型)なので型一致します。(ここも型推論が働いている)


こんな感じで型が推移していきます。結構型を追うの大変なので、型推論が働くところは結構コンパイラーに助けてもらうことが多いかもです。

Note) 普通のWebアプリを制作する際は、こんなプリミティブな実装をする必要はなく、WebフレームワークだったらIron, rocketとか使います。Pythonでいう、requestsライブラリみたいなものをお探しの方は、reqwestcrateを覗きましょう。
そこら辺興味ある方は、http://www.arewewebyet.org/ に一通り書いてあるので、目を通すと良いかもしれません。今回の例は、futuresを扱うためのサンプルコードということで。

Note) その他、こういった低水準の実装に興味があれば、https://github.com/tokio-rs/tokio-core/tree/master/tests とか https://github.com/tokio-rs/tokio-line にいろいろあります。

まとめ

  • 型の追うときには、そのメソッドの戻り値型がどのトレイトを実装しているのか(FutureトレイトかStreamトレイトか, ItemとErrorのtypeは何か?)を把握しました。

  • Future, Streamのときに型が省略されたときの型推論をざっくり紹介しました。

最後に

futuresの使い方について解説しました。頑張って解説したは良いものの、やっぱり複雑で難しい。
ですが、一度使い方さえ身に着けていれば、抽象的かつ、保守性の高いコードを構築できると思います。実務的には、本記事のようなことを常に考えているわけではなく、いくつか自分の中でスニペットを持っていて、応用する感じで使っています(英作文でいう「英借文」みたいな感じでしょうか)。

Sinkについては、説明できなかった... (SinkトレイトはSinkはデータを「沈める」先6です。詳しくはコメントに書きました。)

本記事がきっかけで、futuresを使用できるようになれば幸いです:timer:

  1. future自体については、増補改訂版 Java言語で学ぶデザインパターン入門の第9章を読めば良いでしょう。 2

  2. https://tokio.rs/docs/going-deeper-futures/futures-mechanics/#intofuture

  3. Iterator<Item=T>Iterator<T>と略記しています。

  4. Streamにはcollectメソッドもありますが、これは、StreamからFutureに変換するメソッドです。なので、std::iter::Iteratorで使うcollectとちょっとニュアンスが違う)

  5. 詳しいことは、https://tokio.rs/docs/getting-started/reactor/#handle を見ましょう。

  6. https://tokio.rs/docs/getting-started/streams-and-sinks/#sinks を参照。

57
42
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
57
42

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?