13
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DartのStreamについて学んでみる

Posted at

Streamとは?

公式のドキュメントのリンク

非同期プログラミング:ストリームについて
https://dart.dev/tutorials/language/streams
シンク< T >クラスについて
https://api.flutter.dev/flutter/dart-core/Sink-class.html
リッスンメソッドについて
https://api.flutter.dev/flutter/dart-async/Stream/listen.html
ストリームコントローラーについて
https://api.dart.dev/stable/2.16.2/dart-async/StreamController-class.html

英単語自体を直訳すると、流れ となります。訳通り、dartのstreamはある場所からある場所へ値を流す機能です。
Javaの本では、川の流れだとか例えていた。

こんな例えもあった。
Streamとは、「ある値を入れて」、「ある値を出す」仕組みです。
この例えの方が、プログラミングでは、分かりやすいと思う。

StreamControllerを使ってコードを書くのが一般的な方法みたいです。
書きながら、勉強していきましょう。

  1. 作業用のフォルドを作成
  2. ターミナルで、dart create dart-プロジェクト名を入力する。
dart create dart-practice
  1. Visual Studio Codeでプロジェクトを開く。
  2. こちらで、コーディングを行なっていきます。
import 'dart:async';

main() {
  final controller = StreamController();
  controller.sink.add(1);
  controller.sink.add(2);
    
  controller.stream.listen((value){
    print(value);
  });
}

実行するには、Run|Debugのどちらかを押すか、右上の虫のボタンを押す。ファイルを右クリックして、Start Debugginを押しても実行できます。
Flutterでも同じことができる。

スクリーンショット 2022-04-17 9.08.47.png

実行結果

1
2
Exited

sinkとは?

公式を翻訳すると
データの一般的な宛先。

複数のデータ値をシンクに入れることができ、
利用できるデータがなくなったら、シンクを閉じる必要があります。

これは、他のデータ受信者が実装できる汎用インターフェースです。

sink.の後にメソッドで、追加をするadd(T data)と閉じるclose()がある。
今使ってみたのは、追加をするaddですね。

add(値)こんな仕組みになっているようですね。

listenとは?

listen関数を利用することで、Streamからデータを都度受け取れる
公式を翻訳すると分かりずらい?
forEachのような繰り返し処理をしている。

int型だけではなく、あらゆる値を出し入れすることができます。

import 'dart:async';

main() {
  final controller = StreamController();
  controller.sink.add(1);
  controller.sink.add(2.0);
  controller.sink.add('Hello');
  controller.sink.add([10, 20.0, 'World']);
  controller.sink.add(null);
    
  controller.stream.listen((value){
    print(value);
  });
}

実行結果

1
2.0
Hello
[10, 20.0, World]
null

自分でプログラムを考えて実験してみた!

import 'dart:async';

main() {
  // controllerという定数を定義、finalなので代入はできる。変数といった方がいいか...
  final controller = StreamController();
  // controllerに値を追加していく、List型出ないと複数の型を扱えない、エラーが出る
  controller.sink.add("こんにちは!");
  // 計算ができた!、足し算をする
  controller.sink.add(1 + 1 + 1);
  // 掛け算してみる
  controller.sink.add(2 * 2);
  // 掛け算して、余の数を出す。2×2は4で、4÷3は1になる。
  controller.sink.add(2 * 2 % 3);
  // 配列を使ってみる。Dクラスの平均点を配列で作ってみる。
  controller.sink.add(["Dクラス", 68, true]);
  // String、int、bool型が入っている。実際は成績が悪いので、新しく作る。
  controller.sink.add(["Dクラス", 48, false]);
  // forEachのように、controllerという変数ですね。これを繰り返し処理している。
  controller.stream.listen((value) {
    print(value);
  });
}

実行結果

こんにちは!
3
4
1
[Dクラス, 68, true]
[Dクラス, 48, false]

Streamを返す関数を定義する

forEachを使った方法があるそうですが、async関数を使うのが一般的な方法のようです。

streamとasync

async関数の中では「await for」を使ってStreamの値を取り出すことができます。

また、「async*」関数の中で「yield」キーワードを使うことで、返り値のStreamに値を流すことができます。

async*をつけると、関数の戻り値がStreamになります。似たものにasyncがありますが、asyncは戻り値をFutureにするものです。この2つは似て非なるものなので、しっかり区別しましょう。と参考にした昔の記事には書いてありました。

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

実行結果

55

型に関係なく、以下のように使うこともできます。

import 'dart:async';

Future<List> displayStream(Stream<String> stream) async {
  List<String> arr = [];
  await for (var value in stream) {
    arr.add(value + '-inside-stream');
  }
  
  return arr;
}

Stream<String> passStream(List to) async* {
  for(var v in to){
    yield v;
  }
}

main() async {
  var stream = passStream(['a', 'b', 'c', 'd']);
  var str_1 = await displayStream(stream);
  print(str_1);
}

でも、参考にしたこのソースコードだとエラーが出たので、修正しないと使えませんでした🤔

import 'dart:async';
// dynamic型にしないとエラー出る!
Future<List> displayStream(Stream<dynamic> stream) async {
  List<String> arr = [];
  await for (var value in stream) {
    arr.add("配列の中身を表示: " + value);
  }

  return arr;
}
// dynamic型にしないとエラー出る!
Stream<dynamic> passStream(List to) async* {
  for(var v in to){
    yield v;
  }
}

main() async {
  var stream = passStream(['a', 'b', 'c', 'd']);
  var str_1 = await displayStream(stream);
  print(str_1);
}

実行結果

[配列の中身を表示: a, 配列の中身を表示: b, 配列の中身を表示: c, 配列の中身を表示: d]

もっと簡単な書き方があったでもこの書き方古いみたいですね?

import 'dart:async';

main() {
  final members = ["Kboyさん", "Aoiさん", "kosukeさん"];

  toSquared(members).listen((val) {
    print(val);
  });
}

Stream<String> toSquared(List<String> members) async* {
  for (String n in members) {
    yield n;
  }
}

実行結果

Kboyさん
Aoiさん
kosukeさん

現在はこの書き方が推奨されている?、Null safety対応されているみたいです。
公式ドキュメント
https://api.flutter.dev/flutter/dart-math/sqrt.html

コードを修正

import 'dart:async';

main() {
  final members = ["Kboyさん", "Aoiさん", "kosukeさん"];
  // コードを修正: toSquared -> sqrt
  sqrt(members).listen((val) {
    print("sqrtで表示: " + val);
  });
}

// コードを修正: toSquared -> sqrt
Stream<String> sqrt(List<String> members) async* {
  for (String n in members) {
    yield n;
  }
}

実行結果

sqrtで表示: Kboyさん
sqrtで表示: Aoiさん
sqrtで表示: kosukeさん

例外処理を行うこともできます。
例外は、「onError」でcatchします。

import 'dart:async';

// 例外処理を行うこともできます。
void addLessThanFive(StreamController controller, int value) {
  // 5未満までデータを追加する
  if (value < 5) {
    controller.sink.add(value);
  } else {
    controller.sink.addError(StateError('$value is not less than 5'));
  }
}

main() {
  final controller = StreamController();
  addLessThanFive(controller, 1);
  addLessThanFive(controller, 2);
  addLessThanFive(controller, 3);
  addLessThanFive(controller, 4);
  addLessThanFive(controller, 5);

  controller.stream.listen((value) {
    print(value);
  }, onError: (error) {
    print(error);
    print("例外処理が発生しました!");
  });
}

実行結果

1
2
3
4
Bad state: 5 is not less than 5
例外処理が発生しました!

Streamをcloseして、その時に行う処理を書くこともできます。
「onDone」という項目で、closeした時の処理を行います。

import 'dart:async';

void addLessThanFive(StreamController controller, int value) {
  // 5未満まで値を追加して、5を超えると例外処理が発生して、closeする
  if (value < 5) {
    controller.sink.add(value);
  } else {
    controller.sink.addError(StateError('$value is not less than 5'));
  }
}

// Streamをcloseします。
main() {
  final controller = StreamController();
  addLessThanFive(controller, 1);
  addLessThanFive(controller, 2);
  addLessThanFive(controller, 3);
  addLessThanFive(controller, 4);
  addLessThanFive(controller, 5);
  controller.close();

  controller.stream.listen((value) {
    print(value);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('controller was closed');
    print('Streamをcloseしました');
  });
}

実行結果

1
2
3
4
Bad state: 5 is not less than 5
controller was closed
Streamをcloseしました

やってみた感想

自分でプログラムを動かしてみないと、サンプル通りに動かなかったりするので、調べながらやってみた、ストリームコントローラーはこんなものかとなんとなく理解できた。
これ理解しないと、Provider、riverpod使っあFirebase使ったアプリ作るときに、人が書いたソースコード読めないのが理解できた。

13
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?