Dart で非同期処理を記述する (Future と Stream)

More than 5 years have passed since last update.

Dart での非同期処理は大きく二つのパターンに分けられる.

一つ目は目的のイベントが一度きりしか行われない場合で, 例えば HTTP リクエストを送信してレスポンスの受信を全て完了したタイミングでイベントが発行される場合などがこちらに当てはまる.

一方, 目的のイベントが複数回発行されることを期待する場合も存在し, こちらは HTTP のレスポンスを受信の完了を待たずに逐次処理していきたい場合などが当てはまる.

これらの二種類の非同期処理を記述するために, Dart にはそれぞれ FutureStream というインタフェースが用意されている.


Future を実装する

Future の基本的な使い方は次のような感じで, Future インスタンスの then() メソッドにより結果を取得するためのハンドラを登録する.

Future<Result> futureResult = getFuture();

futureResult.then((Result result) {
print(result.toString());
});

Future を実装するには Completer を利用するのが便利だ.

次のプログラムは ServerSocket クラスの bind() メソッドをラップして, EchoServer クラスに bind() メソッドを実装している. Completer クラスの future プロパティから Future のインスタンスを取得することができ, complete メソッドに渡されたパラメータが, Future クラスの then() メソッドで登録したハンドラに渡される.

class EchoServer extends Stream<String> {

...
static Future<EchoServer> bind(
var address,
int port,
{ int backlog: 0,
bool v6Only: false }) {
var completer = new Completer();
var futureServer = ServerSocket.bind(
address,
port,
backlog: backlog,
v6Only: v6Only);
futureServer.then((server) {
completer.complete(new EchoServer._internal(server));
});
return completer.future;
}
...

Refs: Using Futures in Dart for Better Async Code


Stream を実装する

Stream の基本的な使い方は, then() メソッドの代わりに listen() メソッドを利用してハンドラを登録する.

Stream<Result> streamResult = getStream();

streamResult.listen((Result result) {
print(result.toString());
});

Stream を実装するには StreamController クラスを利用すると良い. StreamController クラスの stream プロパティから Stream のインスタンスを取得することができ, add() メソッドを通じて listen() で登録されたハンドラを起動することができる.

次のプログラムは ServerSocketlisten() をラップして, 単純な ehco サーバを実装している.

class EchoServer extends Stream<String> {

ServerSocket _server;
StreamController<String> _controller;
StreamSubscription<Socket> _subscription;

...

EchoServer._internal(ServerSocket server) :
_server = server {
_controller = new StreamController<String>(
onListen: _onListen,
onPause: _onPause,
onResume: _onResume,
onCancel: _onCancel);
}

StreamSubscription<String> listen(
void onData(String line),
{ void onError(Error error),
void onDone(),
bool cancelOnError }) {
return _controller.stream.listen(
onData,
onError: onError,
onDone: onDone,
onDone: onDone,
cancelOnError: cancelOnError);
}

void _onListen() {
_subscription = _server.listen(
_onData,
onError: _controller.addError,
onDone: _onDone);
}

void _onCancel() {
_subscription.cancel();
_subscription = null;
}

void _onPause() {
_subscription.pause();
}

void _onResume() {
_subscription.resume();
}

void _onData(Socket client) {
client.transform(UTF8.decoder).listen((data) {
_controller.add(data);
client.write(data);
});
}

void _onDone() {
_controller.close();
}
}

Refs: Creating Streams in Dart


サンプルプログラム

動作するサンプルプログラムは次のリンクから取得できる.