3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DartのStream/StreamTransformer

Last updated at Posted at 2022-09-16

DartのStreamとStreamTransformerを調査したときの話

Flutterを使い始めて疑問に思ったDartのStreamとStreamTransformerの話.ほぼ備忘録なのであまり需要はないかと思います

ストリームとは

ストリームとは,ファイルやネットワークから読み込むデータ,イベントなど,連続して送られてくるモノを抽象化した概念である(少なくとも今のところの認識は).Dartでは,Streamクラスとして抽象化され,ストリームに流れてくるデータやイベントを取得するできる.一方,Streamは読み出し専用であり,ストリームにデータを入れるには別の方法(後述するジェネレータやSink)を使う必要がある.

Streamを使ってみる

Streamからデータを受け取る方法は2つある.一つはawait forを使う方法.もう一つはlistenコールバックを使う方法
まず,await forを使ってみる

void f1() async {
  Stream<List<int>> content = File('input.txt').openRead();
  await for (final iv in content) {
    print(iv);
  }
  print("end");
}

input.txtには以下の文字列が入っている

abc
def

これを実行すると以下の結果を得る.

[97, 98, 99, 10, 100, 101, 102]
end

Streamは非同期で処理され,await forはStreamから流れ出てくるデータの終了を待つ.そのため"end"が最後に出力される.
次に,listenを使ってみる

void f11() async {
  Stream<List<int>> content = File('input.txt').openRead();
  content.listen(print);
  print("end");
}

これを実行すると以下の結果を得る.

end
[97, 98, 99, 10, 100, 101, 102]

listenはコールバックなので,Streamからデータが流れてきた時に実行される(非同期処理)ので,"end"が先に出力されている.
ちなみに,listenの引数はvoid onData(T event)?なので,TはList<int>である.よって,await forでループして出力しているのではなく,Listが一回だけ出力されている.もっと長いファイルを読み込ませるとループするはず.

Streamを作ってみる

Streamを作る方法は(現時点で知る限り)2つある.1つはジェネレータを使う方法,もう一つはStreamControllerを使う方法.

ジェネレータを使う

ジェネレータとは,Streamを作るためにDartが用意している仕組み.具体的には,async*yieldを使う

void f23() {
  Stream<int> _create() async* {
    for (int i = 0; i < 10; ++i) {
      yield i;
    }
  }
  Stream<int> stream = _create();
  stream.listen(print);
}

メソッド内部の_create関数がジェネレータ.yieldを使うことでストリームに値を与えている.yieldasync*がついた関数でなければ使うことができない.実行結果は省略

StreamControllerを使う

StreamControllerは,内部にStreamとSinkというプロパティを保持している.SinkはStreamに値を与えるaddメソッドを持つ

void f22() {
  var controller = StreamController<int>();
  Stream<int> stream = controller.stream;
  stream.listen(print);
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
}

実行結果は省略.controller.sink.addで,Streamに値を与えている.

StreamTransformerとは

Streamは非同期でやってくるイベントやデータで,await forlistenを使うことで値を取り出すことができる.取り出した値に対して何らかの加工をし,新たに別のストリームに流し込む,ために用いるのがStreamTransformerとなる.Javaのデコレータパターンやストリームと本質的には同じ.

StreamTransformerを作って使ってみる

StreamTransformerのクラス定義は

abstract class StreamTransformer<S, T>

で,Stream<S>からデータを受け取って,Stream<T>に流し込む
Dartでは,StreamTransformerを作るのに3つ方法が用意されている.

(1) const factory StreamTransformer(StreamSubSubscription<T> onListen(Stream<S> stream, bool cancelOnError))
(2) factory StreamTransformer.fromHandlers(
      {void handleData(S data, EventSink<T> sink)?,
      void handleError(Object error, StackTrace stackTrace, EventSink<T> sink)?,
      void handleDone(EventSink<T> sink)?})
(3) factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind)

(1)は,Stream<S>とbool型の引数をとり,StreamSubScription<T>を返す関数を引数に取るコンストラクタを利用する方法.具体的には次のように使う

void f5() {
  StreamSubscription<String> _onListen(Stream<int> input, bool cancelOnError) {
    late StreamSubscription<int> subscription;

    var controller = StreamController<String>( // 新たな出力ストリームを作る
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);

    subscription = input.listen((data) { // (a)ここで変換と値の書き込みを行う
      controller.add("$data-$data");
    },
        onError: controller.addError,
        onDone: controller.close,
        cancelOnError: cancelOnError);

    return controller.stream.listen(null); // (b)StreamSubscriptionを返す
  }

  var duplicator = StreamTransformer(_onListen);

  var controller = StreamController<int>();
  Stream<String> controllerStream = controller.stream.transform(duplicator); //StreamTransformerを適用
  controllerStream.listen(print);

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

ちょっと複雑にみえるが,やっていることは,Stream<S>(今の場合Stream<int>)をlistenして値を変換し(a),変換後のStream.listenで返されるStreamSubScription<T>(今の場合,StreamSubscription<String>)を返す(b).そして,具体的な変換は_onListenの中でinput.listen受け取り,StreamSubScription<T>が返されるStreamに変換後(この場合,数値を文字に変換して-でくっつける)のデータを流し込む.

そして,作ったStreamTransformerは,Stream.transformの引数に与えることで,変換後のStream<T>が返される.あとは入力ストリームに値を流し込むと,変換後の値を受け取れる.
このコードを実行すると以下の結果を得る

1-1
2-2
3-3

(2)は,S型とEventSink<T>を引数に受け取り,Sを変換してEventSink<T>に値を書き込むコールバック関数,handleDataを用いる.(1)の方法よりも簡単.

void f3() {
  var controller = StreamController<num>();

  var streamTransformer = StreamTransformer<num, num>.fromHandlers(
      handleData: (num data, EventSink sink) {
    sink.add(data * 2);        // ここで変換している
  }, handleError: (Object error, StackTrace stacktrace, EventSink sink) {
    sink.addError("Something happen: $error");
  }, handleDone: (EventSink sink) {
    sink.close();
  });

  var controllerStream = controller.stream.transform(streamTransformer);
  controllerStream.listen(print);

  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
}

これを実行すると以下の結果を得る

2
4
6

(3)は,Stream<S>型を受け取り,Stream<T>を返す関数を受け取る,fromBindを用いる方法

void f61() {
  Stream<String> _convert(Stream<int> inStream) {
    var controller = StreamController<String>();
    inStream.listen((int data) {
      controller.sink.add("$data"); // ここで変換している
    });

    return controller.stream;
  }

  var iToS = StreamTransformer.fromBind(_convert);
  var cont = StreamController<int>();
  Stream<String> outStream = cont.stream.transform(iToS);
  outStream.listen(print);
  cont.sink.add(10);
  cont.sink.add(20);
  cont.sink.add(30);
}

これもやってることはほとんど(2)と同じ.これを実行すると以下の結果を得る

10
20
30

StreamControllerとawait for

ジェネレータを用いる場合,最初の例のようにawait forを利用するとStreamの終了を待って次の処理を実行,といったことができる.一方,StreamControllerを用いる場合,Streamを終了させないとawait forがいつまでも待ち続けてしまう(大ハマリした).そこで,Streamを終了させるにはStreamController.closeを呼び出す.

Stream<int> makeCounter(int to) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter);
    if (counter >= to) {
      timer.cancel();
      controller.close(); // これを呼ばないとawait forが待つのをやめない
    }
  }

  Timer.periodic(Duration(seconds: 1), tick);
  return controller.stream;
}

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }

  print("end of for");
  return sum;
}

void f7() async {
  var stream = makeCounter(5);
  var sum = await sumStream(stream);
  print("total = $sum");
}

Dart,良い言語な気がする

Socketに応用

この記事を参考に,Socketに対してStreamTransformerを適用してみた.Socketの定義は,

abstract class Socket implements Stream<Uint8List>, IOSink

なので,ストリームを流れるデータの型はUint8Listである(要するにバイナリ).これをUTF8の文字列として受け取りたい.そのためにはUTF8Decoderを使うのが便利だが,UTF8Decoderの定義は

class Utf8Decoder extends Converter<List<int>, String> 

であり,Uint8ListとListの変換が必要となる.そこで,これを変換するStreamTransformerを作成して適用してみた.今回は(3)の方法で.

socket_client.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

void main() async {
  print("Hello Client");
  final socket = await Socket.connect('localhost', 8080);
  print('Connected to: ${socket.remoteAddress.address}:${socket.remotePort}');

  Stream<List<int>> _convert(Stream<Uint8List> inStream) {
    final controller = StreamController<List<int>>();
    inStream.listen((Uint8List ilist) {
      List<int> list = List<int>.from(ilist);
      controller.sink.add(list); // ここで変換
    });
    return controller.stream;
  }

  final trans = StreamTransformer.fromBind(_convert);
  socket.transform(trans).transform(utf8.decoder).listen((String line) {
    print("Server2: $line");
  }, onError: (error) {
    print(error);
    socket.destroy();
  }, onDone: () {
    print("Server left");
    socket.destroy();
  });

  await sendMessage(socket, 'Knock, knock.');
  await sendMessage(socket, 'Banana');
  await sendMessage(socket, 'Banana');
  await sendMessage(socket, 'Banana');
  await sendMessage(socket, 'Banana');
  await sendMessage(socket, 'Banana');
  await sendMessage(socket, 'Orange');
  await sendMessage(socket, "Orange you glad I didn't say banana again?");
}

//Future<void> sendMessage(Socket socket, String message) async {
Future<void> sendMessage(Socket socket, String message) {
  print("Client $message");
  socket.write(message);
  //return await Future.delayed(Duration(seconds: 1)); await不要
  return Future.delayed(Duration(seconds: 1));
}

サーバの方は参考にしたサイトそのままで,確認のため日本語を送っている.これを実行すると以下の結果を得る.
(追記)参考サイトでは,sendMessagereturnでawaitして結果を返していますが,よくよく考えるとこれは不要で
asyncも不要です.

Connected to: 127.0.0.1:8080
Client Knock, knock.
Client Banana
Server2: Who is there? どこ
Client Banana
Server2: Banana だれ
Client Banana
Server2: Banana だれ
Client Banana
Server2: Banana だれ
Client Banana
Server2: Banana だれ
Client Orange
Server2: Banana だれ
Client Orange you glad I didn't say banana again?
Server2: Orange だれ
Server2: Very funny.

ちゃんと日本語が出ている.ちなみに,オリジナルのコードで動かすと文字化けする.

余談

Dartのチュートリアルでは,次のようなコードでファイルを読み込み,UTF8に変換して1行区切りにしてリストに格納,などのようなことをしている.ここで,utf8.decoderLineSplitterStreamTransformerのインスタンスである.これらのクラスはStreamTransformerにコールバックなどを与えて生成しているのではなく,StreamTransformer`を継承した子クラスである.解析していったら非常に複雑だった.なので,基本的には継承はせず,コールバック等を与えて使うのが正解だろう.解析したのでDartの勉強になった...

Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?