LoginSignup
6
5

More than 5 years have passed since last update.

RxJava 勉強会 Week2

Last updated at Posted at 2017-02-08
1 / 45

目次

  • 前回の復習
  • Observable
  • Operators
  • Scheduler
  • Tumpaca x RxJava

前回の復習


ReactiveX

  • Observable = 非同期プログラム × 複数のデータ
Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable
    = Observable.fromArray(numbers);
observable.subscribe(new Consumer<Integer>() {
    @Override 
    public void accept(Integer integer) throws Exception {
        System.out.println(integer);
    }
});

Observable


Observable, Observe, Subscribe

  • Observable: Item を発行(emit)する実体
  • Observer: Observable が発行した Item を受け取る実体
  • Subscribe: Observer が Observable を監視すること
    • 『Observe は Observable を Subscribe する』

Observable Contract

  • Observable と Observer の契約
  • ReactiveX ではこれらを Notification と呼ぶ

ref: http://reactivex.io/documentation/contract.html


Observable から Observer への Notification

  • OnNext(Item): Observable が Item を emit したときの通知
  • OnCompleted(): Observable が成功して、これ以上 Item を emit しない時の通知
  • OnError(Error): Observable がエラーで中断し、これ以上 Item を emit しない時の通知
  • OnSubscribe()(任意): Observable が Observer からの Request 通知(後述)を受け取る準備ができたときの通知
    • Backpressure と呼ばれる機能と関係する

RxJava: Observer インタフェース

  • interface Observer<T>
    • void onNext(T t);
    • void onError(Throwable e);
    • void onComplete();
    • void onSubscribe(Disposable d);

Observer から Observable への Notification

  • Subscribe: Observer 側が通知を受け取る準備ができたことを Observable に指示する
  • Unsubscribe: Observer 側が通知を受け取る必要がなくなったことを Observable に指示する
  • Request(任意): 一定数の OnNext 通知以上は必要ないことを Observable に通知する
    • Backpressure と呼ばれる機能と関係がある

RxJava

  • Subscribe
    • Observable.subscribe(Observer observer);
  • Unsubscribe
    • RxJava 1.x: Subscription.unsubscribe();
    • RxJava 2.x: Disposable.dispose();
  • Request
    • RxJava 1.x: Subscriber.request(int n);
    • RxJava 2.x: Subscription.request(int n);

イメージ図

スクリーンショット 2017-02-03 9.46.15.png


Sample Codes


Sample1 (1)

Observable の作成

Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable = 
    Observable.fromArray(numbers);

Sample1 (2)

Observer の作成

Observer<Integer> observer = new Observer<Integer>() {
    @Override public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe(): " + d);
    }
    @Override public void onNext(Integer integer) {
        System.out.println("onNext(): " + integer);
    }
    @Override public void onError(Throwable e) {
        System.out.println("onError(): " + e);
    }
    @Override public void onComplete() {
        System.out.println("onComplete()");
    }
};

Sample1 (3)

Observable を subscribe

observable.subscribe(observer);

Sample1 (4)

実行結果

onSubscribe(): io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@619a5dff
onNext(): 1
onNext(): 2
onNext(): 3
onNext(): 4
onNext(): 5
onComplete()

RxJava: Consumer と Action

  • interface Consumer<T>
    • void accept(T t) throws Exception;
  • interface Action
    • void run() throws Exception;
  • メモ
    • Java8 の Lambda を使うのに便利
    • Action と java.lang.Runnable との違い:Exception を throws 可

Sample2

onNext, onError, onComplete, onSubscribe を個別に作って subscribe

Consumer<Integer> onNext = new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
        System.out.println("onNext(): " + integer);
    }
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
    @Override public void accept(Throwable e) throws Exception {
        System.out.println("onError(): " + e);
    }
};
Action onComplete = new Action() {
    @Override public void run() throws Exception {
        System.out.println("onComplete()");
    }
};
Consumer<Disposable> onSubscribe = new Consumer<Disposable>() {
    @Override public void accept(Disposable disposable) throws Exception {
        System.out.println("onSubscribe(): " + disposable);
    }
};
observable.subscribe(onNext, onError, onComplete, onSubscribe);

Sample3

ラムダで一気に subscribe
(メソッド参照で更に省略可)

observable.subscribe(
    integer -> {
        System.out.println("onNext(): " + integer);
    },
    e -> {
        System.out.println("onError(): " + e);
    },
    () -> {
        System.out.println("onComplete()");
    },
    disposable ->  {
        System.out.println("onSubscribe(): " + disposable);
    });

Sample4

3回目の OnNext で throw new RuntimeException()

Consumer<Integer> onNext = new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
        if (integer != null && integer == 3) {
            throw new RuntimeException("A number is three.");
        }
        System.out.println("onNext(): " + integer);
    }
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
    @Override public void accept(Throwable e) throws Exception {
        System.out.println("onError(): " + e);
    }
};
observable.subscribe(onNext, onError, onComplete);

Sample4

実行結果

onNext(): 1
onNext(): 2
onError(): java.lang.RuntimeException: A number is three.

Notification Contracts

  • Observable は OnNext() 通知を 0 以上 Observer に送る
  • OnError() か OnCompleted() が送られたら、以降、通知は送られない
  • 通知がまったく送られない場合がある
    • この場合は Observable はまだ Active

Observable を作成

  • 1, 2, 3 を通知する Observable を作成

Observable とは

  • Observable = Observer に通知を送る実体
  • どのタイミングで通知を開始?
    • Observer が subscribe した時(Cold Observable)

⇒ Observer が Observable を Subscribe したタイミングで、
OnNext(1), OnNext(2), OnNext(3), OnCompleted() を通知するクラスを作る


Sample5

1, 2, 3 を返す Observable の作成

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    });

Sample5

実行結果

example5
onSubscribe(): null // なぜ?
onNext(): 1
onNext(): 2
onNext(): 3
onComplete()

Operators


Operator とは

  • Operator = 下図の四角い箱


注意事項

実装によって、

  • 名前が違う
  • 特別な Operator がある
  • 一定の Operator が存在しない

ということがあります。


Operator の種類

  • Observable を作成(Create, From)
  • Observable を変換(Map)
  • Observable をフィルタ(Filter)
  • などなど、、、

必要な時はここから探す
http://reactivex.io/documentation/operators.html


Operator Chain

  • Operator は Chain できる
  • Builder Pattern に似ている
    • しかし、順番が重要という点で Builder Patter と異なる
observable
  .skip(10)
  .take(5) 
  .map(i -> i + "transformed") 

Scheduler


Scheduler

  • 特定の Operator を違うスレッドで実行したい
    • ⇒ 実行したい Scheduler を Observable に指定する
  • 指定方法
    • SubscribeOn()
    • ObserveOn()

SubscribeOn()

  • Observable が動作する Scheduler を指定する
  • Operator Chain のどこで指定してもよい
  • 「ObservableOnSubscribe.subscribe() が実行されるスレッドの指定」と理解するとよい

ObserveOn()

  • Observable から Observer に通知する際の Scheduler を指定する
  • Operator Chain で指定した後ろの Operator に適用される
  • 「Observer のコールバックが実行されるスレッド」と理解するとよい

イメージ図


Sample6 (1)

Observable の作成

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
            System.out.println("subscribe() end");
        }
    });

Sample6 (2)

Observer の作成

Observer<Integer> observer = new Observer<Integer>() {
    @Override public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe(): " + d);
    }
    @Override public void onNext(Integer n) {
        System.out.println("onNext(): " + n);
    }
    @Override public void onError(Throwable e) {
        System.out.println("onError(): " + e);
    }
    @Override public void onComplete() {
        System.out.println("onComplete()");
    }
};

Sample6 (3)

subscribeOn() と observeOn() を指定し subscribe()

observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.computation())
    .subscribe(observer);

Tumpaca x RxJava


Tumpaca x RxJava

  • reblog を RxJava 化

作業履歴


次回以降


学ぶべきこと

  • Hot and Cold
  • Backpressure
  • Implements Operators
6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5