はじめに
dartのstream、初めてみたときは「なんだコレ」と思いました。パッと見難しげで、ちょっと避けていました。ただ、最近避けて通れなくなった出来事もあり、その際にだいぶ理解できたので、なんだかよくわからない人向けに解説します。
streamとは
英単語自体を直訳すると、流れ となります。訳通り、dartのstreamはある場所からある場所へ値を流す機能です。
以下のイメージ画像を見てください。
このように、異なる場所にあるAとBで、hogeという値を流すことができます。AとかBは、なんでもいいんです。メソッド間だろうが、クラス間だろうが、メソッドとクラスの間だろうが、何でもよくてとにかく、値を、ある場所からある場所にテレポートさせる、そんなイメージです。このテレポートの橋をかけるのがstreamです。
このように、ある場所から複数の場所に、同じ値を配布するようなこともできるし、
テレポート中に、値を書き換えることもできます。なんとなく、streamの凄さが伝わったでしょうか。
もっとも簡単なstreamの例
これは、他の記事とかでもよく目にする、一番簡単な例です。
import 'dart:async';
main() {
final _controller = StreamController<String>();
// 値を流す側
_controller.sink.add('hoge');
_controller.sink.add('fuga');
// 値を受け取る側
_controller.stream.listen((val) {
print(val);
});
}
上記のように StreamController
を利用することで、値の受け渡しができます。
実行結果は、
$ dart sample1.dart
hoge
fuga
sample1だと、main関数の中で全てが完結しているので、あまりメリットがよくわからないかもしれません。
以下のような、例を見てみましょう。
import 'dart:async';
class Sample2 {
final _controller = StreamController<String>();
Sample2() {
a();
b();
}
void a() {
_controller.sink.add('hoge');
_controller.sink.add('fuga');
}
void b() {
_controller.stream.listen((val) {
print(val);
});
}
}
main() {
Sample2();
}
実行結果は、sample1と同じです。この例だと、値が関数間をテレポートしているような感じがしませんか?
streamを返す関数を定義する
さて、streamの基本を理解したところで、一歩階段を登りましょう。streamを返す関数を作ってみます。以下の例を見てください。
import 'dart:async';
main() {
final numbers = [1, 2, 3, 4, 5];
toSquared(numbers).listen((val) {
print(val);
});
}
Stream<int> toSquared(List<int> numbers) {
final _controller = StreamController<int>();
numbers.forEach((n) {
_controller.sink.add(n * n);
});
return _controller.stream;
}
配列の中身を全て2乗してprintする例です。実行結果は以下の通り。
$ dart sample3.dart
1
4
9
16
25
「forEachの中でprintしろよ」って言われたらそれまでなんですが、ちょっとstreamのお勉強ということで許してください。
上記のように、streamを返す関数は定義できるわけです。そこまで理解は難しくないと思います。
ただ、dartでは、sample3をもっと短く記述することができます。それが、async* と yieldです。
ここまでに示したsampleを理解していない中で、async*とかyieldを目にするから、「streamワカンナイ」ってなるわけです。
async* と yield
sample3と同じ挙動をするsample4を、async*とyieldで作ってみます。
import 'dart:async';
main() {
final numbers = [1, 2, 3, 4, 5];
toSquared(numbers).listen((val) {
print(val);
});
}
Stream<int> toSquared(List<int> numbers) async* {
for (int n in numbers) {
yield n * n;
}
}
どうですか、めちゃくちゃコードが短くなったでしょう。それと同時に、streamの面影がどこかへ消えています。もちろん、実行結果はsample3と同じです。
async*をつけると、関数の戻り値がStreamになります
。似たものにasyncがありますが、asyncは戻り値をFutureにする
ものです。この2つは似て非なるものなので、しっかり区別しましょう。
yieldは、async*関数内でのみ使える、通常の関数におけるreturnの代わり
です。やっていることは、StreamController.sink.add()
と同じです。
値の受け取り方
ここまで示したsampleでは、全てlistenを用いて値を受け取っています。dartには、listen以外の受け取り方が存在します。
await for を使う
先程のsample4を、await forを使って書き換えてみましょう。
import 'dart:async';
main() async {
final numbers = [1, 2, 3, 4, 5];
await for (int n in toSquared(numbers)) {
print(n);
}
}
Stream<int> toSquared(List<int> numbers) async* {
for (int n in numbers) {
yield n * n;
}
}
実行結果はsample4と同じです。一見、listenと何が違うんだ?という感じですが、この2つには、streamの終了を待つか待たないか、という大きな違いがあります。sample4と5を少し書き換えて、確認してみましょう。
import 'dart:async';
main() {
final numbers = [1, 2, 3, 4, 5];
toSquared(numbers).listen((val) {
print(val);
});
print('hoge');
}
Stream<int> toSquared(List<int> numbers) async* {
for (int n in numbers) {
yield n * n;
}
}
import 'dart:async';
main() async {
final numbers = [1, 2, 3, 4, 5];
await for (int n in toSquared(numbers)) {
print(n);
}
print('hoge');
}
Stream<int> toSquared(List<int> numbers) async* {
for (int n in numbers) {
yield n * n;
}
}
双方にprint('hoge')
を追加しました。これで実行結果を確認してみます。
$ dart sample4.dart
hoge // 先に出力
1
4
9
16
25
$ dart sample5.dart
1
4
9
16
25
hoge // 後に出力
見事に、hogeの出力位置が異なりますね。つまり、await forはstreamの終了を待ってほしいときに使います。逆にいえば、streamの値の受け取りと同時に他の処理を進めたい場合は、listenを使うべき、ということになります。
複数の場所で値を受け取る
以下は、関数aとbでstreamの値を受け取る例です。
import 'dart:async';
final _controller = StreamController<String>.broadcast(); // ポイント: broadcastで定義する
main() async {
a();
b();
while(true) {
_controller.sink.add('hoge');
await Future.delayed(Duration(seconds: 1));
}
}
void a() {
_controller.stream.listen((val) {
print('a: ${val}');
});
}
void b() {
_controller.stream.listen((val) {
print('b: ${val}');
});
}
複数の場所でlistenをする場合、StreamController.broadcast()
を使う必要があります。上記の実行結果は、
$ dart sample6.dart
a: hoge
b: hoge
// 1秒sleep
a: hoge
b: hoge
// 1秒sleep
a: hoge
b: hoge
^C
このようになります。複数の場所でlistenできていることがわかりますね。
受け取る際に値を加工する
先程のsample6少し書き換えて、値を受け取る際に加工してみましょう。
import 'dart:async';
final _controller = StreamController<String>.broadcast();
main() async {
a();
b();
while(true) {
_controller.sink.add('hoge');
await Future.delayed(Duration(seconds: 1));
}
}
void a() {
_controller.stream.transform(transform('だべや')).listen((val) {
print('a: ${val}');
});
}
void b() {
_controller.stream.transform(transform('やねん')).listen((val) {
print('b: ${val}');
});
}
StreamTransformer<String, String> transform(String endWord) {
return StreamTransformer<String, String>.fromHandlers(
handleData: (value, sink) {
sink.add('${value}${endWord}');
}
);
}
値の加工をするには、listenの前にtransformをはさみます。記述の仕方については、上記を参考にしてみてください。実行結果は以下です。
% dart sample7.dart
a: hogeだべや
b: hogeやねん
// 1秒sleep
a: hogeだべや
b: hogeやねん
// 1秒sleep
a: hogeだべや
b: hogeやねん
^C
おわりに
一度理解すればそこまで難しくは無いと思います。streamは、dartで非同期を実行する上でとても便利な仕組みなので、この記事で少しでも理解が深まれば嬉しいです。