11
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

RxSwift で非同期処理を合成しよう

Posted at

今回は同じ型の非同期処理を RxSwift でまとめる際に使用する concatmerge の使い方と挙動を簡単にまとめとこうと思います。

concat

image.png

concat() は複数の非同期処理を渡された順番で順次処理を行っていきます。サンプルは次のようになります。

    let ob1 = Observable<String>.create { observer -> Disposable in
        // 3秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }
    let ob2 = Observable<String>.create { observer -> Disposable in
        // 2秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext("ob2")
            observer.onCompleted()
        }
        return Disposables.create()
    }

    Observable.of(ob1, ob2)
        .concat()
        .subscribe(onNext: { str in
            print(str)
        })
        .disposed(by: disposeBag)
    
    // 出力:
    //
    // ob1
    // ob2
    //

また、Observable の合成は下記のように書くこともできます。

    Observable.concat(ob1, ob2)

merge

image.png

merge() は複数の非同期処理を並列に実行することができます。つまり、Observable の渡される順番などが関係なく処理が早く終わった順にストリームに流れます。下記がサンプルコードになります。

    let ob1 = Observable<String>.create { observer -> Disposable in
        // 3秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }
    let ob2 = Observable<String>.create { observer -> Disposable in
        // 2秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext("ob2")
            observer.onCompleted()
        }
        return Disposables.create()
    }

    Observable.of(ob1, ob2)
        .merge()
        .subscribe(onNext: { str in
            print(str)
        })
        .disposed(by: disposeBag)

    // 出力:
    //
    // ob2
    // ob1
    //

じゃあ異なる型の Observable はどうなるの?

基本的には、ストリームのイベントを逐次検知する必要がある場合は Observable の型を統一して、concat なり merge なりを使用する必要があります。並列で処理を実行して全ての処理が完了したタイミングで値を参照する場合は、zip という関数が用意されていますが、直列で実行が完了した値を参照したい場合は flatMap なり、concat なりを使って実装する感じでしょうか🤔(こんな方法があるよってやつがあれば教えてください🥺)

直列で逐次イベントを検知

    enum Container {
        case string(String)
        case int(Int)
    }
    let ob1 = Observable<String>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.string($0) }
    
    let ob2 = Observable<Int>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext(2)
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.int($0) }

    Observable.of(ob1, ob2)
        .concat()
        .subscribe(onNext: { c in
            switch c {
            case .string(let str):
                print("string: \(str)")
            case .int(let num):
                print("int: \(num)")
            }
        })
        .disposed(by: disposeBag)


    // 出力:
    //
    // string: ob1
    // int: 2
    //

並列で逐次イベントを検知

    enum Container {
        case string(String)
        case int(Int)
    }
    let ob1 = Observable<String>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.string($0) }
    
    let ob2 = Observable<Int>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext(2)
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.int($0) }

    Observable.of(ob1, ob2)
        .merge()
        .subscribe(onNext: { c in
            switch c {
            case .string(let str):
                print("string: \(str)")
            case .int(let num):
                print("int: \(num)")
            }
        })
        .disposed(by: disposeBag)


    // 出力:
    //
    // int: 2
    // string: ob1
    //

並列で完了イベントを検知

zip を使うと上記2つの方法とは違い型を統一する必要がないので、よりシンプルに実装することができます。

    let ob1 = Observable<String>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }
    
    let ob2 = Observable<Int>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext(2)
            observer.onCompleted()
        }
        return Disposables.create()
    }

    Observable.zip(ob1, ob2)
        .subscribe(onNext: { str, num in
            print("string: \(str), int: \(num)")
        })
        .disposed(by: disposeBag)

    // 出力:
    //
    // string: ob1, int: 2
    //

参考

11
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?