DartのStreamとStreamTransformerを調査したときの話
Flutterを使い始めて疑問に思ったDartのStreamとStreamTransformerの話.ほぼ備忘録なのであまり需要はないかと思います
ストリームとは
ストリームとは,ファイルやネットワークから読み込むデータ,イベントなど,連続して送られてくるモノを抽象化した概念である(少なくとも今のところの認識は).Dartでは,Streamクラスとして抽象化され,ストリームに流れてくるデータやイベントを取得するできる.一方,Streamは読み出し専用であり,ストリームにデータを入れるには別の方法(後述するジェネレータやSink)を使う必要がある.
Streamを使ってみる
Streamからデータを受け取る方法は2つある.一つはawait for
を使う方法.もう一つはlisten
コールバックを使う方法
まず,await for
を使ってみる
void f1() async {
Stream<List<int>> content = File('input.txt').openRead();
await for (final iv in content) {
print(iv);
}
print("end");
}
input.txtには以下の文字列が入っている
abc
def
これを実行すると以下の結果を得る.
[97, 98, 99, 10, 100, 101, 102]
end
Streamは非同期で処理され,await for
はStreamから流れ出てくるデータの終了を待つ.そのため"end"が最後に出力される.
次に,listen
を使ってみる
void f11() async {
Stream<List<int>> content = File('input.txt').openRead();
content.listen(print);
print("end");
}
これを実行すると以下の結果を得る.
end
[97, 98, 99, 10, 100, 101, 102]
listen
はコールバックなので,Streamからデータが流れてきた時に実行される(非同期処理)ので,"end"が先に出力されている.
ちなみに,listen
の引数はvoid onData(T event)?
なので,TはList<int>
である.よって,await for
でループして出力しているのではなく,Listが一回だけ出力されている.もっと長いファイルを読み込ませるとループするはず.
Streamを作ってみる
Streamを作る方法は(現時点で知る限り)2つある.1つはジェネレータを使う方法,もう一つはStreamController
を使う方法.
ジェネレータを使う
ジェネレータとは,Streamを作るためにDartが用意している仕組み.具体的には,async*
とyield
を使う
void f23() {
Stream<int> _create() async* {
for (int i = 0; i < 10; ++i) {
yield i;
}
}
Stream<int> stream = _create();
stream.listen(print);
}
メソッド内部の_create
関数がジェネレータ.yield
を使うことでストリームに値を与えている.yield
はasync*
がついた関数でなければ使うことができない.実行結果は省略
StreamControllerを使う
StreamControllerは,内部にStreamとSinkというプロパティを保持している.SinkはStreamに値を与えるadd
メソッドを持つ
void f22() {
var controller = StreamController<int>();
Stream<int> stream = controller.stream;
stream.listen(print);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
}
実行結果は省略.controller.sink.add
で,Streamに値を与えている.
StreamTransformerとは
Streamは非同期でやってくるイベントやデータで,await for
やlisten
を使うことで値を取り出すことができる.取り出した値に対して何らかの加工をし,新たに別のストリームに流し込む,ために用いるのがStreamTransformerとなる.Javaのデコレータパターンやストリームと本質的には同じ.
StreamTransformerを作って使ってみる
StreamTransformerのクラス定義は
abstract class StreamTransformer<S, T>
で,Stream<S>
からデータを受け取って,Stream<T>
に流し込む
Dartでは,StreamTransformerを作るのに3つ方法が用意されている.
(1) const factory StreamTransformer(StreamSubSubscription<T> onListen(Stream<S> stream, bool cancelOnError))
(2) factory StreamTransformer.fromHandlers(
{void handleData(S data, EventSink<T> sink)?,
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink)?,
void handleDone(EventSink<T> sink)?})
(3) factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind)
(1)は,Stream<S>
とbool型の引数をとり,StreamSubScription<T>
を返す関数を引数に取るコンストラクタを利用する方法.具体的には次のように使う
void f5() {
StreamSubscription<String> _onListen(Stream<int> input, bool cancelOnError) {
late StreamSubscription<int> subscription;
var controller = StreamController<String>( // 新たな出力ストリームを作る
onPause: () {
subscription.pause();
},
onResume: () {
subscription.resume();
},
onCancel: () => subscription.cancel(),
sync: true);
subscription = input.listen((data) { // (a)ここで変換と値の書き込みを行う
controller.add("$data-$data");
},
onError: controller.addError,
onDone: controller.close,
cancelOnError: cancelOnError);
return controller.stream.listen(null); // (b)StreamSubscriptionを返す
}
var duplicator = StreamTransformer(_onListen);
var controller = StreamController<int>();
Stream<String> controllerStream = controller.stream.transform(duplicator); //StreamTransformerを適用
controllerStream.listen(print);
controller.add(1);
controller.add(2);
controller.add(3);
}
ちょっと複雑にみえるが,やっていることは,Stream<S>
(今の場合Stream<int>
)をlistenして値を変換し(a),変換後のStream.listenで返されるStreamSubScription<T>
(今の場合,StreamSubscription<String>
)を返す(b).そして,具体的な変換は_onListen
の中でinput.listen
受け取り,StreamSubScription<T>
が返されるStreamに変換後(この場合,数値を文字に変換して-でくっつける)のデータを流し込む.
そして,作ったStreamTransformerは,Stream.transform
の引数に与えることで,変換後のStream<T>
が返される.あとは入力ストリームに値を流し込むと,変換後の値を受け取れる.
このコードを実行すると以下の結果を得る
1-1
2-2
3-3
(2)は,S型とEventSink<T>
型を引数に受け取り,Sを変換してEventSink<T>
に値を書き込むコールバック関数,handleData
を用いる.(1)の方法よりも簡単.
void f3() {
var controller = StreamController<num>();
var streamTransformer = StreamTransformer<num, num>.fromHandlers(
handleData: (num data, EventSink sink) {
sink.add(data * 2); // ここで変換している
}, handleError: (Object error, StackTrace stacktrace, EventSink sink) {
sink.addError("Something happen: $error");
}, handleDone: (EventSink sink) {
sink.close();
});
var controllerStream = controller.stream.transform(streamTransformer);
controllerStream.listen(print);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
}
これを実行すると以下の結果を得る
2
4
6
(3)は,Stream<S>
型を受け取り,Stream<T>
型を返す関数を受け取る,fromBind
を用いる方法
void f61() {
Stream<String> _convert(Stream<int> inStream) {
var controller = StreamController<String>();
inStream.listen((int data) {
controller.sink.add("$data"); // ここで変換している
});
return controller.stream;
}
var iToS = StreamTransformer.fromBind(_convert);
var cont = StreamController<int>();
Stream<String> outStream = cont.stream.transform(iToS);
outStream.listen(print);
cont.sink.add(10);
cont.sink.add(20);
cont.sink.add(30);
}
これもやってることはほとんど(2)と同じ.これを実行すると以下の結果を得る
10
20
30
StreamControllerとawait for
ジェネレータを用いる場合,最初の例のようにawait for
を利用するとStreamの終了を待って次の処理を実行,といったことができる.一方,StreamControllerを用いる場合,Streamを終了させないとawait forがいつまでも待ち続けてしまう(大ハマリした).そこで,Streamを終了させるにはStreamController.closeを呼び出す.
Stream<int> makeCounter(int to) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter);
if (counter >= to) {
timer.cancel();
controller.close(); // これを呼ばないとawait forが待つのをやめない
}
}
Timer.periodic(Duration(seconds: 1), tick);
return controller.stream;
}
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
print("end of for");
return sum;
}
void f7() async {
var stream = makeCounter(5);
var sum = await sumStream(stream);
print("total = $sum");
}
Dart,良い言語な気がする
Socketに応用
この記事を参考に,Socketに対してStreamTransformer
を適用してみた.Socketの定義は,
abstract class Socket implements Stream<Uint8List>, IOSink
なので,ストリームを流れるデータの型はUint8List
である(要するにバイナリ).これをUTF8の文字列として受け取りたい.そのためにはUTF8Decoder
を使うのが便利だが,UTF8Decoder
の定義は
class Utf8Decoder extends Converter<List<int>, String>
であり,Uint8ListとListの変換が必要となる.そこで,これを変換するStreamTransformerを作成して適用してみた.今回は(3)の方法で.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
void main() async {
print("Hello Client");
final socket = await Socket.connect('localhost', 8080);
print('Connected to: ${socket.remoteAddress.address}:${socket.remotePort}');
Stream<List<int>> _convert(Stream<Uint8List> inStream) {
final controller = StreamController<List<int>>();
inStream.listen((Uint8List ilist) {
List<int> list = List<int>.from(ilist);
controller.sink.add(list); // ここで変換
});
return controller.stream;
}
final trans = StreamTransformer.fromBind(_convert);
socket.transform(trans).transform(utf8.decoder).listen((String line) {
print("Server2: $line");
}, onError: (error) {
print(error);
socket.destroy();
}, onDone: () {
print("Server left");
socket.destroy();
});
await sendMessage(socket, 'Knock, knock.');
await sendMessage(socket, 'Banana');
await sendMessage(socket, 'Banana');
await sendMessage(socket, 'Banana');
await sendMessage(socket, 'Banana');
await sendMessage(socket, 'Banana');
await sendMessage(socket, 'Orange');
await sendMessage(socket, "Orange you glad I didn't say banana again?");
}
//Future<void> sendMessage(Socket socket, String message) async {
Future<void> sendMessage(Socket socket, String message) {
print("Client $message");
socket.write(message);
//return await Future.delayed(Duration(seconds: 1)); await不要
return Future.delayed(Duration(seconds: 1));
}
サーバの方は参考にしたサイトそのままで,確認のため日本語を送っている.これを実行すると以下の結果を得る.
(追記)参考サイトでは,sendMessage
のreturn
でawaitして結果を返していますが,よくよく考えるとこれは不要で
async
も不要です.
Connected to: 127.0.0.1:8080
Client Knock, knock.
Client Banana
Server2: Who is there? どこ
Client Banana
Server2: Banana だれ
Client Banana
Server2: Banana だれ
Client Banana
Server2: Banana だれ
Client Banana
Server2: Banana だれ
Client Orange
Server2: Banana だれ
Client Orange you glad I didn't say banana again?
Server2: Orange だれ
Server2: Very funny.
ちゃんと日本語が出ている.ちなみに,オリジナルのコードで動かすと文字化けする.
余談
Dartのチュートリアルでは,次のようなコードでファイルを読み込み,UTF8に変換して1行区切りにしてリストに格納,などのようなことをしている.ここで,utf8.decoder
やLineSplitter
はStreamTransformer
のインスタンスである.これらのクラスはStreamTransformerにコールバックなどを与えて生成しているのではなく,
StreamTransformer`を継承した子クラスである.解析していったら非常に複雑だった.なので,基本的には継承はせず,コールバック等を与えて使うのが正解だろう.解析したのでDartの勉強になった...
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(const LineSplitter())
.toList();