110
54

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

dart の stream を理解して async* と yield を正しく使う

Last updated at Posted at 2020-05-07

はじめに

dartのstream、初めてみたときは「なんだコレ」と思いました。パッと見難しげで、ちょっと避けていました。ただ、最近避けて通れなくなった出来事もあり、その際にだいぶ理解できたので、なんだかよくわからない人向けに解説します。

streamとは

英単語自体を直訳すると、流れ となります。訳通り、dartのstreamはある場所からある場所へ値を流す機能です。
以下のイメージ画像を見てください。

空白の図.png

このように、異なる場所にあるAとBで、hogeという値を流すことができます。AとかBは、なんでもいいんです。メソッド間だろうが、クラス間だろうが、メソッドとクラスの間だろうが、何でもよくてとにかく、値を、ある場所からある場所にテレポートさせる、そんなイメージです。このテレポートの橋をかけるのがstreamです。

また、
空白の図 (1).png

このように、ある場所から複数の場所に、同じ値を配布するようなこともできるし、

空白の図 (3).png

テレポート中に、値を書き換えることもできます。なんとなく、streamの凄さが伝わったでしょうか。

もっとも簡単なstreamの例

これは、他の記事とかでもよく目にする、一番簡単な例です。

sample1.dart
import 'dart:async';

main() {
  final _controller = StreamController<String>();

  // 値を流す側
  _controller.sink.add('hoge');
  _controller.sink.add('fuga');

  // 値を受け取る側
  _controller.stream.listen((val) {
    print(val);
  });
}

上記のように StreamControllerを利用することで、値の受け渡しができます。
実行結果は、

$ dart sample1.dart 
hoge
fuga

sample1だと、main関数の中で全てが完結しているので、あまりメリットがよくわからないかもしれません。
以下のような、例を見てみましょう。

sample2.dart
import 'dart:async';

class Sample2 {
  final _controller = StreamController<String>();

  Sample2() {
    a();
    b();
  }

  void a() {
    _controller.sink.add('hoge');
    _controller.sink.add('fuga');
  }

  void b() {
    _controller.stream.listen((val) {
      print(val);
    });
  }
}

main() {
  Sample2();
}

実行結果は、sample1と同じです。この例だと、値が関数間をテレポートしているような感じがしませんか?

streamを返す関数を定義する

さて、streamの基本を理解したところで、一歩階段を登りましょう。streamを返す関数を作ってみます。以下の例を見てください。

sample3.dart
import 'dart:async';

main() {
  final numbers = [1, 2, 3, 4, 5];

  toSquared(numbers).listen((val) {
    print(val);
  });
}

Stream<int> toSquared(List<int> numbers) {
  final _controller = StreamController<int>();

  numbers.forEach((n) {
    _controller.sink.add(n * n);
  });

  return _controller.stream;
}

配列の中身を全て2乗してprintする例です。実行結果は以下の通り。

$ dart sample3.dart
1
4
9
16
25

「forEachの中でprintしろよ」って言われたらそれまでなんですが、ちょっとstreamのお勉強ということで許してください。
上記のように、streamを返す関数は定義できるわけです。そこまで理解は難しくないと思います。

ただ、dartでは、sample3をもっと短く記述することができます。それが、async* と yieldです。
ここまでに示したsampleを理解していない中で、async*とかyieldを目にするから、「streamワカンナイ」ってなるわけです。

async* と yield

sample3と同じ挙動をするsample4を、async*とyieldで作ってみます。

sample4.dart
import 'dart:async';

main() {
  final numbers = [1, 2, 3, 4, 5];

  toSquared(numbers).listen((val) {
    print(val);
  });
}

Stream<int> toSquared(List<int> numbers) async* {
  for (int n in numbers) {
    yield n * n;
  }
}

どうですか、めちゃくちゃコードが短くなったでしょう。それと同時に、streamの面影がどこかへ消えています。もちろん、実行結果はsample3と同じです。

async*をつけると、関数の戻り値がStreamになります。似たものにasyncがありますが、asyncは戻り値をFutureにするものです。この2つは似て非なるものなので、しっかり区別しましょう。

yieldは、async*関数内でのみ使える、通常の関数におけるreturnの代わりです。やっていることは、StreamController.sink.add()と同じです。

値の受け取り方

ここまで示したsampleでは、全てlistenを用いて値を受け取っています。dartには、listen以外の受け取り方が存在します。

await for を使う

先程のsample4を、await forを使って書き換えてみましょう。

sample5.dart
import 'dart:async';

main() async {
  final numbers = [1, 2, 3, 4, 5];

  await for (int n in toSquared(numbers)) {
    print(n);
  }
}

Stream<int> toSquared(List<int> numbers) async* {
  for (int n in numbers) {
    yield n * n;
  }
}

実行結果はsample4と同じです。一見、listenと何が違うんだ?という感じですが、この2つには、streamの終了を待つか待たないか、という大きな違いがあります。sample4と5を少し書き換えて、確認してみましょう。

sample4-2.dart
import 'dart:async';

main() {
  final numbers = [1, 2, 3, 4, 5];

  toSquared(numbers).listen((val) {
    print(val);
  });

  print('hoge');
}

Stream<int> toSquared(List<int> numbers) async* {
  for (int n in numbers) {
    yield n * n;
  }
}
sample5-2.dart
import 'dart:async';
main() async {
  final numbers = [1, 2, 3, 4, 5];

  await for (int n in toSquared(numbers)) {
    print(n);
  }

  print('hoge');
}

Stream<int> toSquared(List<int> numbers) async* {
  for (int n in numbers) {
    yield n * n;
  }
}

双方にprint('hoge')を追加しました。これで実行結果を確認してみます。

$ dart sample4.dart
hoge // 先に出力
1
4
9
16
25

$ dart sample5.dart
1
4
9
16
25
hoge // 後に出力

見事に、hogeの出力位置が異なりますね。つまり、await forはstreamの終了を待ってほしいときに使います。逆にいえば、streamの値の受け取りと同時に他の処理を進めたい場合は、listenを使うべき、ということになります。

複数の場所で値を受け取る

以下は、関数aとbでstreamの値を受け取る例です。

sample6.dart
import 'dart:async';

final _controller = StreamController<String>.broadcast();  // ポイント: broadcastで定義する

main() async {
  a();
  b();

  while(true) {
    _controller.sink.add('hoge');
    await Future.delayed(Duration(seconds: 1));
  }
}

void a() {
  _controller.stream.listen((val) {
    print('a: ${val}');
  });
}

void b() {
  _controller.stream.listen((val) {
    print('b: ${val}');
  });
}

複数の場所でlistenをする場合、StreamController.broadcast()を使う必要があります。上記の実行結果は、

$ dart sample6.dart
a: hoge
b: hoge

// 1秒sleep

a: hoge
b: hoge

// 1秒sleep

a: hoge
b: hoge
^C

このようになります。複数の場所でlistenできていることがわかりますね。

受け取る際に値を加工する

先程のsample6少し書き換えて、値を受け取る際に加工してみましょう。

sample7.dart
import 'dart:async';

final _controller = StreamController<String>.broadcast();

main() async {
  a();
  b();

  while(true) {
    _controller.sink.add('hoge');
    await Future.delayed(Duration(seconds: 1));
  }
}

void a() {
  _controller.stream.transform(transform('だべや')).listen((val) {
    print('a: ${val}');
  });
}

void b() {
  _controller.stream.transform(transform('やねん')).listen((val) {
    print('b: ${val}');
  });
}

StreamTransformer<String, String> transform(String endWord) {
  return StreamTransformer<String, String>.fromHandlers(
    handleData: (value, sink) {
      sink.add('${value}${endWord}');
    }
  );
}

値の加工をするには、listenの前にtransformをはさみます。記述の仕方については、上記を参考にしてみてください。実行結果は以下です。

% dart sample7.dart
a: hogeだべや
b: hogeやねん

// 1秒sleep

a: hogeだべや
b: hogeやねん

// 1秒sleep

a: hogeだべや
b: hogeやねん
^C

おわりに

一度理解すればそこまで難しくは無いと思います。streamは、dartで非同期を実行する上でとても便利な仕組みなので、この記事で少しでも理解が深まれば嬉しいです。

110
54
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
110
54

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?