- 社内勉強会用の資料です
- 本資料作成時のRxSwift/RxJavaのバージョンは下記です
- RxSwift 4.0.0
- RxJava 2.1.6 (with RxKotlin 2.2.0)
本スライドの対象者
- Rxを聞いたことあるけど、まだ触ってない or 触っては見たけど普段は使ってない
- 下記のどれかの言語を読み書きできる
- Swift
- Kotlin
本スライドの目的
- 「Rxをやってみようかな!」と思って貰える
- 自分で勉強するときのポインタを提供する
Rxとは?
ReactiveX の略
An API for asynchronous programming
with observable streams
- 非同期プログラミング用のAPI
- オブザーバーパターンの拡張
- 全てをデータの流れ(ストリーム)として表現する
- ストリームに反応(リアクティブ)して処理をおこなうよう記述する
- 様々な言語向けに提供されいている
- Java, Kotlin, JavaScript, C#, Scala, Clojure, Swift, etc...
非同期処理を少ない行数で書ける!
実例1:インクリメンタルサーチ
let searchResults = searchBar.rx.text.orEmpty
.throttle(0.3, scheduler: MainScheduler.instance)
.distinctUntilChanged()
.flatMapLatest { query -> Observable<[Repository]> in
if query.isEmpty {
return .just([])
}
return searchGitHub(query)
.catchErrorJustReturn([])
}
.observeOn(MainScheduler.instance)
searchResults
.bind(to: tableView.rx.items(cellIdentifier: "Cell")) {
(index, repository: Repository, cell) in
cell.textLabel?.text = repository.name
cell.detailTextLabel?.text = repository.url
}
.disposed(by: disposeBag)
実例2:WebAPI
class MainActivity : AppCompatActivity() {
lateinit private var binding: ActivityMainBinding
private val disposable = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = DataBindingUtil.setContentView(this, R.layout.activity_main)
val retrofit = Retrofit.Builder()
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.baseUrl("https://api.github.com/")
.build()
val service = retrofit.create(GitHubService::class.java)
service.listRepos("tomoya0x00")
.subscribeOn(Schedulers.io())
.retry(3)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { binding.textView.text = it.joinToString("\n") { it.name } }
.subscribe()
.addTo(disposable)
}
override fun onDestroy() {
super.onDestroy()
disposable.dispose()
}
}
interface GitHubService {
@GET("users/{user}/repos")
fun listRepos(@Path("user") user: String): Observable<List<Repo>>
}
data class Repo(
val id: Int,
val name: String
)
RxJava+RetrofitでAPI通信周りを実装するうえで最低限の知識を30分で詰め込む
あと嬉しいこと
- iOSでもAndroidでも同じような書き方ができる
主要な登場人物
- Observable
- Operator
- Subject
- Scheduler
- Disposable
Observable
- Observeできる何か
- Subscribeすることでデータが流れてくる
// 1 2 3
Observable.create<Int> { e ->
e.onNext(1)
e.onNext(2)
e.onNext(3)
e.onComplete()
}.subscribe(::println)
// 1 2 3
Observable.fromArray(1, 2, 3)
.subscribe(::println)
// 2
Observable.just(2)
.subscribe(::println)
// 2
Observable.fromCallable({ 1 + 1 })
.subscribe(::println)
// 1 2 3
_ = Observable<Int>.create { observer in
observer.on(.next(1))
observer.on(.next(2))
observer.on(.next(3))
observer.on(.completed)
return Disposables.create()
}.subscribe(onNext: { print($0) })
// 1 2 3
_ = Observable.of(1, 2, 3)
.subscribe(onNext: { print($0) })
// 2
Observable.just(2)
.subscribe(onNext: { print($0) })
Operator
ストリームに流れてきたデータを加工するもの
(Observableを作成するものも含む)
// 2 20 4 40 40 6 60 60
Observable.fromArray(1, 2, 3)
.map { it * 2 }
.doOnNext(::println) // 2 4 6
.flatMap { Observable.just(it * 10) }
.doOnNext(::println) // 20 40 60
.filter { it >= 30 }
.doOnNext(::println) // 40 60
.subscribe()
val a = Observable.fromArray("a1", "a2", "a3", "a4")
val b = Observable.fromArray("b1", "b2", "b3")
// a1 a2 a3 a4 b1 b2 b3
Observable.merge(a, b)
.doOnNext(::println)
.subscribe()
// a1b1 a2b2 a3b3
Observable.zip(a, b, BiFunction { t1: String, t2: String -> t1 + t2 })
.doOnNext(::println)
.subscribe()
// a4b1 a4b2 a4b3
Observable.combineLatest(a, b, BiFunction { t1: String, t2: String -> t1 + t2 })
.doOnNext(::println)
.subscribe()
// 2 20 4 40 40 6 60 60
_ = Observable.of(1, 2, 3)
.map { $0 * 2 } // 2 4 6
.do(onNext: { print($0) })
.flatMap { Observable.just($0 * 10) }
.do(onNext: { print($0) }) // 20 40 60
.filter { $0 >= 30 }
.do(onNext: { print($0) }) // 40 60
.subscribe()
let a = Observable.of("a1", "a2", "a3", "a4")
let b = Observable.of("b1", "b2", "b3")
// a1 b1 a2 b2 a3 b3 a4
_ = Observable.merge(a, b)
.do(onNext: { print($0) })
.subscribe()
// a1b1 a2b2 a3b3
_ = Observable.zip(a, b) { $0 + $1 }
.do(onNext: { print($0) })
.subscribe()
// a1b1 a2b1 a2b2 a3b2 a3b3 a4b3
_ = Observable.combineLatest(a, b) { $0 + $1 }
.do(onNext: { print($0) })
.subscribe()
Subject
SubscriberとObservableの二つの機能を持つもの
val ps = PublishSubject.create<Int>()
ps.onNext(1)
ps.onNext(2)
ps.subscribe(::println)
ps.onNext(3) // 3
ps.onComplete()
val bs = BehaviorSubject.create<Int>()
bs.onNext(1)
bs.onNext(2)
bs.subscribe(::println) // 2
bs.onNext(3) // 3
bs.onComplete()
val rs = ReplaySubject.create<Int>()
rs.onNext(1)
rs.onNext(2)
rs.subscribe(::println) // 1 2
rs.onNext(3) // 3
rs.onComplete()
let ps = PublishSubject<Int>()
ps.on(.next(1))
ps.on(.next(2))
ps.subscribe(onNext: { print($0) })
ps.on(.next(3)) // 3
ps.on(.completed)
let bs = BehaviorSubject<Int>(value: 1)
bs.on(.next(2))
bs.subscribe(onNext: { print($0) }) // 2
bs.on(.next(3)) // 3
bs.on(.completed)
let rs = ReplaySubject<Int>.create(bufferSize: 10)
rs.on(.next(1))
rs.on(.next(2))
rs.subscribe(onNext: { print($0) }) // 1 2
rs.on(.next(3)) // 3
rs.on(.completed)
Scheduler
どのスレッドでSubscribe/Observeするか指定するもの
(別スレッドでネットワーク通信して、メインスレッドでUIに反映など)
http://reactivex.io/documentation/scheduler.html
Observable.just("hoge") // on io Scheduler
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map { "$it fuga" } // on computation Scheduler
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { // on mainThread
binding.textView.text = it
}.subscribe()
let scheduler1 = SerialDispatchQueueScheduler(qos: .default)
let scheduler2 = ConcurrentDispatchQueueScheduler(qos: .default)
_ = Observable.just("hoge") // on scheduler1
.subscribeOn(scheduler1)
.observeOn(scheduler2)
.map { "\($0) fuga" } // on scheduler2
.observeOn(MainScheduler.instance)
.do(onNext: { [unowned self] in // MainScheduler
self.textLabel.text = $0
}).subscribe()
// with RxCocoa
_ = Observable.just("hoge") // on scheduler1
.subscribeOn(scheduler1)
.observeOn(scheduler2)
.map { "\($0) fuga" } // on scheduler1
.asDriver(onErrorDriveWith: Driver.empty())
.drive(textLabel.rx.text) // MainScheduler
Disposable
リソース開放用のインターフェース
subscribeの戻り値で、これをdisposeするとリソースが開放される
(メモリリーク防止のため、Activity/Fragment/ViewControllerのライフサイクルに合わせて開放するなど)
val hello = Observable.just("hello").delay(10, TimeUnit.SECONDS)
val disposable = hello.subscribe()
println(disposable.isDisposed) // false
disposable.dispose()
println(disposable.isDisposed) // true
let hello = Observable.just("hello").delay(10, scheduler: MainScheduler.instance)
let disposable: Disposable = hello.subscribe()
disposable.dispose()
Disposableの実用例(iOS)
DisposeBagでまとめて管理する
Dispose bags are used to return ARC like behavior to RX.
When a DisposeBag is deallocated, it will call dispose on each of the added disposables.
class ViewController: UIViewController {
@IBOutlet weak var textLabel: UILabel!
private let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
disposeBag.insert(showAfter(seconds: 5))
disposeBag.insert(showAfter(seconds: 10))
disposeBag.insert(showAfter(seconds: 15))
}
private func showAfter(seconds: RxTimeInterval) -> Disposable {
return Observable.just("\(seconds) seconds")
.delay(seconds, scheduler: MainScheduler.instance)
.asDriver(onErrorDriveWith: Driver.empty())
.drive(self.textLabel.rx.text)
}
}
Disposableの実用例(Android)
CompositeDisposableでまとめて管理する
A disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.
class MainActivity : AppCompatActivity() {
lateinit private var binding: ActivityMainBinding
private val disposable = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = DataBindingUtil.setContentView(this, R.layout.activity_main)
disposable.add(
showAfter(5)
)
disposable.addAll(
showAfter(10),
showAfter(15)
)
}
override fun onDestroy() {
super.onDestroy()
disposable.dispose()
}
private fun showAfter(seconds: Long): Disposable {
return Observable.just("$seconds seconds")
.delay(seconds, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
binding.textView.text = it
}.subscribe()
}
}
エラーハンドリング
- subscribe(onError: {})でハンドリングするか、別の値かストリームに変換する
- リトライも可能(retryWhen()で細かい条件も実現可)
val o = Observable.create<Int> { e ->
e.onNext(1)
e.onNext(2)
e.onError(Throwable("hoge"))
e.onNext(3)
e.onComplete()
}
// 1 2 io.reactivex.exceptions.OnErrorNotImplementedException: hoge
o.subscribe(::println)
// 1 2 error!(java.lang.Throwable: hoge)
o.subscribe(::println, // onNext
{ println("error!($it)") }) // onError
// 1 2 error!(java.lang.Throwable: hoge) io.reactivex.exceptions.OnErrorNotImplementedException: hoge
o.doOnError { println("error!($it)") }.subscribe(::println)
// 1 2 1 2 io.reactivex.exceptions.OnErrorNotImplementedException: hoge
o.retry(2).subscribe(::println)
// 1 2 0
o.onErrorReturn { 0 }.subscribe(::println)
// 1 2 10 20
o.onErrorResumeNext(Observable.fromArray(10, 20)).subscribe(::println)
// 1 2
o.onErrorResumeNext(Observable.empty()).subscribe(::println)
let o = Observable<Int>.create { observer in
observer.on(.next(1))
observer.on(.next(2))
observer.on(.error(NSError(domain: "hoge", code: 0)))
observer.on(.next(3))
observer.on(.completed)
return Disposables.create()
}
// 1 2 Unhandled error happened: Error Domain=hoge Code=0 "(null)"
_ = o.subscribe(onNext: { print($0) })
// 1 2 error!(Error Domain=hoge Code=0 "(null)")
_ = o.subscribe(onNext: { print($0) },
onError: { print("error!(\($0))") })
// 1 2 error!(Error Domain=hoge Code=0 "(null)") Unhandled error happened: Error Domain=hoge Code=0 "(null)"
_ = o.do(onError: { print("error!(\($0))") }).subscribe(onNext: { print($0) })
// 1 2 1 2 Unhandled error happened: Error Domain=hoge Code=0 "(null)"
_ = o.retry(2).subscribe(onNext: { print($0) })
// 1 2 0
_ = o.catchErrorJustReturn(0).subscribe(onNext: { print($0) })
// 1 2 10 20
_ = o.catchError { _ in Observable.of(10, 20) } .subscribe(onNext: { print($0) })
// 1 2
_ = o.catchError { _ in Observable.empty() } .subscribe(onNext: { print($0) })
関連用語
-
Backpressure
- 流速制御の仕組み
- 参考:詳解RxJava2:Backpressureで流速制御
-
Hot / Cold observables
- Observableの性質
- Coldはsubscribeされるまで動作しない&subscribeした回数分、ストリーム生成される
- 大体のOperatorはこちら
- Hotはsubscribeに関係なく動作する&ストリームは一つのみ
- Subjectはこちら
- Cold->Hot変換できる
- Coldはsubscribeされるまで動作しない&subscribeした回数分、ストリーム生成される
- 参考:今日こそ理解するHot / Cold @社内RxSwift勉強会
- Observableの性質
- Single
- 結果は成功(with an item)か失敗
- JavaScriptのPromise的なもの
- Completable
- 結果は完了(with no item)か失敗
- reactive runnable
- Maybe
- 結果は成功(with an item)か完了(with no item)か失敗
- reactive optional
- Flowable
- BackpressureありのObservable
- Processor
- BackpressureありのSubject
次に見るべき資料
- 公式Webサイト
- ReactiveXのドキュメントにある図の見方の解説
- Jake神によるRxJava親切丁寧説明スライド