LoginSignup
48
47

More than 5 years have passed since last update.

RxJava-Groovy でリアクティブプログラミング

Last updated at Posted at 2013-12-19

G* Advent Calendar 20日目のエントリです。

世間で注目を集めているらしいリアクティブプログラミングというのがどういうものか知りたくて、少しかじってみました。

リアクティブとは?

まず概念については The Reactive Manifesto という文書があります。その中で書かれているように、リアクティブなプログラムでは以下のことが大事だそうです。

  • React to events (event-driven)
  • React to load (scalable)
  • React to failures (resilient)
  • React to users (responsive)

イベント駆動でスケーラブルかつ耐障害性のあるプログラムを作ればユーザはレスポンスよく使えるよ、ってことでしょうか。いまどきのアプリケーションはそうあるべきでしょうが、これだけだとモヤモヤしていてイメージが湧きません。

RxJava

以下ではJava/Groovyで使えるリアクティブプログラミングのライブラリとして Netflix の RxJava を使って具体的に考えてみます。ドキュメントはこちらで、APIはこちら

まずリアクティブプログラミングでは Observable の概念が重要ということで、これは時間を含んだイベントのストリームを扱うコレクションと考えられます。非同期処理で将来の値を表す Future の概念がコレクションに拡張されたイメージでしょうか。通常のコレクションに対する処理が値を取ってきて何かするというプル型であるのに対して、Observableでは値が生成された時点であらかじめ登録された処理が実行されるというプッシュ型であるという点が異なります。

またObservableの操作はプリミティブなものをつないで複雑な操作を組み立てられるよう合成可能になっています。このことを指してRxJavaでは Functional Reactive Programming と呼んでいます。Observableの操作に関する詳細はこちら

プログラム

Observable

まずはメインスレッドで同期的処理を行うObservableを使った簡単なプログラムを見てみます。Observableの基本的な使い方はsubscribeメソッドにObserverやクロージャを渡して、値の生成時に呼び出す処理を指定することです。

SyncObservableSample.groovy
@Grab(group='com.netflix.rxjava', module='rxjava-groovy', version='0.15.1')
import static java.util.concurrent.TimeUnit.*
import rx.Observable
import rx.concurrency.CurrentThreadScheduler

Observable.interval(1, SECONDS, CurrentThreadScheduler.getInstance())
    .filter { it % 2 == 1 } 
    .map { "#" + it }
    .subscribe { println it }

println "Hoge"

上記でObservable#interval は1秒ごとにインクリメントされる整数値のストリームを生成します。スケジューラとして CurrentThreadSchedulerを指定しているので、値の生成はメインスレッド上で同期的に行われます。Observable#interval で作られたObservableに対して、filterによるフィルタリングとmapによる値の変換を行い、最後にsubscribeで値を出力するためのクロージャを渡しています。この実行結果は下記のようになり、プログラムを止めない限り表示が続きます。

stdout
#1
#3
#5
#7
#9
...

次に別スレッドで非同期処理を行うObservableを見てみます。

AsyncObservableSample.groovy
@Grab(group='com.netflix.rxjava', module='rxjava-groovy', version='0.15.1')
import static java.util.concurrent.TimeUnit.*
import rx.Observable
import rx.concurrency.NewThreadScheduler

Observable.interval(1, SECONDS, NewThreadScheduler.getInstance())
    .filter { it % 2 == 1 } 
    .map { "#" + it }
    .subscribe { println it }

println "Hoge"

Thread.sleep 10000 // To prevent the program from quitting immediately

今回はスケジューラとしてNewThreadSchedulerを指定しているので、値の生成は別スレッドにて非同期で行われます。Observableが同期か非同期かに関わらず、filter/map/subscribe等の処理が全く同じように書けるところが良いですね。出力結果は次のようになります。

stdout
Hoge
#1
#3
#5
#7
#9

このようにObservableを使うとデータ生成の同期・非同期を操作から隠蔽できますし、生成されたデータに対する各種の操作をチェーンさせて簡潔に記述することができます。Netflixが内部で利用しているシステムでは、この例のように多くのAPIがObservableを返すよう抽象化されているそうです。

ちなみに通常のObservableではsubscribeメソッドが呼ばれた時点で値の生成が始まりますが、publishメソッドで ConnectableObservable を生成すれば、subscribeメソッドが呼ばれた時点ではなくconnectメソッドが呼ばれた時点で値の生成が開始されるようになります。これは複数のObserverをObservableに登録して、その後ある時点で一斉に処理を走らせたいような場合に有効です。

Subject

SubjectはObservableでもありObserverでもあるクラスですので、Observableをつないだ処理を書くことができます。下記ではintervalを使ったデータソースであるObservableに対して、filter/mapを行うObservableをObserverとして登録しています(Subject)。Subjectはデータ生成方法の違いによっていくつかの種類がありますので、詳細はこちらを参照してください。

SubjectSample.groovy
@Grab(group='com.netflix.rxjava', module='rxjava-groovy', version='0.15.1')
import static java.util.concurrent.TimeUnit.*
import rx.Observable
import rx.concurrency.NewThreadScheduler
import rx.subjects.PublishSubject

def subject = PublishSubject.create()
subject.filter { it % 2 == 1 } 
    .map { "#" + it }
    .subscribe { println it }

Observable.interval(1, SECONDS, NewThreadScheduler.getInstance())
    .subscribe(subject)

println "DONE"

Thread.sleep 10000

まとめ

RxJavaを使ったからといってすぐに The Reactive Manifesto が言うようなリアクティブなプログラムが作れるわけではありませんが、Observableによるストリームデータの抽象化は確かに強力だと思いました。まだまだ理解が浅いので引き続き触ってみます。

なおリアクティブプログラミングについては Coursera の Principles of Reactive Programming という授業も参考になります。Scala作者のMartin Odersky氏が講師をしています。

48
47
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
48
47