前回の記事
この記事は以下の記事の続きとなりますが、この記事から読み始めてもOKです。
Stream/RxDart初心者のためのBLoC入門
https://qiita.com/tetsufe/items/521014ddc59f8d1df581
BLoCを使うために Stream/Sink は必要
BLoCの定義に「Stream/Sinkを使う」ということが含まれていますので、まさにBLoCとStream/Sinkは切っても切れない関係です。
今回はこのStream/Sinkについて理解を深めるという内容です。
BLoCのおさらい
- BLoCは状態管理パターンの一つ
- BLoCへの入力はSinkを使う
- BLoCがもつ値の出力にStreamを使う
- BLoCを使うことでロジック部分をUIと分離できる
- BLoCとProviderなどを併用することで、複数のWidgetで一つの状態を共有できる
今回の登場人物
- Stream
- Sink
- EventSink
- StreamSink
- StreamController
- broadcast(hot)
- StreamBuilder
Stream
Streamは、連続したデータ(イベント)を扱うためのクラスです。
- Stream自体は連続した値を持ち、Sinkを使うことでその値を追加していくことができる
- listen() メソッドを使うことで、新しい値が追加されたタイミングでその値を使って何かしらの処理を行うリスナーを設定することができる
- 連続した値をメソッド(オペレータ)により変換(例えば、連続する全ての値を2倍にしたり、連続で同じ値が来た場合はそれを取り除いたり)できる。
これらの性質から、
- ユーザからのアクションに応じた処理、web APIを用いる際などの非同期処理をうまく扱うことができる
- メソッド(オペレータ)でデータの変換処理などができ、メソッドチェーンを使った読みやすいコードを書くことができる
- Rx系のライブラリと互換性があり、Rxを使ったことがある人は習得しやすい
Sink
Streamを扱う上で重要なのがSinkです。Sinkはadd()メソッドを用いることで紐づいているStreamに新しい値を送ることができます。Sinkの基本はこれだけで、非常にシンプルです。
Sink <- EventSink <- StreamSink
Sinkを継承したクラスがEventSink、EventSinkを継承したクラスがStreamSinkという関係になっています。
実際のところ、StreamController.sink
は、StreamSink
です。
SinkとEventSink(及びその派生クラスStreamSink)の違いは、
- EventSinkはadd()とaddError()を持ち、エラーハンドリングに適している
- Sinkはadd()のみを持つ
このため、StreamControllerを使う場合、StreamController.sinkを使うことで、エラー / 非エラーという情報をStreamに送ることができます。
この構成は、RxDartのSubjectやその派生クラスでも引き継がれています。つまり、Subject.sinkはStreamSinkクラスです。
SinkとStreamを使ったエラーハンドリング
WebAPIを使った処理を題材に、エラーハンドリング処理を実際に作ってみます。
構成
- search_bloc.dart(BLoC)
- 検索APIへの入力 / 出力(検索結果)を扱う
- main.dart(UI)
- 検索フォーム
- 検索結果表示用 StreamBuilder
流れ
- 検索フォームの検索ボタンを押したら、searchQueryControllerのsink(changeQuery).add() を使って値を流す
- BLoC側で流れてきた値をlisten()しておき、queryを用いてAPIを叩く
- その結果を searchResultControllerのsink(changeResult).add() or addError() で送る。
- この値 searchResultControllerのstream(result) を main.dartのStreamBuilderで用いる。この際、エラーかどうかも判断する
コード
import 'dart:async';
class SearchBloc {
final searchApi = SearchApi();
final searchQueryController = StreamController<String>();
Stream<String> get query => searchQueryController.stream;
StreamSink<String> get changeQuery => searchQueryController.sink;
// APIの返り値となるSearchResult型を自作したと仮定
final searchResultController = StreamController<SearchResult>();
Stream<SearchResult> get result => searchResultController.stream;
StreamSink<SearchResult> get changeResult => searchResultController.sink;
SearchBloc() {
query.listen((v) async {
// APIの返り値となるSearchResult型を自作したと仮定
final SearchResult searchResult = await searchApi.search(v);
if (searchResult.isError) {
changeResult.addError(searchResult);
} else {
changeResult.add(searchResult);
}
});
}
void dispose() {
searchResultController.close();
searchQueryController.close();
}
}
/*
class SearchApi {
Future<SearchResult> search(String v) async {
await Future.delayed(Duration(seconds: 1));
return SearchResult(isError: false);
}
}
class SearchResult {
bool isError;
SearchResult({this.isError});
}
*/
import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import 'package:search_bloc/search_bloc.dart';
void main() => runApp(MyApp());
class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: Provider<SearchBloc>(
builder: (context) => SearchBloc(),
dispose: (context, bloc) => bloc.dispose(),
child: SearchPage(),
));
}
}
class SearchPage extends StatelessWidget {
final _formKey = GlobalKey<FormState>();
TextEditingController queryInputController = TextEditingController(text: '');
@override
Widget build(BuildContext context) {
final searchBloc = Provider.of<SearchBloc>(context);
return Scaffold(
appBar: AppBar(
title: Text('テストアプリ'),
),
body: Center(
child: Column(children: <Widget>[
Form(
key: _formKey,
child: Column(children: <Widget>[
Padding(
padding: const EdgeInsets.all(16.0),
child: Center(
child: TextFormField(
decoration: InputDecoration(
labelText: '検索キーワード',
),
controller: queryInputController,
))),
RaisedButton(
child: const Text('検索'),
onPressed: () =>
searchBloc.changeQuery.add(queryInputController.text)),
])),
StreamBuilder(
stream: searchBloc.result,
builder: (context, snapshot) {
if (snapshot.hasError) {
// snapshot.error を使ったWidgetを返す
// snapshot は AsyncSnapshot<T> で
// snapshot.error は Object
// https://api.flutter.dev/flutter/widgets/AsyncSnapshot-class.html
}
if (snapshot.data != null) {
// snapshot.data を使ったWidgetを返す
} else {
// 何かWidgetを返す
}
// エラーが気になる人はとりあえず return Container();
})
])));
}
}
Broadcast(hot) Stream
Broadcast Streamは、複数回listen()することができるStreamです。hot/coldという概念に聞き覚えがある方は、hotだと思ってください。
反対に、broadcastでないStream(デフォルト)は、複数回listen()する場合、2回目以降は何も起こりません。
import 'dart:async';
class CounterBloc {
final _actionController = StreamController<void>();
Sink<void> get increment => _actionController.sink;
final _countController = StreamController<int>();
Stream<int> get count => _countController.stream;
int _count = 0;
CounterBloc() {
_actionController.stream.listen((_) {
_count++;
_countController.sink.add(_count);
});
_countController.stream.listen(print); // ここで listen()
// 以下の二つも同じ意味
//_countController.stream.listen((value) => print(value));
/*
_countController.stream.listen((value) {
print(value);
});
*/
}
void dispose() {
_actionController.close();
_countController.close();
}
}
StreamBuilder(
initialData: 0,
stream: counterBloc.count,
builder: (context, snapshot) {
return Text(
'${snapshot.data}',
style: Theme.of(context).textTheme.display1,
);
},
)
表示は以下のようになってしまいます。broadcastでないStreamを使ってしまったため、listen()ができず、StreamBuilderは値を受け取れなかったのです(StreamBuilderでは内部でlisten()を使っています)
StreamControllerにはbroadcastコンストラクタがあり、これを使うことでbroadcastができるようになります。
import 'dart:async';
class CounterBloc {
/* 省略 */
final _countController = StreamController<int>.broadcast();
Stream<int> get count => _countController.stream;
StraemBuilderとbroadcast
StreamBuilderを使う場合でも、broadcastでない場合、streamから読み取る値はnullとなってしまいます。broadcastを使うと、普通に値を受け取ることができます。
というのも、StreamBuilderの値の受け取り方はlisten()を使っているからです。
treamBuilderで値を使いながら、その値に応じてもう一つの処理を行いたいときなどはbroadcastを使いましょう。
broadcastのデメリット
これだけをみると、全てをbroadcastで宣言した方が便利なように思えます。broadcastにするデメリットはないのでしょうか?(あまり調べてもわかりませんでした。詳しい方教えていただけると嬉しいです・・)
デメリットというかは微妙ですが、以下には注意すべきだと思いました。
- 複数listen()されるべきでないStreamが間違ってlisten()されてしまっても気づけない
- broadcastでない場合は複数回listen()すると警告が出ます
broadcast と RxDart
RxDartのSubjectを継承するオブジェクトは、全てbroadcastです。よく使うBehaviorSubject, PublishSubjectなどがそうです。
これらは全てbroadcastですが、サンプルをいくつか見た限り、broadcastでないStreamControllerを使う場合とあまり実装に変化がないようでした。これもbroadcastにしたからといってそれほど目立ったデメリットはないように思った理由の一つです。
参考:https://pub.dev/documentation/rxdart/latest/rx/BehaviorSubject-class.html
まとめ
- BLoCでは定義上、 Stream / Sink を使わなければならない
- Stream / Sink は、ユーザからのアクションを受け付ける際などに便利
- StreamSink.add()とStreamSink.addError()でStreamに値を送る
- Stream.listen()で値が送られた時に行う処理をセットする
- Streamで複数回listen()するにはbroadcast Streamを使えばよい
続き
RxDartのBehaviorSubjectとPublishSubjectの違いと使い分け
参考
Asynchronous programming: streams
https://dart.dev/tutorials/language/streams#broadcast-streams
EventSink
https://api.flutter.dev/flutter/dart-async/EventSink-class.html
StreamSink
https://api.flutter.dev/flutter/dart-async/StreamSink-class.html
StreamController
https://api.dartlang.org/stable/2.4.0/dart-async/StreamController-class.html