Edited at

Stream/Sinkを使いこなす! Stream/RxDart初心者のためのBLoC入門 part2


前回の記事

この記事は以下の記事の続きとなりますが、この記事から読み始めてもOKです。

Stream/RxDart初心者のためのBLoC入門

https://qiita.com/tetsufe/items/521014ddc59f8d1df581


BLoCを使うために Stream/Sink は必要

BLoCの定義に「Stream/Sinkを使う」ということが含まれていますので、まさにBLoCとStream/Sinkは切っても切れない関係です。

今回はこのStream/Sinkについて理解を深めるという内容です。


BLoCのおさらい


  • BLoCは状態管理パターンの一つ

  • BLoCへの入力はSinkを使う

  • BLoCがもつ値の出力にStreamを使う

  • BLoCを使うことでロジック部分をUIと分離できる

  • BLoCとProviderなどを併用することで、複数のWidgetで一つの状態を共有できる


今回の登場人物


  • Stream

  • Sink


    • EventSink

    • StreamSink



  • StreamController


    • broadcast(hot)



  • StreamBuilder

スクリーンショット 2019-07-21 3.48.02.png


Stream

Streamは、連続したデータ(イベント)を扱うためのクラスです。


  • Stream自体は連続した値を持ち、Sinkを使うことでその値を追加していくことができる

  • listen() メソッドを使うことで、新しい値が追加されたタイミングでその値を使って何かしらの処理を行うリスナーを設定することができる

  • 連続した値をメソッド(オペレータ)により変換(例えば、連続する全ての値を2倍にしたり、連続で同じ値が来た場合はそれを取り除いたり)できる。

これらの性質から、


  • ユーザからのアクションに応じた処理、web APIを用いる際などの非同期処理をうまく扱うことができる

  • メソッド(オペレータ)でデータの変換処理などができ、メソッドチェーンを使った読みやすいコードを書くことができる

  • Rx系のライブラリと互換性があり、Rxを使ったことがある人は習得しやすい


Sink

https://api.flutter.dev/flutter/dart-core/Sink-class.html

Streamを扱う上で重要なのがSinkです。Sinkはadd()メソッドを用いることで紐づいているStreamに新しい値を送ることができます。Sinkの基本はこれだけで、非常にシンプルです。


Sink <- EventSink <- StreamSink

Sinkを継承したクラスがEventSink、EventSinkを継承したクラスがStreamSinkという関係になっています。

実際のところ、StreamController.sinkは、StreamSinkです。

SinkとEventSink(及びその派生クラスStreamSink)の違いは、


  • EventSinkはadd()とaddError()を持ち、エラーハンドリングに適している

  • Sinkはadd()のみを持つ

このため、StreamControllerを使う場合、StreamController.sinkを使うことで、エラー / 非エラーという情報をStreamに送ることができます。

この構成は、RxDartのSubjectやその派生クラスでも引き継がれています。つまり、Subject.sinkはStreamSinkクラスです。


SinkとStreamを使ったエラーハンドリング

WebAPIを使った処理を題材に、エラーハンドリング処理を実際に作ってみます。

Simulator Screen Shot - iPhone 8 - 2019-07-21 at 05.04.47.png


構成


  • search_bloc.dart(BLoC)


    • 検索APIへの入力 / 出力(検索結果)を扱う



  • main.dart(UI)


    • 検索フォーム

    • 検索結果表示用 StreamBuilder




流れ


  1. 検索フォームの検索ボタンを押したら、searchQueryControllerのsink(changeQuery).add() を使って値を流す

  2. BLoC側で流れてきた値をlisten()しておき、queryを用いてAPIを叩く

  3. その結果を searchResultControllerのsink(changeResult).add() or addError() で送る。

  4. この値 searchResultControllerのstream(result) を main.dartのStreamBuilderで用いる。この際、エラーかどうかも判断する


コード


search_bloc.dart

import 'dart:async';

class SearchBloc {
final searchApi = SearchApi();

final searchQueryController = StreamController<String>();
Stream<String> get query => searchQueryController.stream;
StreamSink<String> get changeQuery => searchQueryController.sink;

// APIの返り値となるSearchResult型を自作したと仮定
final searchResultController = StreamController<SearchResult>();
Stream<SearchResult> get result => searchResultController.stream;
StreamSink<SearchResult> get changeResult => searchResultController.sink;

SearchBloc() {
query.listen((v) async {
// APIの返り値となるSearchResult型を自作したと仮定
final SearchResult searchResult = await searchApi.search(v);
if (searchResult.isError) {
changeResult.addError(searchResult);
} else {
changeResult.add(searchResult);
}
});
}

void dispose() {
searchResultController.close();
searchQueryController.close();
}
}

/*
class SearchApi {
Future<SearchResult> search(String v) async {
await Future.delayed(Duration(seconds: 1));
return SearchResult(isError: false);
}
}

class SearchResult {
bool isError;
SearchResult({this.isError});
}
*/



main.dart

import 'package:flutter/material.dart';

import 'package:provider/provider.dart';
import 'package:search_bloc/search_bloc.dart';

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: Provider<SearchBloc>(
builder: (context) => SearchBloc(),
dispose: (context, bloc) => bloc.dispose(),
child: SearchPage(),
));
}
}

class SearchPage extends StatelessWidget {
final _formKey = GlobalKey<FormState>();

TextEditingController queryInputController = TextEditingController(text: '');

@override
Widget build(BuildContext context) {
final searchBloc = Provider.of<SearchBloc>(context);
return Scaffold(
appBar: AppBar(
title: Text('テストアプリ'),
),
body: Center(
child: Column(children: <Widget>[
Form(
key: _formKey,
child: Column(children: <Widget>[
Padding(
padding: const EdgeInsets.all(16.0),
child: Center(
child: TextFormField(
decoration: InputDecoration(
labelText: '検索キーワード',
),
controller: queryInputController,
))),
RaisedButton(
child: const Text('検索'),
onPressed: () =>
searchBloc.changeQuery.add(queryInputController.text)),
])),
StreamBuilder(
stream: searchBloc.result,
builder: (context, snapshot) {
if (snapshot.hasError) {
// snapshot.error を使ったWidgetを返す
// snapshot は AsyncSnapshot<T> で
           // snapshot.error は Object
// https://api.flutter.dev/flutter/widgets/AsyncSnapshot-class.html
}
if (snapshot.data != null) {
// snapshot.data を使ったWidgetを返す
} else {
// 何かWidgetを返す
}
// エラーが気になる人はとりあえず return Container();
})
])));
}
}



Broadcast(hot) Stream

Broadcast Streamは、複数回listen()することができるStreamです。一つの

https://dart.dev/tutorials/language/streams#broadcast-streams

反対に、broadcastでないStream(デフォルト)は、複数回listen()する場合、2回目以降は何も起こりません。


counter_bloc.dart

import 'dart:async';

class CounterBloc {
final _actionController = StreamController<void>();
Sink<void> get increment => _actionController.sink;

final _countController = StreamController<int>();
Stream<int> get count => _countController.stream;

int _count = 0;

CounterBloc() {
_actionController.stream.listen((_) {
_count++;
_countController.sink.add(_count);
});
_countController.stream.listen(print); // ここで listen()
// 以下の二つも同じ意味
//_countController.stream.listen((value) => print(value));
/*
_countController.stream.listen((value) {
print(value);
});
*/

}

void dispose() {
_actionController.close();
_countController.close();
}
}



main.dart

StreamBuilder(

initialData: 0,
stream: counterBloc.count,
builder: (context, snapshot) {
return Text(
'${snapshot.data}',
style: Theme.of(context).textTheme.display1,
);
},
)

表示は以下のようになってしまいます。

Simulator Screen Shot - iPhone 8 - 2019-07-18 at 14.29.57.png

StreamControllerでは、broadcastメソッドがあり、これを使うことでbroadcastができるようになります。


counter_bloc.dart

import 'dart:async';

class CounterBloc {
/* 省略 */

final _countController = StreamController<int>.broadcast();
Stream<int> get count => _countController.stream;



StraemBuilderとbroadcast

StreamBuilderを使う場合でも、broadcastでない場合、streamから読み取る値はnullとなってしまいます。broadcastを使うと、普通に値を受け取ることができます。

というのも、StreamBuilderの値の受け取り方はlisten()を使っているからです。

https://github.com/flutter/flutter/blob/b712a172f9/packages/flutter/lib/src/widgets/async.dart#L373

treamBuilderで値を使いながら、その値に応じてもう一つの処理を行いたいときなどはbroadcastを使いましょう。


broadcastのデメリット

これだけをみると、全てをbroadcastで宣言した方が便利なように思えます。broadcastにするデメリットはないのでしょうか?(あまり調べてもわかりませんでした。詳しい方教えていただけると嬉しいです・・)

デメリットというかは微妙ですが、以下には注意すべきだと思いました。


  • 複数listen()されるべきでないStreamが間違ってlisten()されてしまっても気づけない


    • broadcastでない場合は複数回listen()すると警告が出ます




broadcast と RxDart

RxDartのSubjectを継承するオブジェクトは、全てbroadcastです。よく使うBehaviorSubject, PublishSubjectなどがそうです。

これらは全てbroadcastですが、サンプルをいくつか見た限り、broadcastでないStreamControllerを使う場合とあまり実装に変化がないようでした。これもbroadcastにしたからといってそれほど目立ったデメリットはないように思った理由の一つです。

参考:https://pub.dev/documentation/rxdart/latest/rx/BehaviorSubject-class.html


まとめ

スクリーンショット 2019-07-21 3.48.02.png


  • BLoCでは定義上、 Stream / Sink を使わなければならない

  • Stream / Sink は、ユーザからのアクションを受け付ける際などに便利

  • StreamSink.add()とStreamSink.addError()でStreamに値を送る

  • Stream.listen()で値が送られた時に行う処理をセットする

  • Streamで複数回listen()するにはbroadcast Streamを使えばよい


参考

Asynchronous programming: streams

https://dart.dev/tutorials/language/streams#broadcast-streams

EventSink

https://api.flutter.dev/flutter/dart-async/EventSink-class.html

StreamSink

https://api.flutter.dev/flutter/dart-async/StreamSink-class.html

StreamController

https://api.dartlang.org/stable/2.4.0/dart-async/StreamController-class.html