38
32

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxSwift/RxJava入門

Last updated at Posted at 2017-11-14
1 / 20
  • 社内勉強会用の資料です
  • 本資料作成時のRxSwift/RxJavaのバージョンは下記です
    • RxSwift 4.0.0
    • RxJava 2.1.6 (with RxKotlin 2.2.0)

本スライドの対象者

  • Rxを聞いたことあるけど、まだ触ってない or 触っては見たけど普段は使ってない
  • 下記のどれかの言語を読み書きできる
    • Swift
    • Kotlin

本スライドの目的

  • 「Rxをやってみようかな!」と思って貰える
  • 自分で勉強するときのポインタを提供する

Rxとは?

ReactiveX の略

An API for asynchronous programming
with observable streams

  • 非同期プログラミング用のAPI
  • オブザーバーパターンの拡張
  • 全てをデータの流れ(ストリーム)として表現する
  • ストリームに反応(リアクティブ)して処理をおこなうよう記述する
  • 様々な言語向けに提供されいている
    • Java, Kotlin, JavaScript, C#, Scala, Clojure, Swift, etc...

非同期処理を少ない行数で書ける!


実例1:インクリメンタルサーチ

ViewController.swift
let searchResults = searchBar.rx.text.orEmpty
    .throttle(0.3, scheduler: MainScheduler.instance)
    .distinctUntilChanged()
    .flatMapLatest { query -> Observable<[Repository]> in
        if query.isEmpty {
            return .just([])
        }
        return searchGitHub(query)
            .catchErrorJustReturn([])
    }
    .observeOn(MainScheduler.instance)

searchResults
    .bind(to: tableView.rx.items(cellIdentifier: "Cell")) {
        (index, repository: Repository, cell) in
        cell.textLabel?.text = repository.name
        cell.detailTextLabel?.text = repository.url
    }
    .disposed(by: disposeBag)


実例2:WebAPI

MainActivity.kt
class MainActivity : AppCompatActivity() {

    lateinit private var binding: ActivityMainBinding
    private val disposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        binding = DataBindingUtil.setContentView(this, R.layout.activity_main)

        val retrofit = Retrofit.Builder()
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://api.github.com/")
                .build()

        val service = retrofit.create(GitHubService::class.java)

        service.listRepos("tomoya0x00")
                .subscribeOn(Schedulers.io())
                .retry(3)
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext { binding.textView.text = it.joinToString("\n") { it.name } }
                .subscribe()
                .addTo(disposable)        
    }

    override fun onDestroy() {
        super.onDestroy()
        disposable.dispose()
    }
}

interface GitHubService {
    @GET("users/{user}/repos")
    fun listRepos(@Path("user") user: String): Observable<List<Repo>>
}

data class Repo(
        val id: Int,
        val name: String
)

RxJava+RetrofitでAPI通信周りを実装するうえで最低限の知識を30分で詰め込む


あと嬉しいこと

  • iOSでもAndroidでも同じような書き方ができる

主要な登場人物

  • Observable
  • Operator
  • Subject
  • Scheduler
  • Disposable

Observable

  • Observeできる何か
  • Subscribeすることでデータが流れてくる
main.kt
// 1 2 3
Observable.create<Int> { e ->
    e.onNext(1)
    e.onNext(2)
    e.onNext(3)
    e.onComplete()
}.subscribe(::println)

// 1 2 3
Observable.fromArray(1, 2, 3)
        .subscribe(::println)

// 2
Observable.just(2)
        .subscribe(::println)

// 2
Observable.fromCallable({ 1 + 1 })
        .subscribe(::println)
main.swift
// 1 2 3
_ = Observable<Int>.create { observer in
    observer.on(.next(1))
    observer.on(.next(2))
    observer.on(.next(3))
    observer.on(.completed)
    return Disposables.create()
    }.subscribe(onNext: { print($0) })

// 1 2 3
_ = Observable.of(1, 2, 3)
    .subscribe(onNext: { print($0) })

// 2
Observable.just(2)
    .subscribe(onNext: { print($0) })  

Operator

ストリームに流れてきたデータを加工するもの
(Observableを作成するものも含む)

main.kt
// 2 20 4 40 40 6 60 60
Observable.fromArray(1, 2, 3)
        .map { it * 2 }
        .doOnNext(::println) // 2 4 6
        .flatMap { Observable.just(it * 10) }
        .doOnNext(::println) // 20 40 60
        .filter { it >= 30 }
        .doOnNext(::println) // 40 60
        .subscribe()

val a = Observable.fromArray("a1", "a2", "a3", "a4")
val b = Observable.fromArray("b1", "b2", "b3")

// a1 a2 a3 a4 b1 b2 b3
Observable.merge(a, b)
        .doOnNext(::println)
        .subscribe()

// a1b1 a2b2 a3b3
Observable.zip(a, b, BiFunction { t1: String, t2: String -> t1 + t2 })
        .doOnNext(::println)
        .subscribe()

// a4b1 a4b2 a4b3
Observable.combineLatest(a, b, BiFunction { t1: String, t2: String -> t1 + t2 })
        .doOnNext(::println)
        .subscribe()
main.swift
// 2 20 4 40 40 6 60 60
_ = Observable.of(1, 2, 3)
    .map { $0 * 2 } // 2 4 6
    .do(onNext: { print($0) })
    .flatMap { Observable.just($0 * 10) }
    .do(onNext: { print($0) }) // 20 40 60
    .filter { $0 >= 30 }
    .do(onNext: { print($0) }) // 40 60
    .subscribe()

let a = Observable.of("a1", "a2", "a3", "a4")
let b = Observable.of("b1", "b2", "b3")

// a1 b1 a2 b2 a3 b3 a4
_ = Observable.merge(a, b)
    .do(onNext: { print($0) })
    .subscribe()

// a1b1 a2b2 a3b3
_ = Observable.zip(a, b) { $0 + $1 }
    .do(onNext: { print($0) })
    .subscribe()

// a1b1 a2b1 a2b2 a3b2 a3b3 a4b3
_ = Observable.combineLatest(a, b) { $0 + $1 }
    .do(onNext: { print($0) })
    .subscribe()

Subject

SubscriberとObservableの二つの機能を持つもの

main.kt
val ps = PublishSubject.create<Int>()

ps.onNext(1)
ps.onNext(2)

ps.subscribe(::println)

ps.onNext(3) // 3
ps.onComplete()

val bs = BehaviorSubject.create<Int>()

bs.onNext(1)
bs.onNext(2)

bs.subscribe(::println) // 2

bs.onNext(3) // 3
bs.onComplete()

val rs = ReplaySubject.create<Int>()

rs.onNext(1)
rs.onNext(2)

rs.subscribe(::println) // 1 2

rs.onNext(3) // 3
rs.onComplete()
main.swift
let ps = PublishSubject<Int>()

ps.on(.next(1))
ps.on(.next(2))

ps.subscribe(onNext: { print($0) })

ps.on(.next(3)) // 3
ps.on(.completed)

let bs = BehaviorSubject<Int>(value: 1)

bs.on(.next(2))

bs.subscribe(onNext: { print($0) }) // 2

bs.on(.next(3)) // 3
bs.on(.completed)

let rs = ReplaySubject<Int>.create(bufferSize: 10)

rs.on(.next(1))
rs.on(.next(2))

rs.subscribe(onNext: { print($0) }) // 1 2

rs.on(.next(3)) // 3
rs.on(.completed)

Scheduler

どのスレッドでSubscribe/Observeするか指定するもの
(別スレッドでネットワーク通信して、メインスレッドでUIに反映など)


http://reactivex.io/documentation/scheduler.html

main.kt
Observable.just("hoge") // on io Scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map { "$it fuga" } // on computation Scheduler
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext { // on mainThread 
            binding.textView.text = it
        }.subscribe()
ViewController.swift
let scheduler1 = SerialDispatchQueueScheduler(qos: .default)
let scheduler2 = ConcurrentDispatchQueueScheduler(qos: .default)

_ = Observable.just("hoge") // on scheduler1
    .subscribeOn(scheduler1)
    .observeOn(scheduler2)
    .map { "\($0) fuga" } // on scheduler2
    .observeOn(MainScheduler.instance)
    .do(onNext: { [unowned self] in // MainScheduler
        self.textLabel.text = $0
    }).subscribe()

// with RxCocoa
_ = Observable.just("hoge")  // on scheduler1
    .subscribeOn(scheduler1)
    .observeOn(scheduler2)
    .map { "\($0) fuga" }  // on scheduler1
    .asDriver(onErrorDriveWith: Driver.empty())
    .drive(textLabel.rx.text) // MainScheduler

Disposable

リソース開放用のインターフェース
subscribeの戻り値で、これをdisposeするとリソースが開放される
(メモリリーク防止のため、Activity/Fragment/ViewControllerのライフサイクルに合わせて開放するなど)

main.kt
val hello = Observable.just("hello").delay(10, TimeUnit.SECONDS)
val disposable = hello.subscribe()

println(disposable.isDisposed) // false
disposable.dispose()
println(disposable.isDisposed) // true
main.swift
let hello = Observable.just("hello").delay(10, scheduler: MainScheduler.instance)
let disposable: Disposable = hello.subscribe()

disposable.dispose()

Disposableの実用例(iOS)

DisposeBagでまとめて管理する

Dispose bags are used to return ARC like behavior to RX.
When a DisposeBag is deallocated, it will call dispose on each of the added disposables.

ViewController.swift
class ViewController: UIViewController {

    @IBOutlet weak var textLabel: UILabel!
    
    private let disposeBag = DisposeBag()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        
        disposeBag.insert(showAfter(seconds: 5))
        disposeBag.insert(showAfter(seconds: 10))
        disposeBag.insert(showAfter(seconds: 15))
    }

    private func showAfter(seconds: RxTimeInterval) -> Disposable {
        return Observable.just("\(seconds) seconds")
            .delay(seconds, scheduler: MainScheduler.instance)
            .asDriver(onErrorDriveWith: Driver.empty())
            .drive(self.textLabel.rx.text)
    }
}

Disposableの実用例(Android)

CompositeDisposableでまとめて管理する

A disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.

MainActivity.kt
class MainActivity : AppCompatActivity() {

    lateinit private var binding: ActivityMainBinding
    private val disposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        binding = DataBindingUtil.setContentView(this, R.layout.activity_main)

        disposable.add(
                showAfter(5)
        )

        disposable.addAll(
                showAfter(10),
                showAfter(15)
        )
    }

    override fun onDestroy() {
        super.onDestroy()
        disposable.dispose()
    }

    private fun showAfter(seconds: Long): Disposable {
        return Observable.just("$seconds seconds")
                .delay(seconds, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext {
                    binding.textView.text = it
                }.subscribe()
    }
}

エラーハンドリング

  • subscribe(onError: {})でハンドリングするか、別の値かストリームに変換する
  • リトライも可能(retryWhen()で細かい条件も実現可)
main.kt
val o = Observable.create<Int> { e ->
    e.onNext(1)
    e.onNext(2)
    e.onError(Throwable("hoge"))
    e.onNext(3)
    e.onComplete()
}

// 1 2 io.reactivex.exceptions.OnErrorNotImplementedException: hoge
o.subscribe(::println)

// 1 2 error!(java.lang.Throwable: hoge)
o.subscribe(::println, // onNext
        { println("error!($it)") }) // onError

// 1 2 error!(java.lang.Throwable: hoge) io.reactivex.exceptions.OnErrorNotImplementedException: hoge
o.doOnError { println("error!($it)") }.subscribe(::println)

// 1 2 1 2 io.reactivex.exceptions.OnErrorNotImplementedException: hoge
o.retry(2).subscribe(::println)

// 1 2 0
o.onErrorReturn { 0 }.subscribe(::println)

// 1 2 10 20
o.onErrorResumeNext(Observable.fromArray(10, 20)).subscribe(::println)

// 1 2
o.onErrorResumeNext(Observable.empty()).subscribe(::println)
main.swift
let o = Observable<Int>.create { observer in
    observer.on(.next(1))
    observer.on(.next(2))
    observer.on(.error(NSError(domain: "hoge", code: 0)))
    observer.on(.next(3))
    observer.on(.completed)
    return Disposables.create()
}

// 1 2 Unhandled error happened: Error Domain=hoge Code=0 "(null)"
_ = o.subscribe(onNext: { print($0) })

// 1 2 error!(Error Domain=hoge Code=0 "(null)")
_ = o.subscribe(onNext: { print($0) },
                onError: { print("error!(\($0))") })
// 1 2 error!(Error Domain=hoge Code=0 "(null)") Unhandled error happened: Error Domain=hoge Code=0 "(null)"
_ = o.do(onError: { print("error!(\($0))") }).subscribe(onNext: { print($0) })

// 1 2 1 2 Unhandled error happened: Error Domain=hoge Code=0 "(null)"
_ = o.retry(2).subscribe(onNext: { print($0) })

// 1 2 0
_ = o.catchErrorJustReturn(0).subscribe(onNext: { print($0) })

// 1 2 10 20
_ = o.catchError { _ in Observable.of(10, 20) } .subscribe(onNext: { print($0) })

// 1 2
_ = o.catchError { _ in Observable.empty() } .subscribe(onNext: { print($0) })

関連用語

  • Backpressure
  • Hot / Cold observables
    • Observableの性質
      • Coldはsubscribeされるまで動作しない&subscribeした回数分、ストリーム生成される
        • 大体のOperatorはこちら
      • Hotはsubscribeに関係なく動作する&ストリームは一つのみ
        • Subjectはこちら
      • Cold->Hot変換できる
    • 参考:今日こそ理解するHot / Cold @社内RxSwift勉強会
  • Single
    • 結果は成功(with an item)か失敗
    • JavaScriptのPromise的なもの
  • Completable
    • 結果は完了(with no item)か失敗
    • reactive runnable
  • Maybe
    • 結果は成功(with an item)か完了(with no item)か失敗
    • reactive optional
  • Flowable
    • BackpressureありのObservable
  • Processor
    • BackpressureありのSubject

次に見るべき資料

38
32
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
38
32

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?