こんにちは、なんちゃってモバイルアプリ開発者のかたおかです。
半年程前から RxSwift の代わりに ReactiveSwift を使い始めて気に入っています。
ReactiveSwift に登場するオペレータについては Basic Operators にある程度解説がありますが、全てではありません。
API リファレンス には全てのオペレータがまとまっていますが、サンプルコードなしではなかなか動作のイメージしづらいこともあります。
というわけで、リファレンスとして Signal/SignalProducer 共通で使えるオペレータをサンプルコードと一緒にまとめてみました。
(サンプルコードは基本 Signal を使って書いていますが、SignalProducer でも同様に動作すると思います。たぶん)
map
map
を使うことで値の変換ができます。
map(_ transform:)
クロージャで値を受け取って新しい値を返すパターン。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.map { $0 * 2 } // map(_ transform:)
.observeValues { print($0) }
observer.send(value: 1) // prints 2
observer.send(value: 2) // prints 4
observer.send(value: 3) // prints 6
map(value:)
直接新しい値を指定するパターン。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.map(value: 1) // map(value:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 1
observer.send(value: 3) // prints 1
map(_ keyPath:)
KeyPath
を指定するパターン。
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.map(\String.count) // map(_ keyPath:)
.observeValues { print($0) }
observer.send(value: "foo") // prints 3
observer.send(value: "foobar") // prints 6
mapError
map
では値(Value
)の変換ができるのに対し、mapError
ではエラー(Error
)の変換ができます。
enum CustomError: String, Error {
case foo
case bar
case other
}
let (signal, observer) = Signal<String, NSError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)
signal
.mapError { error -> CustomError in // mapError(_ transform:)
switch error.domain {
case "com.example.foo":
return .foo
case "com.example.bar":
return .bar
default:
return .other
}
}
.observeFailed { print($0) }
observer.send(error: error) // prints foo
lazyMap
map
と同様に値の変換ができます。ただし、引数のクロージャは指定した Scheduler 上で遅延評価されるようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()
signal
.lazyMap(on: scheduler, transform: { $0 * 2 }) // lazyMap(on scheduler:, transform:)
.observeValues { print($0) }
observer.send(value: 1) // nothings printed
scheduler.advance() // prints 2
observer.send(value: 2) // nothings printed
scheduler.advance() // prints 4
filter
filter
を使うことで値のフィルタリングができます。
レシーバから受け取った値に対して引数のクロージャが true
を返す場合のみ、その値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.filter { $0 % 2 == 0 } // filter(_ isIncluded:)
.observeValues { print($0) }
observer.send(value: 1) // nothings printed
observer.send(value: 2) // prints 2
observer.send(value: 3) // nothings printed
observer.send(value: 4) // prints 4
filterMap
filterMap
を使うことで値のフィルタリングとアンラップが同時にできます。
レシーバから受け取った値に対して引数のクロージャが返すオプショナル型の値が nil
でない場合のみ、その値をアンラップして送るようになります。
let (signal, observer) = Signal<String?, NoError>.pipe()
signal
.filterMap { $0?.uppercased() } // filterMap(_ transform:)
.observeValues { print($0) }
observer.send(value: "foo") // prints FOO
observer.send(value: nil) // nothings printed
observer.send(value: "bar") // prints BAR
skipNil
レシーバから受け取った値をアンラップして送るようになります。nil
値は送られなくなります。
let (signal, observer) = Signal<Int?, NoError>.pipe()
signal
.skipNil() // skipNil()
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: nil) // nothings printed
observer.send(value: 2) // prints 2
take
take
を使うことで完了を制御することができます。
take(first count:)
指定回数(count
)の値を送るまではレシーバから受け取った値を送りますが、指定回数の値を送った直後に完了するようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.take(first: 2) // take(first count:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
observer.send(value: 3) // nothings printed
take(during lifetime:)
引数の lifetime
が終了するまではレシーバから受け取った値を送り続けますが、lifetime
が終了するタイミングで完了するようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (lifetime, token) = Lifetime.make()
signal
.take(during: lifetime) // take(during lifetime:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
token.dispose()
observer.send(value: 3) // nothing printed
take(until trigger:)
引数の trigger
が値を送る or 完了するまではレシーバから受け取った値を送り続けますが、trigger
が値を送る or 完了するタイミングで完了するようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (triggerSignal, triggerObserver) = Signal<(), NoError>.pipe()
signal
.take(until: triggerSignal) // take(until trigger:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
triggerObserver.send(value: ())
observer.send(value: 3) // nothing printed
take(untilReplacement replacement:)
引数の replacement
が値を送るまではレシーバから受け取った値を送り続けますが、untilReplacement
が値が送り始めると、レシーバの値の代わりに untilReplacement
から受け取った値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (replacementSignal, replacementObserver) = Signal<Int, NoError>.pipe()
signal
.take(untilReplacement: replacementSignal) // take(untilReplacement replacement:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
replacementObserver.send(value: 3) // prints 3
observer.send(value: 4) // nothing printed
replacementObserver.send(value: 5) // prints 5
take(last count:)
レシーバが完了したタイミングで、レシーバから受け取った値のうち最後から count
番目までの値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.take(last: 2) // take(last count:)
.observeValues { print($0) }
observer.send(value: 1)
observer.send(value: 2)
observer.send(value: 3)
observer.send(value: 4)
observer.sendCompleted() // prints 3, 4
take(while shouldContinue:)
レシーバから受け取った値に対して引数の shouldContinue
が true
を返す間は受け取った値を送り続けますが、shouldContinue
が false
を返すタイミングで完了するようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.take(while: { $0 != 3 }) // take(while shouldContinue:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
observer.send(value: 3) // nothing printed
observer.send(value: 4) // nothing printed
collect
collect
を使うことで値を配列にまとめることができます。
collect()
レシーバが完了したタイミングですべての値を 1 つの配列にまとめて送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.collect() // collect()
.observeValues { print($0) }
observer.send(value: 1)
observer.send(value: 2)
observer.send(value: 3)
observer.sendCompleted() // prints [1, 2, 3]
collect(count:)
レシーバから受け取った値が指定回数分まとまるタイミングで、値を 1 つの配列にまとめて送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.collect(count: 2) // collect(count:)
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // prints [1, 2]
observer.send(value: 3) // nothing printed
observer.send(value: 4) // prints [3, 4]
collect(_ shouldEmit:)
レシーバから受け取った値に対して引数の shouldEmit
が true
を返すタイミングで、それまで受け取った値を 1 つの配列にまとめて送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.collect({ $0.reduce(0, +) == 5 })
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 4) // prints [1, 4]
observer.send(value: 1) // nothing printed
observer.send(value: 3) // nothing printed
observer.send(value: 1) // prints [1, 3, 1]
collect(every interval:, on scheduler, skipEmpty:, discardWhenCompleted:)
指定時間(interval
)ごとに、レシーバから受け取った値を 1 つの配列にまとめて送るようになります。
skipEmpty
では、送るべき値が何もない場合に空配列を送るか or 何も送らないかを指定できます。
discardWhenCompleted
では、まだ送りきっていない値がある状態で完了した場合に次のインターバルで送るか or 何も送らないかを指定できます。
let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()
signal
.collect(every: .seconds(1), on: scheduler, skipEmpty: false, discardWhenCompleted: false) // collect(every interval:, on scheduler, skipEmpty:, discardWhenCompleted:)
.observeValues { print($0) }
observer.send(value: 1)
observer.send(value: 2)
scheduler.advance(by: .seconds(1)) // prints [1, 2]
observer.send(value: 3)
observer.send(value: 4)
scheduler.advance(by: .seconds(1)) // prints [3, 4]
scheduler.advance(by: .seconds(1)) // prints []
observer.send(value: 5)
observer.sendCompleted()
scheduler.advance(by: .seconds(1)) // prints [5]
combineLatest
レシーバもしくは引数の Signal/SignalProducer いずれかから値を受け取ったタイミングで、各々から最後に受け取った値を含むタプルを送るようになります。
let (numbersSignal, numbersObserver) = Signal<Int, NoError>.pipe()
let (lettersSignal, lettersObserver) = Signal<String, NoError>.pipe()
numbersSignal
.combineLatest(with: lettersSignal) // combineLatest(with other:)
.observeValues { print($0) }
numbersObserver.send(value: 0) // nothing printed
numbersObserver.send(value: 1) // nothing printed
lettersObserver.send(value: "A") // prints (1, "A")
lettersObserver.send(value: "B") // prints (1, "B")
numbersObserver.send(value: 2) // prints (2, "B")
merge
レシーバもしくは引数の Signal/SignalProducer いずれかから値を受け取ったタイミングで、その値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (otherSignal, otherObserver) = Signal<Int, NoError>.pipe()
signal
.merge(with: otherSignal) // merge(with other:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
otherObserver.send(value: 3) // prints 3
observer.send(value: 4) // prints 4
otherObserver.send(value: 5) // prints 5
otherObserver.send(value: 6) // prints 6
delay
値を送るタイミングを遅らせることができます。
let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()
signal
.delay(2, on: scheduler) // delay(_ interval:, on scheduler:)
.observeValues { print($0) }
observer.send(value: 1)
scheduler.advance(by: .seconds(1))
scheduler.advance(by: .seconds(2)) // prints 1
skip
skip
を使うことで値を送らないようにする(スキップ)ことができます。
skip(first count:)
レシーバから受け取った値を指定回数分スキップするようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.skip(first: 2) // skip(first count:)
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
observer.send(value: 3) // prints 3
observer.send(value: 4) // prints 4
skip(until trigger:)
引数の trigger
が値を送る or 完了するまではレシーバからのイベントをスキップするようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (triggerSignal, triggerObserver) = Signal<(), NoError>.pipe()
signal
.skip(until: triggerSignal) // skip(until trigger:)
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
triggerObserver.send(value: ())
observer.send(value: 3) // prints 3
observer.send(value: 4) // prints 4
skip(while shouldContinue:)
引数の shouldContinue
が false
を返すまではレシーバからのイベントをスキップするようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.skip(while: { $0 <= 2 }) // skip(while shouldContinue:)
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
observer.send(value: 3) // prints 3
observer.send(value: 4) // prints 4
materialize
materialize
を使うことでレシーバからのイベントをすべて値として扱えるようになります。
例えば、Signal<Int, NSError>
を materialize すると Signal<Signal<Int, NSError>.Event, NoError>
になります。
let (signal, observer) = Signal<Int, NSError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)
signal
.materialize() // materialize()
.observeValues {
switch $0 {
case .value:
print($0.value!)
case .failed:
print($0.error!)
default:
break
}
}
observer.send(value: 1) // prints 1
observer.send(error: error) // prints Error Domain=com.example.foo Code=42 "(null)"
dematerialize
dematerialize
を使うことで materialize
と逆のことができます。
例えば、Signal<Signal<Int, NSError>.Event, NoError>
を dematerialize すると Signal<Int, NSError>
になります。
let (signal, observer) = Signal<Signal<Int, NSError>.Event, NoError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)
signal
.dematerialize() // dematerialize()
.observeResult {
switch $0 {
case .success:
print($0.value!)
case .failure:
print($0.error!)
}
}
observer.send(value: .value(1)) // prints 1
observer.send(value: .failed(error)) // prints Error Domain=com.example.foo Code=42 "(null)"
sample
sample
を使うことで値を送るタイミングを制御できます。
sample(with sampler:)
引数の sampler
から値を受け取ったタイミングで、レシーバから最後に受け取った値と sampler
から受け取った値をタプルで送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (sampler, samplerObserver) = Signal<String, NoError>.pipe()
signal
.sample(with: sampler) // sample(with sampler:)
.observeValues { print($0) }
samplerObserver.send(value: "a") // nothing printed
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
samplerObserver.send(value: "b") // prints (2, "b")
sample(on sampler:)
引数の sampler
から値を受け取ったタイミングで、レシーバから最後に受け取った値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (sampler, samplerObserver) = Signal<(), NoError>.pipe()
signal
.sample(on: sampler) // sample(with sampler:)
.observeValues { print($0) }
samplerObserver.send(value: ()) // nothing printed
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
samplerObserver.send(value: ()) // prints 2
withLatest
レシーバから値を受け取ったタイミングで samplee
から受け取った最後の値を送るようになります。
sample(with sampler:)
のレシーバと引数を入れ替えたバージョンです。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (samplee, sampleeObserver) = Signal<String, NoError>.pipe()
signal
.withLatest(from: samplee) // withLatest(from samplee:)
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
sampleeObserver.send(value: "a") // nothing printed
sampleeObserver.send(value: "b") // nothing printed
observer.send(value: 2) // prints (2, "b")
combinePrevious
combinePrevious
を使うことで前回の送られた値と今回送られた値を同時に送ることができます。
combinePrevious(_ initial:)
レシーバから値を受け取ると、前回受け取った値と今回受け取った値をタプルで受け取るようになります。
初めて値を受け取ったときは、指定された initial
と受け取った値のタプルを送ります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.combinePrevious(0) // combinePrevious(_ initial:)
.observeValues { print($0) }
observer.send(value: 1) // prints (0, 1)
observer.send(value: 2) // prints (1, 2)
observer.send(value: 3) // prints (2, 3)
combinePrevious()
combinePrevious(_ initial:)
と同様にレシーバから値を受け取ると、前回受け取った値と今回受け取った値をタプルで受け取るようになります。
ただし、初めて値を受け取ったときには何も送りません。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.combinePrevious() // combinePrevious()
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // prints (1, 2)
observer.send(value: 3) // prints (2, 3)
reduce
reduce
を使うことで値の蓄積ができます。
Swift の Array にも同名のメソッドがありますが、これと同じ感じで使えます。
reduce(_ initialResult:, _ nextPartialResult:)
レシーバが完了したタイミングで蓄積した値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.reduce(1, { $0 + $1 }) // reduce(_ initialResult:, _ nextPartialResult:)
.observeValues { print($0) }
observer.send(value: 2)
observer.send(value: 3)
observer.send(value: 4)
observer.sendCompleted() // prints 10
reduce(into initialResult:, _ nextPartialResult:)
reduce(_ initialResult:, _ nextPartialResult:)
と異なり、nextPartialResult
に渡される値が inout
パラメータになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.reduce(into: 1, { $0 += $1 }) // reduce(into initialResult:, _ nextPartialResult:)
.observeValues { print($0) }
observer.send(value: 2)
observer.send(value: 3)
observer.send(value: 4)
observer.sendCompleted() // prints 10
scan
scan
を使うことで reduce
と同様に値の蓄積ができます。ただ、reduce
とは異なり部分的な蓄積結果を扱うことができます。
scan(_ initialResult:, _ nextPartialResult:)
レシーバから値を受け取ったタイミングでそれまでに蓄積した値を送るようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.scan(1, { $0 + $1 }) // scan(_ initialResult:, _ nextPartialResult:)
.observeValues { print($0) }
observer.send(value: 2) // prints 3
observer.send(value: 3) // prints 6
observer.send(value: 4) // prints 10
scan(into initialResult:, _ nextPartialResult:)
scan(_ initialResult:, _ nextPartialResult:)
と異なり、nextPartialResult
に渡される値が inout
パラメータになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
signal
.scan(into: 1, { $0 += $1 }) // scan(into initialResult:, _ nextPartialResult:)
.observeValues { print($0) }
observer.send(value: 2) // prints 3
observer.send(value: 3) // prints 6
observer.send(value: 4) // prints 10
skipRepeats
skipRepeats
を使うことで直前と同じ値だった場合のスキップができます。
skipRepeats()
レシーバから値を受け取ったとき、その値が直前の値と同じだった場合は値をスキップします。
let (signal, observer) = Signal<Bool, NoError>.pipe()
signal
.skipRepeats() // skipRepeats()
.observeValues { print($0) }
observer.send(value: true) // prints true
observer.send(value: true) // nothing printed
observer.send(value: false) // prints false
observer.send(value: true) // prints true
skipRepeats(_ isEquivalent:)
レシーバから値を受け取ったとき、引数の isEquivalent
が true
を返す場合は値をスキップします。
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.skipRepeats { $0.count == $1.count } // skipRepeats(_ isEquivalent:)
.observeValues { print($0) }
observer.send(value: "a") // prints a
observer.send(value: "b") // nothing printed
observer.send(value: "cc") // prints cc
observer.send(value: "d") // prints d
zip
zip
を使うことで二つの Signal/SignalProducer の値のペアを作ることができます。
let (signal, observer) = Signal<Int, NoError>.pipe()
let (rightSignal, rightObserver) = Signal<String, NoError>.pipe()
signal
.zip(with: rightSignal) // zip(with other:)
.observeValues { print($0) }
observer.send(value: 1) // nothing printed
observer.send(value: 2) // nothing printed
rightObserver.send(value: "foo") // prints (1, "foo")
rightObserver.send(value: "bar") // prints (2, "bar")
rightObserver.send(value: "buzz") // nothing printed
observer.send(value: 3) // prints (3, "buzz")
throttle
throttle
を使うことで連続して値が送られるのを防ぐことができます。
throttle(_ interval:, on scheduler:)
最後に値を送った後 interval
秒過ぎるまでは新しい値を送らないようになります。
最後に値を送ってから interval
秒の間にレシーバから値を受け取ったときは、受け取ったうちの最後の値をinterval
秒が過ぎたタイミングで送ります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()
signal
.throttle(1, on: scheduler) // throttle(_ interval:, on scheduler:)
.observeValues { print($0) }
observer.send(value: 1)
scheduler.advance() // prints 1
observer.send(value: 2)
observer.send(value: 3)
scheduler.advance(by: .seconds(1)) // prints 3
scheduler.advance(by: .seconds(3)) // nothing printed
observer.send(value: 4)
scheduler.advance() // prints 4
throttle(while shouldThrottle:, on scheduler:)
引数の shouldThrottle
が true
の間は値を送らないようになります。
shouldThrottle
が true
の間にレシーバから値を受け取ったときは、受け取った値のうち最後の値を shouldThrottle
が false
になったタイミングで送ります。
shouldThrottle
が false
の間にレシーバから値を受け取ったときは、受け取ったタイミングでその値を送ります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let shouldThrottle = MutableProperty(false)
let scheduler = ImmediateScheduler()
signal
.throttle(while: shouldThrottle, on: scheduler) // throttle(while shouldThrottle:, on scheduler:)
.observeValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
shouldThrottle.value = true
observer.send(value: 3)
observer.send(value: 4)
shouldThrottle.value = false // prints 4
observer.send(value: 5) // prints 5
observer.send(value: 6) // prints 6
debounce
throttle
と同様、連続して値が送られるのを防ぐことができます。
レシーバから最後に値を受け取ってから interval
秒過ぎたタイミングで、その値を送るようになります。
interval
秒過ぎる前にレシーバから複数の値を受け取ったときは、最後に値を受け取ってからさらに interval
秒待った後、最後に受け取った値を送ります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let scheduler = TestScheduler()
signal
.debounce(1, on: scheduler, discardWhenCompleted: true) // debounce(_ interval:, on scheduler:, discardWhenCompleted:)
.observeValues { print($0) }
observer.send(value: 1)
scheduler.advance() // nothing printed
observer.send(value: 2)
observer.send(value: 3)
scheduler.advance(by: .seconds(1)) // prints 3
scheduler.advance(by: .seconds(3)) // nothing printed
observer.send(value: 4)
scheduler.advance() // nothing printed
uniqueValues
uniqueValues
を使うことですでに送ったことがある値をスキップできます。
uniqueValues(_ transform:)
クロージャを渡して一意性チェックの対象となる値を指定するパターン。
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.uniqueValues { $0.uppercased() } // uniqueValues(_ transform:)
.observeValues { print($0) }
observer.send(value: "A") // prints A
observer.send(value: "b") // prints b
observer.send(value: "c") // prints c
observer.send(value: "a") // nothing printed
observer.send(value: "B") // nothing printed
observer.send(value: "c") // nothing printed
uniqueValues()
引数なしパターン。
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.uniqueValues() // uniqueValues()
.observeValues { print($0) }
observer.send(value: "A") // prints A
observer.send(value: "b") // prints b
observer.send(value: "c") // prints c
observer.send(value: "a") // prints a
observer.send(value: "B") // prints B
observer.send(value: "c") // nothing printed
timeout
interval
秒が過ぎるまではレシーバから受け取った値をそのまま送ります。
interval
秒が過ぎるまでにレシーバが完了しないと、interval
秒が過ぎたタイミングで指定した error
と共に失敗するようになります。
let (signal, observer) = Signal<Int, NoError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)
let scheduler = TestScheduler()
signal
.timeout(after: 1, raising: error, on: scheduler) // timeout(after interval:, raising error:, on scheduler:)
.observeResult {
switch $0 {
case .success:
print($0.value!)
case .failure:
print($0.error!)
}
}
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
scheduler.advance(by: .seconds(1)) // Error Domain=com.example.foo Code=42 "(null)"
promoteError
combineLatest
や merge
等を使う際には、組み合わせるイベントストリームの Error
の型を合わせる必要があります。
失敗しないイベントストリーム(例えば、Signal<Int, NoError>
)と失敗するイベントストリーム(例えば、Signal<Int, NSError>
)はそのままでは組み合わせられませんが、promoteError
を使うことで組み合わせられるようになります。
let numbersSignal = Signal<Int, NoError>.never
let lettersSignal = Signal<String, NSError>.never
let combined = numbersSignal
.promoteError(NSError.self) // promoteError(_:)
.combineLatest(with: lettersSignal)
print(type(of: combined)) // prints Signal<(Int, String), NSError>
promoteValue
promoteError
が Error
の型を変換するのに対し、promoteValue
は Value
の型を変換します。
let numbersSignal = Signal<Int, NoError>.never
let completable = Signal<Never, NoError>.never
let merged = numbersSignal.merge(with: completable.promoteValue() /* promoteValue(_:) */)
print(type(of: merged)) // prints Signal<Int, NoError>
negate
レシーバから受け取った Bool
値を反転させて送るようになります。
let (signal, observer) = Signal<Bool, NoError>.pipe()
signal
.negate() // negate()
.observeValues { print($0) }
observer.send(value: true) // prints false
and
レシーバから受け取った値と引数の booleans
から受け取った値の論理積(AND)を計算して送るようになります。
let (signal1, observer1) = Signal<Bool, NoError>.pipe()
let (signal2, observer2) = Signal<Bool, NoError>.pipe()
signal1
.and(signal2) // and(_ booleans:)
.observeValues { print($0) }
observer1.send(value: true)
observer2.send(value: true) // prints true
observer1.send(value: false) // prints false
observer2.send(value: false) // prints false
or
レシーバから受け取った値と引数の booleans
から受け取った値の論理和(OR)を計算して送るようになります。
let (signal1, observer1) = Signal<Bool, NoError>.pipe()
let (signal2, observer2) = Signal<Bool, NoError>.pipe()
signal1
.or(signal2) // or(_ booleans:)
.observeValues { print($0) }
observer1.send(value: true)
observer2.send(value: true) // prints true
observer1.send(value: false) // prints true
observer2.send(value: false) // prints false
attempt
レシーバから値を受け取ったタイミングで action
(エラーを送出する処理)を実行し、成功すれば受け取った値を送り、エラーが生じた場合はそのエラーと共に失敗するようになります。
func operation(value: Int?) throws {
if value == nil {
throw NSError(domain: "com.example.foo", code: 42, userInfo: nil)
}
}
let (signal, observer) = Signal<Int?, AnyError>.pipe()
signal
.attempt { try operation(value: $0) } // attempt(_ action:)
.observeResult {
switch $0 {
case .success:
print($0.value!)
case .failure:
print($0.error!)
}
}
observer.send(value: 1) // prints Optional(1)
observer.send(value: 2) // prints Optional(2)
observer.send(value: nil) // prints Error Domain=com.example.foo Code=42 "(null)"
attemptMap
レシーバから値を受け取ったタイミングで action
(エラーを送出する処理)を実行し、成功すれば action
の実行結果を送り、エラーが生じた場合はそのエラーと共に失敗するようになります。
func operation(value: Int?) throws -> Int {
guard let value = value else {
throw NSError(domain: "com.example.foo", code: 42, userInfo: nil)
}
return value * 2
}
let (signal, observer) = Signal<Int?, AnyError>.pipe()
signal
.attemptMap { try operation(value: $0) } // attemptMap(_ transform:)
.observeResult {
switch $0 {
case .success:
print($0.value!)
case .failure:
print($0.error!)
}
}
observer.send(value: 1) // prints 2
observer.send(value: 2) // prints 4
observer.send(value: nil) // prints Error Domain=com.example.foo Code=42 "(null)"
flatten
flatten
を使うことでストリーム・オブ・ストリーム(例えば、Signal<Signal<Int, NoError>, NoError>
)の内部ストリーム(Signal<Int, NoError>
)の値を送ることができるようになります。
これを仮にフラット化(flatten)と呼びます。
フラット化する際には、どのような戦略でフラット化するかを引数(FlattenStrategy
)で指定します。
FlattenStrategy.merge
.merge
を指定すると、受け取ったすべての内部ストリームのすべての値を送るようになります。
let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
outerSignal
.flatten(.merge)
.observeValues { print($0) }
outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // prints 1
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // prints 2
innerObserver1.send(value: "c") // prints c
innerObserver2.send(value: "3") // prints 3
FlattenStrategy.concat
.concat
を指定すると、一つ前に受け取った内部ストリームが完了してから次に受け取った内部ストリームの値を送るようになります。
let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
outerSignal
.flatten(.concat)
.observeValues { print($0) }
outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // nothing printed
innerObserver1.send(value: "c") // prints c
innerObserver1.sendCompleted()
innerObserver2.send(value: "3") // prints 3
innerObserver2.sendCompleted()
FlattenStrategy.concurrent
.concurrent
を指定すると limit
を上限としたすべての内部ストリームのすべての値を送るようになります。
受け取った内部ストリームが limit
を超えると、超えた分はエンキューされます(監視中の内部ストリームのいずれかが完了したタイミングでデキューされて監視がはじまります)。
limit
に 1
を指定すると、.concat
を指定したときと同じ動作になります。
let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
let (innerSignal3, innerObserver3) = Signal<String, NoError>.pipe()
outerSignal
.flatten(.concurrent(limit: 2))
.observeValues { print($0) }
outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)
outerObserver.send(value: innerSignal3)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // prints 1
innerObserver3.send(value: "!") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // prints 2
innerObserver3.send(value: "@") // nothing printed
innerObserver1.send(value: "c") // prints c
innerObserver1.sendCompleted()
innerObserver2.send(value: "3") // prints 3
innerObserver3.send(value: "#") // prints #
FlattenStrategy.latest
.latest
を指定すると、最後に受け取った内部ストリームの値のみを送るようになります。
let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
outerSignal
.flatten(.latest)
.observeValues { print($0) }
outerObserver.send(value: innerSignal1)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // nothing printed
outerObserver.send(value: innerSignal2)
innerObserver1.send(value: "c") // nothing printed
innerObserver2.send(value: "3") // prints 3
FlattenStrategy.race
.race
を指定すると、受け取った内部ストリームのうち最初に値を受け取った内部ストリームの値のみを送るようになります。
let (outerSignal, outerObserver) = Signal<Signal<String, NoError>, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
outerSignal
.flatten(.race)
.observeValues { print($0) }
outerObserver.send(value: innerSignal1)
outerObserver.send(value: innerSignal2)
innerObserver2.send(value: "1") // prints 1
innerObserver1.send(value: "a") // nothing printed
innerObserver2.send(value: "2") // prints 2
innerObserver1.send(value: "b") // nothing printed
innerObserver2.send(value: "3") // prints 3
innerObserver1.send(value: "c") // nothing printed
flatMap
レシーバから値を受け取ったタイミングで transform
を実行し、transform
の戻り値から受け取る値を送るようになります。(map
してから flatten
するのと同様の効果)
strategy
には flatten
と同様に FlattenStrategy
を指定します。
let (outerSignal, outerObserver) = Signal<Int, NoError>.pipe()
let (innerSignal1, innerObserver1) = Signal<String, NoError>.pipe()
let (innerSignal2, innerObserver2) = Signal<String, NoError>.pipe()
let innerSignals = [innerSignal1, innerSignal2]
outerSignal
.flatMap(.latest) { innerSignals[$0] }
.observeValues { print($0) }
outerObserver.send(value: 0)
innerObserver1.send(value: "a") // prints a
innerObserver2.send(value: "1") // nothing printed
innerObserver1.send(value: "b") // prints b
innerObserver2.send(value: "2") // nothing printed
outerObserver.send(value: 1)
innerObserver1.send(value: "c") // nothing printed
innerObserver2.send(value: "3") // prints 3
flatMapError
flatMapError
を使うことで、レシーバが失敗したときに transform
が返す新しい SignalProducer
をスタートするようになります。
let (signal, observer) = Signal<Int, NSError>.pipe()
let error = NSError(domain: "com.example.foo", code: 42, userInfo: nil)
SignalProducer(signal)
.flatMapError { _ in SignalProducer(value: 0) }
.startWithValues { print($0) }
observer.send(value: 1) // prints 1
observer.send(value: 2) // prints 2
observer.send(error: error) // prints 0