■基本(プロトコルとそれを実装したクラス)
<入力 = 受け取る側 = Consumer>
RxSwiftのObserverは CombineのSubscriber、Kotlin FlowのFlowCollector
-> データを購読可能にするプロトコル
※実際に相当する実装は呼び出し元の以下処理
.subscribe {} の クロージャ
.sink {} の クロージャ
.collect {} の ラムダ関数
<出力 = 提供する側 = Provider(Cold)>
RSwiftのObservableはCombineのPublisherでKotlin FlowのFlow
-> データを提供可能にするプロトコル
※補足:RxSwiftでは他にSingle/Maybe/CompletableなどもColdに当たる
<入力+出力(Hot)>
RxSwiftのSubjectは CombineのSubjectでKotlin FlowのSharedFlowまたはStateFlow
->RxsWItのObservable と Observable 両方を実装したクラス
(Combineでは PublisherとSubscriberを両方実装したクラス)
(Kotlin Flowでは FlowCollector と Flow を両方実装したクラス)
※補足:RxSwiftでは他にDriver/Signal/BehaviorRelay/PublishRelayなどもHotに当たる
RxSwiftの例)
// 以下はObservableの機能
let observable = Observable.just("Hello")
observable.subscribe (onNext: { value in
// ここはObserverの機能
print(value)
})
// 以下はSubjectの機能
let subject = PublishSubject<String>()
subject.subscribe(onNext: { value in
print("Observer1: \(value)")
})
subject.onNext("Hello")
Combineの例)
// 以下はPublisherの機能
let publisher = Just("Hello")
publisher.sink(receiveValue: { value in
// ここはSubscriberの機能
print(value) // "Hello"が出力される
}
// 以下はSubjectの機能
let subject = PassthroughSubject<String, Never>)
subject.sink(receiveValue: { value in print ("Subscriber1: \(value)")
})
subject.send ("Hello")
Kotlin Flowの例)
import android.os.Bundle
import androidx.activity.ComponentActivity
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
class MainActivity : ComponentActivity() {
// Observable / Publisher に対応:flowOf(Coldなストリーム)
private val flow: Flow<String> = flowOf("Hello")
// Subject に対応:SharedFlow(Hotなストリーム)
private val sharedFlow = MutableSharedFlow<String>(replay = 0)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// Flowの購読(=Observableの購読に対応)
lifecycleScope.launch {
flow.collect { value ->
// ここはFlowCollectorの機能(Observer に対応)
println(value) // → "Hello"
}
}
// SharedFlow の購読(=Subjectの購読に対応)
lifecycleScope.launch {
sharedFlow.collect { value ->
// ここはFlowCollectorの機能(Observer に対応)
println("Observer1: $value") // → "Hello"
}
}
// SharedFlowに値を発行(=Subject.send/onNextに対応)
lifecycleScope.launch {
sharedFlow.emit("Hello")
}
}
}
■基本(HotとCold)
Cold:購読開始時に最初から値が流れる(過去の値ではなく、購読開始時点で新しく流れる値)
用途:
Hot:購読開始後の新しい値のみを取得(過去の値は取得できない)
->発行者であるSubjectは全てHot
HotなObservable の主な用途:
- パフォーマンス最適化:重い処理の重複実行を防ぐ
- リソース効率:ネットワーク通やデータベースアクセスの最適化
- UI安全性:エラーや完了での予期せぬ終了を防ぐ
- イベント共有:複数の処理で同じイベントを使用
- データ共有:複数の画面で同じデータを効率的に使用
- キャッシュ機能:データの再利用を可能にするエラー処理:統一されたエラーハンドリング
これらの用途でHotなObservableを使用することで、アプリケーションのパフォーマンスと保守性を向上させることができる
※Observableにshare()をつけるとことでHotに変換できる
let coldObservable = Observable.just("Hello")
let hotObservable = Observable.just ("Hello").share()
let coldPublisher = Just("Hello")
let hotPublisher = Just ("Hello") .share()
■エラーで終了するSubject(Hot)
<最新値を保持する>
RxSwiftの BehaviorSubjectは Combine のCurrentValueSubjectに相当
Kotlin FlowではStateFlow
<最新値を保持しない>
RxSwiftのPublishSubject は Combine の PassthroughSubjectに相当
Kotlin FlowではSharedFlow
■エラーで終了しないSubject(Hot)
エラー終了させないためのRelayTypeを実装し、Hot変換したSubject
<最新値を保持する>
RxSwift の BehaviorRelay
->Combineでは同様の機能は用意されていないため@Published で実現
<最新値保持しない>
RxSwift の PublishRelay
->Combineでは同様の機能は用意されていないためPublisherをカスタムする
■Mainスレッドで実行し、エラー終了しないSubject(Hot)
RelayをMainスレッドで実行するよう特化
<最新値を保持する>
RSwift の Driver
->Combineでは同様の機能は用意されていないためカスタム CurrentValueSubjectで実現
(@MainActor + CurrentValueSubject + share() + catch内でJust(デフォルト値))
<最新値を保持しない>
RxSwift のSignal
->Combineでは同様の機能は用意されていないためカスタム PassthroughSubjectで実現
(@MainActor + PassthroughSubject + share() + catch内でJust(デフォルト値))
※DriverやSignalも内部ではカスタムのObservableをHotにしている
(Observable + share() + catch内でJust(デフォルト値))
■Cold->Hot変換オペレータについて
通常のObservableはshare()でHotになる。
ConnectableObservable はconnect()でHotになる。
※Connectable Observableとは、replay(), publish(), multicast())などのモディファイアで作成されたObservable
■replay オペレータについて
指定した数の値をキャッシュするConnectableObservableを生成する。
つまり、replay(1)にすると初期値をもてるということ。
※replay(O) はPublishSubjectと同じなので利用用途なし
<replay(1)の例>
let publishSubject = PublishSubject<Int> ()
let replayed = publishSubject.replay(1)
replayed.connect()//重要:Hotにするために必要。
// connectがなければ、初期値のもてるPublishSubiectに相当
// connectがあれば、初期値のもてるPublishRelay に相当
// 複数のObserverが同じストリームを共有
replayed.subscribe(onNext: { print("'Observer 1: \($0)") }) replayed.subscribe(onNext: { print("Observer 2: \($0)") })
publishSubject.onNext(1)
publishSubject.onNext(2)
//新しいObserverが後から subscribe
replayed.subscribe(onNext: { print("Observer 3: \($0)") })
// 出力:Observer 3: 2(最新値が即座に発行される)
<複数Observerでreplay(2)の例>
let publishSubject = PublishSubject<Int>()
let replayed = publishSubject.replay(2)
replayed.connect)
//最初のObserver
replayed.subscribe(onNext: { print("Observer 1: \($0)") })
publishSubject.onNext(1)
publishSubject.onNext(2)
//2番目のObserver (最新の2つの値を受け取る)
replayed.subscribe(onNext: { print("Observer 2: \($0)") })
//出力:
// Observer 2: 1
// Observer 2: 2
publishSubject.onNext(3)
//3番目のObserver (最新の2つの値を受け取る)
replayed.subscribe(onNext: { print("Observer 3: \($0)") })
//出力:
// Observer 3: 2
// Observer 3: 3
■スレッド保証について
スレッド保証とは、Observableがどのスレッドで値を発行するかが保証されているかどうかを指す。
1.スレッド保証なし (Behaviorsubject、 BehaviorRelay)
let behavior Subject = BehaviorSubject<Int> (value: 0)
//どのスレッドで値が発行されるかは保証されていない
DispatchQueue.global().async {
behaviorSubject.onNext(1) //バックグラウンドスレッドで発行
}
DispatchQueue.main.async {
behaviorSubject.onNext(2)//メインスレッドで発行
}
behavior Subject.subscribe(onNext: { value in
print ("Current thread: \(Thread.current)")
// 発行されたスレッドで実行される(保証なし)
}
- Mainスレッド保証(Driver、Signal)
let driver = Observable.just(1).asDriver(onErrorJustReturn: 0)
driver.drive (onNext: { value in
print("Current thread: \(Thread.current)")
//常に Mainスレッドで実行される(保証あり)
})
✅ Cold の用途(API通信の例)
① 購読のたびに毎回リクエストしてほしいから
Cold な Observable(例: Observable.create
, Single.create
, Flow
, Future
など)は、購読されるたびに処理を実行します。
let api = Observable.create { observer in
print("API送信")
observer.onNext("レスポンス")
observer.onCompleted()
return Disposables.create()
}
api.subscribe() // ← "API送信"
api.subscribe() // ← "API送信"(もう一度呼ばれる)
→ ユーザーが何度ボタンを押しても、新しい通信が行われる
② 副作用の再利用は危険(Hotにするとキャッシュされうる)
たとえば .share()
や Subject
を経由してしまうと、処理が 1回しか実行されず、あとはキャッシュが返ってしまうことがあります。
let sharedApi = api.share()
sharedApi.subscribe() // "API送信"
sharedApi.subscribe() // 🚨 実行されない!過去のキャッシュが返る
→ APIが送られない=致命的なバグにつながる
③ スレッドセーフで、他と干渉しない
Cold であれば、購読ごとに独立した処理が走るため、状態を共有しない=競合が起きにくい。
例:同時に複数のユーザーがボタンを押しても、通信が混ざらない。
④ テスト・モックがしやすい
Cold な API は副作用を内部に閉じ込めているため、テストでモックに差し替えやすい。
func fetchUser(): Observable<User> {
return Observable.just(User(name: "Mock"))
}
→ DIやモックが容易
⑤ UIロジックとの分離がしやすい
Cold Observable は**イベント駆動で処理を「遅延実行」**できるため、
UIイベント(タップなどのHot)と組み合わせて使いやすい。
button.rx.tap
.flatMap { fetchUser() } // Coldだからボタン押すたびに実行される
.bind(to: userLabel.rx.text)
❌ Hot にすると起こる問題の例
問題 | 内容 |
---|---|
処理が1回しか走らない |
.share() された Observable では、購読しても再実行されない |
キャッシュの再送が起こる |
.replay() などで古いレスポンスを再送することがある |
テストしづらい |
Subject や BehaviorRelay に埋め込まれると副作用が見えにくくなる |
状態管理と副作用が混在する | =デバッグしづらい、保守性が下がる |
✅ Hot の用途
1. パフォーマンス最適化:重い処理の重複実行を防ぐ
✅ 例:API通信を複数画面から同時に使う場合
let sharedAPIResult = fetchUserInfo()
.share(replay: 1, scope: .forever) // Hot化してキャッシュ&共有
// 複数の購読先
sharedAPIResult
.bind(to: label1.rx.text)
.disposed(by: disposeBag)
sharedAPIResult
.bind(to: label2.rx.text)
.disposed(by: disposeBag)
🔍 ポイント:fetchUserInfo()
が一度だけ実行され、結果が複数UIに共有される。
2. リソース効率:ネットワーク通やデータベースアクセスの最適化
✅ 例:ログイン中ユーザー情報を複数ViewModelで使い回す
let userSubject = BehaviorSubject<User?>(value: nil)
UserAPI.getCurrentUser()
.bind(to: userSubject) // 一度取得したらSubjectで共有
.disposed(by: disposeBag)
// 画面A
userSubject
.compactMap { $0 }
.bind(to: viewModelA.user)
.disposed(by: disposeBag)
// 画面B
userSubject
.compactMap { $0 }
.bind(to: viewModelB.user)
.disposed(by: disposeBag)
🔍 ポイント:APIを何度も呼ばず、1回取得した値を複数箇所で使える。
3. UI安全性:エラーや完了での予期せぬ終了を防ぐ
✅ 例:Subjectを使ってUIイベントを管理
let tapSubject = PublishSubject<Void>()
button.rx.tap
.bind(to: tapSubject)
.disposed(by: disposeBag)
// 何度でも安全に使える
tapSubject
.subscribe(onNext: {
print("ボタンが押された")
})
.disposed(by: disposeBag)
🔍 ポイント:エラーやcompleteが流れないよう管理され、UIイベントとして再利用可能。
4. イベント共有:複数の処理で同じイベントを使用
✅ 例:ログアウトイベントをアプリ全体で監視
let logoutSubject = PublishSubject<Void>()
// 画面A: ユーザーがログアウトボタンを押す
logoutButton.rx.tap
.bind(to: logoutSubject)
.disposed(by: disposeBag)
// 画面B: ログアウト検知して初期画面に戻る
logoutSubject
.subscribe(onNext: {
navigator.navigateToLoginScreen()
})
.disposed(by: disposeBag)
5. データ共有:複数の画面で同じデータを効率的に使用
✅ 例:チャットの新着メッセージを全体で購読
let messageSubject = ReplaySubject<Message>.create(bufferSize: 10)
webSocket.receiveMessages()
.bind(to: messageSubject)
.disposed(by: disposeBag)
// 複数画面がメッセージを受信
messageSubject
.subscribe(onNext: { message in
print("受信: \(message.text)")
})
.disposed(by: disposeBag)
6. キャッシュ機能:データの再利用を可能にする
✅ 例:一度取得した天気情報をキャッシュ
let weatherObservable = WeatherAPI.fetchTodayWeather()
.share(replay: 1, scope: .forever)
weatherObservable
.subscribe(onNext: { weather in
print("今日の天気: \(weather)")
})
.disposed(by: disposeBag)
7. エラー処理:統一されたエラーハンドリング
✅ 例:共通エラーハンドリング機構
let errorSubject = PublishSubject<Error>()
someObservable
.catch { error in
errorSubject.onNext(error)
return .empty()
}
.subscribe()
.disposed(by: disposeBag)
errorSubject
.subscribe(onNext: { error in
showAlert(message: error.localizedDescription)
})
.disposed(by: disposeBag)
✅ まとめ表(一覧)
用途 | 主な手法 | 代表例 |
---|---|---|
パフォーマンス | .share(replay:) |
API結果を複数購読 |
リソース効率 | BehaviorSubject |
DB/APIの結果共有 |
UI安全性 | PublishSubject |
ボタンタップ等のイベント |
イベント共有 | Subject |
ログアウトなどの通知 |
データ共有 |
ReplaySubject , BehaviorSubject
|
チャットメッセージなど |
キャッシュ機能 | share(scope: .forever) |
天気API、商品一覧など |
エラー処理 | catch + Subject |
エラー発生時に共通UIへ通知 |
必要があれば、各例をファイル単位でプロジェクト構成してお渡しすることもできます。ご希望ですか?
Cold -> Hot変換の例
:API通信の共通処理は購読するたびに実行する必要があるのでCold
画面内で上記APIの結果を使いまわしたい場合、APIを何度も走らせたくないので、Hotに変換して共有
let sharedAPIResult = fetchUserInfo()
.share(replay: 1, scope: .forever) // Hot化してキャッシュ&共有
sharedAPIResult
.bind(to: label1.rx.text)
.disposed(by: disposeBag)
sharedAPIResult
.bind(to: label2.rx.text)
.disposed(by: disposeBag)
✅ このコードの意図と構成
🔸 fetchUserInfo()
は Cold な Observable
- 定義: 購読されるたびに API 通信が走る
- 性質: 毎回「新規処理」が走る(=副作用がある)
func fetchUserInfo() -> Observable<String> {
return Observable.create { observer in
print("APIリクエスト送信")
// 通信処理…
observer.onNext("ユーザー名")
observer.onCompleted()
return Disposables.create()
}
}
🔸 .share(replay: 1, scope: .forever)
で Hot に変換+キャッシュ
- Hot化の目的: 「画面内の複数購読先で、同じ結果を使い回したい」
-
.share(replay: 1)
: 最初の購読でfetchUserInfo()
が実行され、以降の購読者にはその値を即返す -
.scope: .forever
: Disposeされてもキャッシュは保持される(この画面内で共有するには便利)
🔸 複数の購読者がいても再リクエストされない
- つまり、「最初の1回だけAPI呼び出しが行われる」
- label1, label2 両方で表示しても通信は1回だけ
✅ このパターンが有効なケース
シチュエーション | なぜこの方法が適切? |
---|---|
画面内で複数の UI パーツが同じデータを使う | 毎回リクエストすると無駄&負荷大 |
viewDidLoad() 時に1度だけ取得し、画面内で使いまわす |
キャッシュしないと同じ処理が走ってしまう |
API結果が画面の複数箇所に影響する | 「状態の一貫性」を保つために共有が必要 |
❗️注意:この Hot 化をそのままグローバルに使い回すのは危険
たとえば…
let sharedUserInfo = fetchUserInfo()
.share(replay: 1, scope: .forever) // ずっとキャッシュ
// 他の画面やViewModelからも使いまわす…
→ キャッシュがずっと残る or 想定しないタイミングでリクエストされるリスクあり
🔧 対策
- スコープを限定(例:
.scope: .whileConnected
) - 再購読を明示(例:リロードボタンで
fetchUserInfo()
を再実行)
🎯 まとめ
要素 | 説明 |
---|---|
fetchUserInfo() |
✅ Cold:毎回リクエストされる処理 |
.share(replay: 1) |
✅ Hot化+1回分のキャッシュを保持 |
複数の .bind(...)
|
✅ 同じ結果を UI に使い回せる |
ベストプラクティス? | ✅ 特定の画面内限定で共有したい時には非常に良い使い方 |