Help us understand the problem. What is going on with this article?

動かして理解する!dartのStreamとrxdart!

More than 1 year has passed since last update.

Gakuです。
最近ネイティブ開発にはまってます!Flutterくっそ楽しい!

さてさて、FlutterといえばBLoCパターン。BLoCパターンを習得するにはリアクティブプログラミングですね!
リアクティブプログラミングといえば。。。そう。dartの基本ライブラリのStreamとStream拡張ライブラリのrxdartを習得しなければいけません。

まだまだFlutter関連は日本語文献少ないし、文献読んでいるだけではあまり理解度が得られなかったので、Stream周りの基本コードを一通り書いてみました。
今回はそんなお話。

書いたコード

書いたコードは以下に置いてます。この文献読んで理解できなかったらcloneして動かしてみれば幸せになれるかもしれません。
【GitHub】gaku360study-reactive-dart

動かして理解する!dart Streamとrxdart!

1.dartのStream

1.1 基本形

dartを触ってると「Streamってなんぞや!」って思うタイミングがかなり最初に訪れますw
発行する側と購読する側があって、発行する側が「ぽいっ」ってデータを投げると購読する側が検知して、なんらかの処理をする。そんな感じのイメージです。
購読しているコードと発行しているコードがどれだけ離れたところにあっても、データを伝搬できるので、めっちゃ便利です。
超簡易的なコードはこんな感じ。

study-reactive-dart/1.dart-stream/1.basic.dart
import 'dart:async';

var controller = StreamController<String>();

void main() {
  // listenさせます(購読する)
  koudokusya1();

  // streamを流します。(購読している人へ新聞配達する感じ)
  controller.sink.add('新聞だょ〜(´・ω・`)b');
  controller.sink.add('今日の夕刊新聞だょ〜(´・ω・`)b');
}

void koudokusya1() {
  controller.stream.listen((data){
    print('購読している人1');
    print(data);
  });
}

/* output
購読している人1
新聞だょ〜(´・ω・`)b
購読している人1
今日の夕刊新聞だょ〜(´・ω・`)b
*/

とまぁ、こんな感じで書けます。今回は簡単に1ファイルで記述しましたが、本来であれば遠く離れたところからデータ伝搬することになります。宇宙の理を無視したワームホールな感覚で利用できるので、超絶便利というわけです!

1.2 ブロードキャスト配信したい

こんな感じで書けると、1回の発行で複数の購読者に新聞を配達したくなってきます。
そういう時はStreamをこんな感じで書いてあげればOKです。

study-reactive-dart/1.dart-stream/3.broadcast.dart
StreamController<String> controller = StreamController<String>.broadcast();

【Outputと全体コード】
study-reactive-dart/1.dart-stream/3.broadcast.dart
1回の発行で複数の購読しているところへ新聞を配達できます。

1.3 データを加工して受け取りたい

データを受け取る時に加工を行うこともできます。

study-reactive-dart/1.dart-stream/5.transform.dart
import 'dart:async';

StreamController<String> _actionController = StreamController<String>();

StreamTransformer<String, String> transform() {
  return StreamTransformer<String, String>.fromHandlers(
    handleData: (value, sink) {
      sink.add(value+':ありがとうだょ〜');
    }
  );
}

void main() {
  // 購読
  _actionController.stream
  .transform(transform())
  .listen((value){
    print(value);
  });

  // 発行
  _actionController.sink.add('新聞だょ(´・ω・`)b');
}

/* output
新聞だょ(´・ω・`)b:ありがとうだょ〜
*/

受け取ったデータのバリデーションやエラーハンドリング等は、transform部分で記載してあげれば、スッキリしたコードが書けそうです。

1.4 Streamまとめ

これ以外にもStreamの使い方はいろいろあると思います。ただ、基本形はこんな感じです。
発行する側と購読する側がいて、ぽいってデータを投げれば購読側が検知して処理するよぉ〜ぐらいのニュアンスで良きかと思います。

2 rxdart

さて、ここからはrxdartのお話。rxdartはdartのStreamを拡張して、いろいろ便利にしてくれたライブラリです。
大きくわけると
PublishSubject
BehaviorSubject
ReplaySubject
があって、これらを使い分けれればOKだと思います。
とりあえず、rxdartのObservableを利用した書き方はこんな感じ。

study-reactive-dart/2.rxdart/Observable/1.basic/main.dart
mport 'dart:async';
import 'package:rxdart/rxdart.dart';

var controller = StreamController<String>();
var streamObservable = Observable(controller.stream);

void main(){
  // 購読
  streamObservable.listen(print);
  // 発行
  controller.add('test');
}

/* output
test
*/

Observableがrxdartライブラリのもので、streamを渡してあげることで使うことができます。
後述するsubject周りが強力なんで、Observableでこういった書き方はあまりしないかな?と思ってます。
とりあえず後述のために基本系を掲載しときます。

2.1 PublishSubjects

rxdartの1つ目「PublishSubjects」です。こいつは最もベーシックな動きをしてくれます。
とりあえず、コード見てみましょう。

study-reactive-dart/2.rxdart/PublishSubjects/1.basic/main.dart
import 'package:rxdart/rxdart.dart';

var subject = PublishSubject<String>();

void main() async {
  // 購読
  subject.listen((data){
    print('検知1------');
    print(data);
  });

  // 発行
  subject.add("Item1");
  subject.add("Item1");
  subject.add("Item2");
  subject.add("Item1");

  await Future.delayed(Duration(seconds: 5)); // このコードは検証用 5秒待つ
  print('【ここまで発行1回目】');
  // 購読
  subject.listen((data){
    print('検知2------');
    print(data);
  });
  subject.add("Item3");
  await Future.delayed(Duration(seconds: 5)); // このコードは検証用 5秒待つ
  print('【ここまで発行2回目】');
}

/* output
検知1------
Item1
検知1------
Item1
検知1------
Item2
検知1------
Item1
【ここまで発行1回目】
検知1------
Item3
検知2------
Item3
【ここまで発行2回目】
*/

わかりやすくするためにasync/await使ってdelay処理を入れてます。

PublishSubjectを利用することでStreamとObservableがなくなり、subjectの1つになりました!\(^o^)/パチパチ!
ちょっと複雑に見えるかもしれませんが、やってることは基本と同じです。
「検知1」のsubject.listenで購読して、subject.addで1回目の4つの発行を行っています。
2回目の発行では「検知1」と「検知2」でlistenしているので、両方に届けているといった感じです。

2.2 BehaviorSubject

rxdartの2つ目「BehaviorSubject」です。
こいつは発行を記憶してくれていて、listenしたタイミングで一番最後に発行したデータで処理を行うことができます。
とりあえず、コード見てみましょう!

study-reactive-dart/2.rxdart/BehaviourSubject/1.basic/main.dart
import 'package:rxdart/rxdart.dart';

var subject = BehaviorSubject<String>();

void main() async {
  // 購読
  subject.listen((data){
    print('検知1------');
    print(data);
  });

  // 発行
  subject.add("Item1");
  subject.add("Item1");
  subject.add("Item2");
  subject.add("Item3");

  await Future.delayed(Duration(seconds: 5)); // このコードは検証用 5秒待つ
  print('ここまで1回目の発行');

  // 購読
  subject.listen((data){
    print('検知2------');
    print(data);
  });
  await Future.delayed(Duration(seconds: 5)); // このコードは検証用 5秒待つ
}

/* output
検知1------
Item1
検知1------
Item1
検知1------
Item2
検知1------
Item3
ここまで1回目の発行
検知2------
Item3
*/

「検知1」まではPublishSubjectsと同じです。ただ、「検知2」は発行を実施していないのに処理が行われています。そう、BehaviorSubjectは最後に発行したデータを記憶してくれていて、listenが呼ばれたタイミングで処理を行います。いわばdo-while的なやつです。

はい。ややこしくなってきました。
動かしてみれば簡単なので、わからない場合はクローンして実行だ!

2.3 ReplaySubject

rxdartで最後に登場するのが「ReplaySubject」です。BehaviorSubjectは、最後に発行したデータを使ってlistenのタイミングで処理をしてくれますが、ReplaySubjectはこれまで発行したデータ全てを利用しlistenのタイミングで処理をしてくれるやつです。

はいはい。コードコード。

study-reactive-dart/2.rxdart/ReplaySubject/1.basic/main.dart
import 'package:rxdart/rxdart.dart';

var subject = ReplaySubject<String>();

void main() async {
  // 購読
  subject.listen((data){
    print('検知1------');
    print(data);
  });

  // 発行
  subject.add("Item1");
  subject.add("Item1");
  subject.add("Item2");
  subject.add("Item1");

  await Future.delayed(Duration(seconds: 5)); // このコードは検証用 5秒待つ
  print('ここまで発行1回目');
  // 購読
  subject.listen((data){
    print('検知2------');
    print(data);
  });
  await Future.delayed(Duration(seconds: 5)); // このコードは検証用 5秒待つ
}

/* output
検知1------
Item1
検知1------
Item1
検知1------
Item2
検知1------
Item1
ここまで発行1回目
検知2------
Item1
検知2------
Item1
検知2------
Item2
検知2------
Item1
*/

BehaviorSubjectと同様、発行していないのにlistenが処理されるパターンです。ただ、これまで4回の発行を行ってきて、「検知2」のlistenタイミングでも4回の処理が行われています。これが、ReplaySubjectです。
「BehaviorSubject」が理解できれば簡単です。

2.4 rxdartまとめ

お疲れさまでした。これがrxdartの基本です。動かしてみれば簡単ですが文献だと。。。言葉で説明がしにくすぎますw
今回解説したのはあくまで基本形で、ここからいろんな関数を利用することで様々な処理ができるみたいなので、下に記述します参考文献とか読むと幸せになれるかもです(´・ω・`)b

おわりに

さて、BLoCパターンに迷い込んではや1週間。。。そろそろFlutterでエンジニアハイに入り込めるところまでキャッチアップできたと思います。ここからコードぶち込んでfirebaseとの共存で爆速ハイブリッドネイティブアプリをリリースするとしたいと思います。

参考文献

GutuMeditation RxDart: Magical transformations of Streams
rx library

gaku3601
🎉筋肉系エンジニアになりたい系プログラマ٩( 'ω' )و🎉 好きなことで食っていく。(`・ω・´) twitterとかもやってますので、絡んでいただけると嬉しく思います♪
persol-pt
労働人口の減少が見込まれる現代において、 一人ひとりが生み出す付加価値を最大化することが、 日本における重要なテーマです。 わたしたちは、 人・プロセスデザイン・テクノロジーの力で、 お客さまのビジネスプロセスに変革をもたらし、 人と組織の生産性を高めていきます。 さらに、はたらくを楽しむことを大切にし、 「はたらいて、笑おう。」の世界を実現します。
https://www.persol-pt.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away