LoginSignup
30
21

More than 5 years have passed since last update.

【v4 対応版】ReactiveSwift に登場するオペレータまとめ

Last updated at Posted at 2019-03-06

こんにちは、なんちゃってモバイルアプリ開発者のかたおかです。

半年程前から RxSwift の代わりに ReactiveSwift を使い始めて気に入っています。

ReactiveSwift に登場するオペレータについては Basic Operators にある程度解説がありますが、全てではありません。

API リファレンス には全てのオペレータがまとまっていますが、サンプルコードなしではなかなか動作のイメージしづらいこともあります。

というわけで、リファレンスとして Signal/SignalProducer 共通で使えるオペレータをサンプルコードと一緒にまとめてみました。

(サンプルコードは基本 Signal を使って書いていますが、SignalProducer でも同様に動作すると思います。たぶん)

map

map を使うことで値の変換ができます。

map(_ transform:)

クロージャで値を受け取って新しい値を返すパターン。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .map { $0 * 2 } // map(_ transform:)
    .observeValues { print($0) }

observer.send(value: 1) // prints 2
observer.send(value: 2) // prints 4
observer.send(value: 3) // prints 6

map(value:)

直接新しい値を指定するパターン。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .map(value: 1) // map(value:)
    .observeValues { print($0) }

observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 1
observer.send(value: 3) // prints 1

map(_ keyPath:)

KeyPath を指定するパターン。

let (signal, observer) = Signal<String, NoError>.pipe()

signal
    .map(\String.count) // map(_ keyPath:)
    .observeValues { print($0) }

observer.send(value: "foo")    // prints 3
observer.send(value: "foobar") // prints 6

mapError

map では値(Value)の変換ができるのに対し、mapError ではエラー(Error)の変換ができます。

enum CustomError: String, Error {
    case foo
    case bar
    case other
}

let (signal, observer) = Signal<String, NSError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)

signal
    .mapError { error -> CustomError in // mapError(_ transform:)
        switch error.domain {
        case "com.example.foo":
            return .foo
        case "com.example.bar":
            return .bar
        default:
            return .other
        }
    }
    .observeFailed { print($0) }

observer.send(error: error) // prints foo

lazyMap

map と同様に値の変換ができます。ただし、引数のクロージャは指定した Scheduler 上で遅延評価されるようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()

signal
    .lazyMap(on: scheduler, transform: { $0 * 2 }) // lazyMap(on scheduler:, transform:)
    .observeValues { print($0) }

observer.send(value: 1) // nothings printed
scheduler.advance()     // prints 2
observer.send(value: 2) // nothings printed
scheduler.advance()     // prints 4

filter

filter を使うことで値のフィルタリングができます。

レシーバから受け取った値に対して引数のクロージャが true を返す場合のみ、その値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .filter { $0 % 2 == 0 } // filter(_ isIncluded:)
    .observeValues { print($0) }

observer.send(value: 1) // nothings printed
observer.send(value: 2) // prints 2
observer.send(value: 3) // nothings printed
observer.send(value: 4) // prints 4

filterMap

filterMap を使うことで値のフィルタリングとアンラップが同時にできます。

レシーバから受け取った値に対して引数のクロージャが返すオプショナル型の値が nil でない場合のみ、その値をアンラップして送るようになります。

let (signal, observer) = Signal<String?, NoError>.pipe()

signal
    .filterMap { $0?.uppercased() } // filterMap(_ transform:)
    .observeValues { print($0) }

observer.send(value: "foo") // prints FOO
observer.send(value: nil)   // nothings printed
observer.send(value: "bar") // prints BAR

skipNil

レシーバから受け取った値をアンラップして送るようになります。nil 値は送られなくなります。

let (signal, observer) = Signal<Int?, NoError>.pipe()

signal
    .skipNil() // skipNil()
    .observeValues { print($0) }

observer.send(value: 1)   // prints 1
observer.send(value: nil) // nothings printed
observer.send(value: 2)   // prints 2

take

take を使うことで完了を制御することができます。

take(first count:)

指定回数(count)の値を送るまではレシーバから受け取った値を送りますが、指定回数の値を送った直後に完了するようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .take(first: 2) // take(first count:)
    .observeValues { print($0) }

observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
observer.send(value: 3) // nothings printed

take(during lifetime:)

引数の lifetime が終了するまではレシーバから受け取った値を送り続けますが、lifetime が終了するタイミングで完了するようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (lifetime, token) = Lifetime.make()

signal
    .take(during: lifetime) // take(during lifetime:)
    .observeValues { print($0) }

observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
token.dispose()
observer.send(value: 3) // nothing printed

take(until trigger:)

引数の trigger が値を送る or 完了するまではレシーバから受け取った値を送り続けますが、trigger が値を送る or 完了するタイミングで完了するようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (triggerSignal, triggerObserver) = Signal<(), NoError>.pipe()

signal
    .take(until: triggerSignal) // take(until trigger:)
    .observeValues { print($0) }

observer.send(value: 1)         // prints 1
observer.send(value: 2)         // prints 2
triggerObserver.send(value: ())
observer.send(value: 3)         // nothing printed

take(untilReplacement replacement:)

引数の replacement が値を送るまではレシーバから受け取った値を送り続けますが、untilReplacement が値が送り始めると、レシーバの値の代わりに untilReplacement から受け取った値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (replacementSignal, replacementObserver) = Signal<Int, NoError>.pipe()

signal
    .take(untilReplacement: replacementSignal) // take(untilReplacement replacement:)
    .observeValues { print($0) }

observer.send(value: 1)            // prints 1
observer.send(value: 2)            // prints 2
replacementObserver.send(value: 3) // prints 3
observer.send(value: 4)            // nothing printed
replacementObserver.send(value: 5) // prints 5

take(last count:)

レシーバが完了したタイミングで、レシーバから受け取った値のうち最後から count 番目までの値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .take(last: 2) // take(last count:)
    .observeValues { print($0) }

observer.send(value: 1)
observer.send(value: 2)
observer.send(value: 3)
observer.send(value: 4)
observer.sendCompleted() // prints 3, 4

take(while shouldContinue:)

レシーバから受け取った値に対して引数の shouldContinuetrue を返す間は受け取った値を送り続けますが、shouldContinuefalse を返すタイミングで完了するようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .take(while: { $0 != 3 }) // take(while shouldContinue:)
    .observeValues { print($0) }

observer.send(value: 1)  // prints 1
observer.send(value: 2)  // prints 2
observer.send(value: 3)  // nothing printed
observer.send(value: 4)  // nothing printed

collect

collect を使うことで値を配列にまとめることができます。

collect()

レシーバが完了したタイミングですべての値を 1 つの配列にまとめて送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .collect() // collect()
    .observeValues { print($0) }

observer.send(value: 1)
observer.send(value: 2)
observer.send(value: 3)
observer.sendCompleted() // prints [1, 2, 3]

collect(count:)

レシーバから受け取った値が指定回数分まとまるタイミングで、値を 1 つの配列にまとめて送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .collect(count: 2) // collect(count:)
    .observeValues { print($0) }

observer.send(value: 1) // nothing printed
observer.send(value: 2) // prints [1, 2]
observer.send(value: 3) // nothing printed
observer.send(value: 4) // prints [3, 4]

collect(_ shouldEmit:)

レシーバから受け取った値に対して引数の shouldEmittrue を返すタイミングで、それまで受け取った値を 1 つの配列にまとめて送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .collect({ $0.reduce(0, +) == 5 })
    .observeValues { print($0) }

observer.send(value: 1) // nothing printed
observer.send(value: 4) // prints [1, 4]
observer.send(value: 1) // nothing printed
observer.send(value: 3) // nothing printed
observer.send(value: 1) // prints [1, 3, 1]

collect(every interval:, on scheduler, skipEmpty:, discardWhenCompleted:)

指定時間(interval)ごとに、レシーバから受け取った値を 1 つの配列にまとめて送るようになります。

skipEmpty では、送るべき値が何もない場合に空配列を送るか or 何も送らないかを指定できます。

discardWhenCompleted では、まだ送りきっていない値がある状態で完了した場合に次のインターバルで送るか or 何も送らないかを指定できます。

let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()

signal
    .collect(every: .seconds(1), on: scheduler, skipEmpty: false, discardWhenCompleted: false) // collect(every interval:, on scheduler, skipEmpty:, discardWhenCompleted:)
    .observeValues { print($0) }

observer.send(value: 1)
observer.send(value: 2)
scheduler.advance(by: .seconds(1)) // prints [1, 2]
observer.send(value: 3)
observer.send(value: 4)
scheduler.advance(by: .seconds(1)) // prints [3, 4]
scheduler.advance(by: .seconds(1)) // prints []
observer.send(value: 5)
observer.sendCompleted()
scheduler.advance(by: .seconds(1)) // prints [5]

combineLatest

レシーバもしくは引数の Signal/SignalProducer いずれかから値を受け取ったタイミングで、各々から最後に受け取った値を含むタプルを送るようになります。

let (numbersSignal, numbersObserver) = Signal<Int, NoError>.pipe()
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()

numbersSignal
    .combineLatest(with: lettersSignal) // combineLatest(with other:)
    .observeValues { print($0) }

numbersObserver.send(value: 0)   // nothing printed
numbersObserver.send(value: 1)   // nothing printed
lettersObserver.send(value: "A") // prints (1, "A")
lettersObserver.send(value: "B") // prints (1, "B")
numbersObserver.send(value: 2)   // prints (2, "B")

merge

レシーバもしくは引数の Signal/SignalProducer いずれかから値を受け取ったタイミングで、その値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (otherSignal, otherObserver) = Signal<Int, NoError>.pipe()

signal
    .merge(with: otherSignal) // merge(with other:)
    .observeValues { print($0) }

observer.send(value: 1)      // prints 1
observer.send(value: 2)      // prints 2
otherObserver.send(value: 3) // prints 3
observer.send(value: 4)      // prints 4
otherObserver.send(value: 5) // prints 5
otherObserver.send(value: 6) // prints 6

delay

値を送るタイミングを遅らせることができます。

let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()

signal
    .delay(2, on: scheduler) // delay(_ interval:, on scheduler:)
    .observeValues { print($0) }

observer.send(value: 1)
scheduler.advance(by: .seconds(1))
scheduler.advance(by: .seconds(2)) // prints 1

skip

skip を使うことで値を送らないようにする(スキップ)ことができます。

skip(first count:)

レシーバから受け取った値を指定回数分スキップするようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .skip(first: 2) // skip(first count:)
    .observeValues { print($0) }

observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
observer.send(value: 3) // prints 3
observer.send(value: 4) // prints 4

skip(until trigger:)

引数の trigger が値を送る or 完了するまではレシーバからのイベントをスキップするようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (triggerSignal, triggerObserver) = Signal<(), NoError>.pipe()

signal
    .skip(until: triggerSignal) // skip(until trigger:)
    .observeValues { print($0) }

observer.send(value: 1)         // nothing printed
observer.send(value: 2)         // nothing printed
triggerObserver.send(value: ())
observer.send(value: 3)         // prints 3
observer.send(value: 4)         // prints 4

skip(while shouldContinue:)

引数の shouldContinuefalse を返すまではレシーバからのイベントをスキップするようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .skip(while: { $0 <= 2 }) // skip(while shouldContinue:)
    .observeValues { print($0) }

observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
observer.send(value: 3) // prints 3
observer.send(value: 4) // prints 4

materialize

materialize を使うことでレシーバからのイベントをすべて値として扱えるようになります。

例えば、Signal<Int, NSError> を materialize すると Signal<Signal<Int, NSError>.Event, NoError> になります。

let (signal, observer) = Signal<Int, NSError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)

signal
    .materialize() // materialize()
    .observeValues {
        switch $0 {
        case .value:
            print($0.value!)
        case .failed:
            print($0.error!)
        default:
            break
        }
    }

observer.send(value: 1)     // prints 1
observer.send(error: error) // prints Error Domain=com.example.foo Code=42 "(null)"

dematerialize

dematerialize を使うことで materialize と逆のことができます。

例えば、Signal<Signal<Int, NSError>.Event, NoError> を dematerialize すると Signal<Int, NSError> になります。

let (signal, observer) = Signal<Signal<Int, NSError>.Event, NoError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)

signal
    .dematerialize() // dematerialize()
    .observeResult {
        switch $0 {
        case .success:
            print($0.value!)
        case .failure:
            print($0.error!)
        }
    }

observer.send(value: .value(1))      // prints 1
observer.send(value: .failed(error)) // prints Error Domain=com.example.foo Code=42 "(null)"

sample

sample を使うことで値を送るタイミングを制御できます。

sample(with sampler:)

引数の sampler から値を受け取ったタイミングで、レシーバから最後に受け取った値と sampler から受け取った値をタプルで送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (sampler, samplerObserver) = Signal<String, NoError>.pipe()

signal
    .sample(with: sampler) // sample(with sampler:)
    .observeValues { print($0) }

samplerObserver.send(value: "a") // nothing printed
observer.send(value: 1)          // nothing printed
observer.send(value: 2)          // nothing printed
samplerObserver.send(value: "b") // prints (2, "b")

sample(on sampler:)

引数の sampler から値を受け取ったタイミングで、レシーバから最後に受け取った値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (sampler, samplerObserver) = Signal<(), NoError>.pipe()

signal
    .sample(on: sampler) // sample(with sampler:)
    .observeValues { print($0) }

samplerObserver.send(value: ()) // nothing printed
observer.send(value: 1)         // nothing printed
observer.send(value: 2)         // nothing printed
samplerObserver.send(value: ()) // prints 2

withLatest

レシーバから値を受け取ったタイミングで samplee から受け取った最後の値を送るようになります。

sample(with sampler:) のレシーバと引数を入れ替えたバージョンです。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (samplee, sampleeObserver) = Signal<String, NoError>.pipe()

signal
    .withLatest(from: samplee) // withLatest(from samplee:)
    .observeValues { print($0) }

observer.send(value: 1)          // nothing printed
sampleeObserver.send(value: "a") // nothing printed
sampleeObserver.send(value: "b") // nothing printed
observer.send(value: 2)          // prints (2, "b")

combinePrevious

combinePrevious を使うことで前回の送られた値と今回送られた値を同時に送ることができます。

combinePrevious(_ initial:)

レシーバから値を受け取ると、前回受け取った値と今回受け取った値をタプルで受け取るようになります。

初めて値を受け取ったときは、指定された initial と受け取った値のタプルを送ります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .combinePrevious(0) // combinePrevious(_ initial:)
    .observeValues { print($0) }

observer.send(value: 1) // prints (0, 1)
observer.send(value: 2) // prints (1, 2)
observer.send(value: 3) // prints (2, 3)

combinePrevious()

combinePrevious(_ initial:) と同様にレシーバから値を受け取ると、前回受け取った値と今回受け取った値をタプルで受け取るようになります。

ただし、初めて値を受け取ったときには何も送りません。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .combinePrevious() // combinePrevious()
    .observeValues { print($0) }

observer.send(value: 1) // nothing printed
observer.send(value: 2) // prints (1, 2)
observer.send(value: 3) // prints (2, 3)

reduce

reduce を使うことで値の蓄積ができます。

Swift の Array にも同名のメソッドがありますが、これと同じ感じで使えます。

reduce(_ initialResult:, _ nextPartialResult:)

レシーバが完了したタイミングで蓄積した値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .reduce(1, { $0 + $1 }) // reduce(_ initialResult:, _ nextPartialResult:)
    .observeValues { print($0) }

observer.send(value: 2)
observer.send(value: 3)
observer.send(value: 4)
observer.sendCompleted() // prints 10

reduce(into initialResult:, _ nextPartialResult:)

reduce(_ initialResult:, _ nextPartialResult:) と異なり、nextPartialResult に渡される値が inout パラメータになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .reduce(into: 1, { $0 += $1 }) // reduce(into initialResult:, _ nextPartialResult:)
    .observeValues { print($0) }

observer.send(value: 2)
observer.send(value: 3)
observer.send(value: 4)
observer.sendCompleted() // prints 10

scan

scan を使うことで reduce と同様に値の蓄積ができます。ただ、reduce とは異なり部分的な蓄積結果を扱うことができます。

scan(_ initialResult:, _ nextPartialResult:)

レシーバから値を受け取ったタイミングでそれまでに蓄積した値を送るようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .scan(1, { $0 + $1 }) // scan(_ initialResult:, _ nextPartialResult:)
    .observeValues { print($0) }

observer.send(value: 2) // prints 3
observer.send(value: 3) // prints 6
observer.send(value: 4) // prints 10

scan(into initialResult:, _ nextPartialResult:)

scan(_ initialResult:, _ nextPartialResult:) と異なり、nextPartialResult に渡される値が inout パラメータになります。

let (signal, observer) = Signal<Int, NoError>.pipe()

signal
    .scan(into: 1, { $0 += $1 }) // scan(into initialResult:, _ nextPartialResult:)
    .observeValues { print($0) }

observer.send(value: 2) // prints 3
observer.send(value: 3) // prints 6
observer.send(value: 4) // prints 10

skipRepeats

skipRepeats を使うことで直前と同じ値だった場合のスキップができます。

skipRepeats()

レシーバから値を受け取ったとき、その値が直前の値と同じだった場合は値をスキップします。

let (signal, observer) = Signal<Bool, NoError>.pipe()

signal
    .skipRepeats() // skipRepeats()
    .observeValues { print($0) }

observer.send(value: true)  // prints true
observer.send(value: true)  // nothing printed
observer.send(value: false) // prints false
observer.send(value: true)  // prints true

skipRepeats(_ isEquivalent:)

レシーバから値を受け取ったとき、引数の isEquivalenttrue を返す場合は値をスキップします。

let (signal, observer) = Signal<String, NoError>.pipe()

signal
    .skipRepeats { $0.count == $1.count } // skipRepeats(_ isEquivalent:)
    .observeValues { print($0) }

observer.send(value: "a")  // prints a
observer.send(value: "b")  // nothing printed
observer.send(value: "cc") // prints cc
observer.send(value: "d")  // prints d

zip

zip を使うことで二つの Signal/SignalProducer の値のペアを作ることができます。

let (signal, observer) = Signal<Int, NoError>.pipe()
let (rightSignal, rightObserver) = Signal<String, NoError>.pipe()

signal
    .zip(with: rightSignal) // zip(with other:)
    .observeValues { print($0) }

observer.send(value: 1)           // nothing printed
observer.send(value: 2)           // nothing printed
rightObserver.send(value: "foo")  // prints (1, "foo")
rightObserver.send(value: "bar")  // prints (2, "bar")
rightObserver.send(value: "buzz") // nothing printed
observer.send(value: 3)           // prints (3, "buzz")

throttle

throttle を使うことで連続して値が送られるのを防ぐことができます。

throttle(_ interval:, on scheduler:)

最後に値を送った後 interval 秒過ぎるまでは新しい値を送らないようになります。

最後に値を送ってから interval 秒の間にレシーバから値を受け取ったときは、受け取ったうちの最後の値をinterval 秒が過ぎたタイミングで送ります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()

signal
    .throttle(1, on: scheduler) // throttle(_ interval:, on scheduler:)
    .observeValues { print($0) }

observer.send(value: 1)
scheduler.advance()                // prints 1

observer.send(value: 2)
observer.send(value: 3)
scheduler.advance(by: .seconds(1)) // prints 3

scheduler.advance(by: .seconds(3)) // nothing printed

observer.send(value: 4)
scheduler.advance()                // prints 4

throttle(while shouldThrottle:, on scheduler:)

引数の shouldThrottletrue の間は値を送らないようになります。

shouldThrottletrue の間にレシーバから値を受け取ったときは、受け取った値のうち最後の値を shouldThrottlefalse になったタイミングで送ります。

shouldThrottlefalse の間にレシーバから値を受け取ったときは、受け取ったタイミングでその値を送ります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let shouldThrottle = MutableProperty(false)
let scheduler = ImmediateScheduler()

signal
    .throttle(while: shouldThrottle, on: scheduler) // throttle(while shouldThrottle:, on scheduler:)
    .observeValues { print($0) }

observer.send(value: 1)      // prints 1
observer.send(value: 2)      // prints 2

shouldThrottle.value = true
observer.send(value: 3)
observer.send(value: 4)
shouldThrottle.value = false // prints 4

observer.send(value: 5)      // prints 5
observer.send(value: 6)      // prints 6

debounce

throttle と同様、連続して値が送られるのを防ぐことができます。

レシーバから最後に値を受け取ってから interval 秒過ぎたタイミングで、その値を送るようになります。

interval 秒過ぎる前にレシーバから複数の値を受け取ったときは、最後に値を受け取ってからさらに interval 秒待った後、最後に受け取った値を送ります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()

signal
    .debounce(1, on: scheduler, discardWhenCompleted: true) // debounce(_ interval:, on scheduler:, discardWhenCompleted:)
    .observeValues { print($0) }

observer.send(value: 1)
scheduler.advance()                // nothing printed

observer.send(value: 2)
observer.send(value: 3)
scheduler.advance(by: .seconds(1)) // prints 3

scheduler.advance(by: .seconds(3)) // nothing printed

observer.send(value: 4)
scheduler.advance()                // nothing printed

uniqueValues

uniqueValues を使うことですでに送ったことがある値をスキップできます。

uniqueValues(_ transform:)

クロージャを渡して一意性チェックの対象となる値を指定するパターン。

let (signal, observer) = Signal<String, NoError>.pipe()

signal
    .uniqueValues { $0.uppercased() } // uniqueValues(_ transform:)
    .observeValues { print($0) }

observer.send(value: "A") // prints A
observer.send(value: "b") // prints b
observer.send(value: "c") // prints c
observer.send(value: "a") // nothing printed
observer.send(value: "B") // nothing printed
observer.send(value: "c") // nothing printed

uniqueValues()

引数なしパターン。

let (signal, observer) = Signal<String, NoError>.pipe()

signal
    .uniqueValues() // uniqueValues()
    .observeValues { print($0) }

observer.send(value: "A") // prints A
observer.send(value: "b") // prints b
observer.send(value: "c") // prints c
observer.send(value: "a") // prints a
observer.send(value: "B") // prints B
observer.send(value: "c") // nothing printed

timeout

interval 秒が過ぎるまではレシーバから受け取った値をそのまま送ります。

interval 秒が過ぎるまでにレシーバが完了しないと、interval 秒が過ぎたタイミングで指定した error と共に失敗するようになります。

let (signal, observer) = Signal<Int, NoError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)
let scheduler = TestScheduler()

signal
    .timeout(after: 1, raising: error, on: scheduler) // timeout(after interval:, raising error:, on scheduler:)
    .observeResult {
        switch $0 {
        case .success:
            print($0.value!)
        case .failure:
            print($0.error!)
        }
    }

observer.send(value: 1)            // prints 1
observer.send(value: 2)            // prints 2
scheduler.advance(by: .seconds(1)) // Error Domain=com.example.foo Code=42 "(null)"

promoteError

combineLatestmerge 等を使う際には、組み合わせるイベントストリームの Error の型を合わせる必要があります。

失敗しないイベントストリーム(例えば、Signal<Int, NoError>)と失敗するイベントストリーム(例えば、Signal<Int, NSError>)はそのままでは組み合わせられませんが、promoteError を使うことで組み合わせられるようになります。

let numbersSignal = Signal<Int, NoError>.never
let lettersSignal = Signal<String, NSError>.never

let combined = numbersSignal
    .promoteError(NSError.self) // promoteError(_:)
    .combineLatest(with: lettersSignal)

print(type(of: combined)) // prints Signal<(Int, String), NSError>

promoteValue

promoteErrorError の型を変換するのに対し、promoteValueValue の型を変換します。

let numbersSignal = Signal<Int, NoError>.never
let completable = Signal<Never, NoError>.never

let merged = numbersSignal.merge(with: completable.promoteValue() /* promoteValue(_:) */)

print(type(of: merged)) // prints Signal<Int, NoError>

negate

レシーバから受け取った Bool 値を反転させて送るようになります。

let (signal, observer) = Signal<Bool, NoError>.pipe()

signal
    .negate() // negate()
    .observeValues { print($0) }

observer.send(value: true) // prints false

and

レシーバから受け取った値と引数の booleans から受け取った値の論理積(AND)を計算して送るようになります。

let (signal1, observer1) = Signal<Bool, NoError>.pipe()
let (signal2, observer2) = Signal<Bool, NoError>.pipe()

signal1
    .and(signal2) // and(_ booleans:)
    .observeValues { print($0) }

observer1.send(value: true)
observer2.send(value: true)  // prints true
observer1.send(value: false) // prints false
observer2.send(value: false) // prints false

or

レシーバから受け取った値と引数の booleans から受け取った値の論理和(OR)を計算して送るようになります。

let (signal1, observer1) = Signal<Bool, NoError>.pipe()
let (signal2, observer2) = Signal<Bool, NoError>.pipe()

signal1
    .or(signal2) // or(_ booleans:)
    .observeValues { print($0) }

observer1.send(value: true)
observer2.send(value: true)  // prints true
observer1.send(value: false) // prints true
observer2.send(value: false) // prints false

attempt

レシーバから値を受け取ったタイミングで action(エラーを送出する処理)を実行し、成功すれば受け取った値を送り、エラーが生じた場合はそのエラーと共に失敗するようになります。

func operation(value: Int?) throws {
    if value == nil {
        throw NSError(domain: "com.example.foo", code: 42, userInfo: nil)
    }
}

let (signal, observer) = Signal<Int?, AnyError>.pipe()

signal
    .attempt { try operation(value: $0) } // attempt(_ action:)
    .observeResult {
        switch $0 {
        case .success:
            print($0.value!)
        case .failure:
            print($0.error!)
        }
    }

observer.send(value: 1)   // prints Optional(1)
observer.send(value: 2)   // prints Optional(2)
observer.send(value: nil) // prints Error Domain=com.example.foo Code=42 "(null)"

attemptMap

レシーバから値を受け取ったタイミングで action(エラーを送出する処理)を実行し、成功すれば action の実行結果を送り、エラーが生じた場合はそのエラーと共に失敗するようになります。

func operation(value: Int?) throws -> Int {
    guard let value = value else {
        throw NSError(domain: "com.example.foo", code: 42, userInfo: nil)
    }
    return value * 2
}

let (signal, observer) = Signal<Int?, AnyError>.pipe()

signal
    .attemptMap { try operation(value: $0) } // attemptMap(_ transform:)
    .observeResult {
        switch $0 {
        case .success:
            print($0.value!)
        case .failure:
            print($0.error!)
        }
}

observer.send(value: 1)   // prints 2
observer.send(value: 2)   // prints 4
observer.send(value: nil) // prints Error Domain=com.example.foo Code=42 "(null)"

flatten

flatten を使うことでストリーム・オブ・ストリーム(例えば、Signal<Signal<Int, NoError>, NoError>)の内部ストリーム(Signal<Int, NoError>)の値を送ることができるようになります。

これを仮にフラット化(flatten)と呼びます。

フラット化する際には、どのような戦略でフラット化するかを引数(FlattenStrategy)で指定します。

FlattenStrategy.merge

.merge を指定すると、受け取ったすべての内部ストリームのすべての値を送るようになります。

let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()

outerSignal
    .flatten(.merge)
    .observeValues { print($0) }

outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)

innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // prints 1
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // prints 2
innerObserver1.send(value: "c") // prints c
innerObserver2.send(value: "3") // prints 3

FlattenStrategy.concat

.concat を指定すると、一つ前に受け取った内部ストリームが完了してから次に受け取った内部ストリームの値を送るようになります。

let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()

outerSignal
    .flatten(.concat)
    .observeValues { print($0) }

outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)

innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // nothing printed
innerObserver1.send(value: "c") // prints c
innerObserver1.sendCompleted()
innerObserver2.send(value: "3") // prints 3
innerObserver2.sendCompleted()

FlattenStrategy.concurrent

.concurrent を指定すると limit を上限としたすべての内部ストリームのすべての値を送るようになります。

受け取った内部ストリームが limit を超えると、超えた分はエンキューされます(監視中の内部ストリームのいずれかが完了したタイミングでデキューされて監視がはじまります)。

limit1 を指定すると、.concat を指定したときと同じ動作になります。

let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
let (innerSignal3, innerObserver3) = Signal<String, NoError>.pipe()

outerSignal
    .flatten(.concurrent(limit: 2))
    .observeValues { print($0) }

outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)
outerObserver.send(value: innerSignal3)

innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // prints 1
innerObserver3.send(value: "!") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // prints 2
innerObserver3.send(value: "@") // nothing printed
innerObserver1.send(value: "c") // prints c
innerObserver1.sendCompleted()
innerObserver2.send(value: "3") // prints 3
innerObserver3.send(value: "#") // prints #

FlattenStrategy.latest

.latest を指定すると、最後に受け取った内部ストリームの値のみを送るようになります。

let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()

outerSignal
    .flatten(.latest)
    .observeValues { print($0) }

outerObserver.send(value: innerSignal1)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // nothing printed
outerObserver.send(value: innerSignal2)
innerObserver1.send(value: "c") // nothing printed
innerObserver2.send(value: "3") // prints 3

FlattenStrategy.race

.race を指定すると、受け取った内部ストリームのうち最初に値を受け取った内部ストリームの値のみを送るようになります。

let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()

outerSignal
    .flatten(.race)
    .observeValues { print($0) }

outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)

innerObserver2.send(value: "1") // prints 1
innerObserver1.send(value: "a") // nothing printed
innerObserver2.send(value: "2") // prints 2
innerObserver1.send(value: "b") // nothing printed
innerObserver2.send(value: "3") // prints 3
innerObserver1.send(value: "c") // nothing printed

flatMap

レシーバから値を受け取ったタイミングで transform を実行し、transform の戻り値から受け取る値を送るようになります。(map してから flatten するのと同様の効果)

strategy には flatten と同様に FlattenStrategy を指定します。

let (outerSignal, outerObserver) = Signal<Int, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
let innerSignals = [innerSignal1, innerSignal2]

outerSignal
    .flatMap(.latest) { innerSignals[$0] }
    .observeValues { print($0) }

outerObserver.send(value: 0)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // nothing printed
outerObserver.send(value: 1)
innerObserver1.send(value: "c") // nothing printed
innerObserver2.send(value: "3") // prints 3

flatMapError

flatMapError を使うことで、レシーバが失敗したときに transform が返す新しい SignalProducer をスタートするようになります。

let (signal, observer) = Signal<Int, NSError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)

SignalProducer(signal)
    .flatMapError { _ in SignalProducer(value: 0) }
    .startWithValues { print($0) }

observer.send(value: 1)     // prints 1
observer.send(value: 2)     // prints 2
observer.send(error: error) // prints 0
30
21
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
30
21