Posted at

ObserverパターンとHelloWorldからはじめるRxJava

More than 3 years have passed since last update.


はじめに

初投稿です.この文章はRxJavaについての入門記事です.Javaの文法をある程度習得しており,RxJavaに興味があるけどアプローチが難しく感じている人に向けて書きました.

まずは概観をとらえて,HelloWorldをじっくり味わうことを目的としています.

参考


Observerパターン

RxJavaはObserverパターンの拡張です.ObserverパターンはGoFのデザインパターンにも登場するほど非常に一般的なものです.「オブジェクトの状態変化をそれに依存する他のオブジェクトたちが監視し,通知があったら各自が自動的に適切な振る舞いをしてほしい」というのが基本的なアイディアです.Javaではこれをinterfaceを使って実現します.

関心がある状態変化イベントをもつ対象をObservableと呼び,そのイベントを監視する対象をObserverと呼びます.java.utilパッケージにもObserverインターフェースObservableクラスがあります.

java.util.Observerインターフェースはupdate()メソッドを定義しています.状態変化を知りたい対象にはObserverインターフェースを実装してObservableに登録しておきます.そして,状態が変化したときに,Observableは登録してある全てのObserverに対してupdate()メソッドを呼び出して変更の通知を行います.

ObservableObserver#update()を呼び出しますが,中身の実装については何一つ知らないという点が重要なポイントです.また,updateメソッドの引数にはObservableが含まれている点もポイントです.イベントが発生したという事実を通知するだけでなく,状態に関する情報も一緒に渡すことで,Observerたちは所望の処理を実装できるわけです.


とにもかくにもHello World

RxJavaのHelloWorldのプログラムを見てみます.(RxJavaのGitHub wikiページより)

import rx.Observable;

import rx.functions.Action1;

public class HelloWorld {

public static void main(String[] args) {
String[] names = {"Ben", "George"};

Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello, " + s + "!");
}
});
}
}

ここでの手続きをざっくり説明すると以下のようになります.


  1. Observableを生成する.

  2. Observerを生成する.

  3. ObservableにObserverを登録する (subscribeする).

これを先ほどのObserverパターンの観点から見るために,ちょっとだけ書き換えてみます.

public static void main(String[] args) {

String[] names = {"Ben", "George"};

// Step1. 配列からObservableを生成します
Observable observable = Observable.from(names);

// Step2. Observerを生成します
Action1<String> observer = new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello, " + s + "!");
}
};

// Step3. ObservableにObserverをセットします (subscribeする)
observable.subscribe(observer);
}


Step1. Observableを生成する

ここでのObservableは文字列ストリームです.先ほどのObserverパターンの説明では状態変化とイベントにフォーカスして説明していましたがObervableはとても抽象的なモノで,様々な対象をObservableに出来ます.ちょっとわかりづらいかもしれませんが「String配列から,"文字列のストリーム"をつくられた」くらいに思ってくれたらOKです.

「様々な対象をObservableに出来る」と述べましたように,RxJavaにはObservableを生成する方法は多数用意されていますObservable.create(), Observable.from()Observable.just()などなど.


Step2. Observerを生成する

ObserverはObservableの変化に対して振る舞うモノです.これも抽象的なもので,ここではAction1というクラスがこれに該当します.Action1rx.Observerインターフェースを継承していません.Observerパターンにあてはめて説明しているため,ここでちょっとズレが起きてしまっていますが今のレベルでは支障はありません.「RxJavaには"Observableの変化に対して振る舞うモノ"が複数ある.ObserverFunctionActionなどがそれである」くらいに思って下さい.ちなみにFunctionActionの違いはcall()メソッドの戻り値の有無です.

それからこのAction1なんですが,「1ってなんやねん」とお思いの方.これはcall()メソッドの引数の数によってAction0からAction9まで用意されています.Javaは型が厳格なため,こういった実装は致し方ないのです.(※ 型を書くのが面倒くさくなったらretrolambdaなどの利用を検討して下さい.)


Step3. Observableをsubscribeする

ObservableObserver.登場人物が整いました.あとはsubscribe()メソッドでObservableObserverを登録します.subscribeメソッドも引数に応じて多数用意されています.今回呼び出しているのはAction1をひとつ受け取るsubscribeメソッドです.

実行結果

Hello, Ben!

Hello, George!


Observerパターンとの違い

ReactiveプログラミングはObserverパターンと以下の点がパワーアップしています.


  • Completion/Errorハンドリング

  • イベントの合成やフィルタを関数で行える etc...

今回はほんのちょっとだけ,HelloWorldプログラムからこれらを覗いてみます.


Completion/Errorハンドリング

ここで,先ほどのHelloWorldプログラムではは利用しなかったrx.Observerインターフェースを見てみます.

package rx;

public interface Observer<T> {

public abstract void onCompleted();

public abstract void onError(Throwable e);

public abstract void onNext(T t);

}

rx.Observerには3つのメソッドがあります (java.util.Observerにはupdate()メソッド1つしかありませんでしたが) .処理が正常に完了したことと,途中でエラーが起こったことを通知できるような仕組みが用意されているわけです.

HelloWorldプログラムのObserverをrx.functions.Action1からrx.Observerに置き換えます.

public static void main(String[] args) {

String[] names = {"Ben", "George"};

Observable.from(names).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed!");
}

@Override
public void onError(Throwable e) {
System.out.println("error... " + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("Hello, " + s + "!");
}
});
}

実行結果

Hello, Ben!

Hello, George!
completed!

予想通りの結果です.エラーは起こらずに正常終了するのでonError()は呼ばれません.

onNext()メソッドを書き換えてエラーを起こしてみます.文字列ストリームにGeorgeがきたら例外をthrowするようにしました.

@Override

public void onNext(String s) {
if (s.equals("George")) {
throw new RuntimeException("Oh my god..");
}
System.out.println("Hello, " + s + "!");
}

実行結果

Hello, Ben!

error... Oh my god..

エラーハンドリングについてはここで扱うトピックとしては大きすぎるのでこのくらいにしときます.(私も模索中…)


関数でマップやフィルタ

Observableはマップやフィルタや合成などの加工が関数で出来るという素晴らしい機能を持っています.

加工を行うオペレーター関数は非常にたくさん用意されています.

まずは最も基本的と思われるものを抜粋して列挙します.


  • map()

  • flatMap()

  • filter()

  • take()

関数型プログラミングではお馴染みの役者たちです.まとめて利用したサンプルプログラムが以下です.

public static void main(String[] args) {

String[] names = {"Ms.Alice", "Mr.Ben", "Mr.Carlos", "Mr.Dylan", "Ms.Ellie"};

Observable.from(names)
.map(new Func1<String, String>() {
@Override
public String call(String name) {
return name.toUpperCase();
}
})
//-> {"MS.ALICE", "MR.BEN", "MR.CARLOS", "MR.DYLAN", "MS.ELLIE"}
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String name) {
return Observable.from(name.split("\\."));
}
})
//-> {"MS", "ALICE", "MR", "BEN", "MR", "CARLOS", "MR", "DYLAN", "MS", "ELLIE"}
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return !(s.equals("MR") || s.equals("MS"));
}
})
//-> {"ALICE", BEN", "CARLOS", "DYLAN", "ELLIE"}
.take(3)
//-> {"ALICE", BEN", "CARLOS"}
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
System.out.println("Hello, " + name + "!");
}
});
}

流れは以下です.



  1. map()で文字列を大文字にする.


  2. flatMap() で敬称と名前を分離する.


  3. filter()でMrとMsをフィルタする.


  4. take()で先頭の3つを取得する.

実行結果

Hello, ALICE!

Hello, BEN!
Hello, CARLOS!


おわりに

GoFのObserverパターンから出発して,RxJavaのHelloWorldを動かしてみました.

Observableを自在に操れるようになるまでの道のりはまだまだ長いです...