【ReactiveExtensions】Multicastの使いドコロ

  • 6
    Like
  • 0
    Comment
More than 1 year has passed since last update.

UniRxについての記事のまとめはこちら


Multicastオペレータとは何か

RxのMulticastオペレータはHot変換用オペレータの1つです。
引数で渡したSubjectに各メッセージを伝達するとても単純明快なオペレータです。

Multicastの例
var subject = new Subject<long>();
var timerStream = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));

//Multicastの引数はSubject
var timerObservable = timerStream.Multicast(subject);
//ConnectしたタイミングでMulticastがSubscribeを行う
timerObservable.Connect();

//↓ Multicast+Connectと同義
//timerStream.Subscribe(subject);

//さらにわかりやすく書くとこう
//timerStream.Subscribe(x => subject.OnNext(x), ex => subject.OnError(ex), () => subject.OnCompleted());

ちなみに、各Hot変換オペレータをMulticastを使って表現すると以下の通りとなります (参考)

  • Publish() = .Multicast(new Subject)
  • PublishLast() = .Multicast(new AsyncSubject)
  • Replay() = .Multicast(new ReplaySubject)

Multicastの使いドコロ

Subscribeだけ先に行い、後からストリームをつなぎ込む(つなぎ替える)時に使えます。
ストリームの初期化が行われる前に、先にSubscribeしておきたい時にMulticastを使うと便利です。

var subject = new Subject<long>();

//先にSubscribeしておける
subject.Subscribe(Console.WriteLine);

//Subscribeより後にストリーム定義
var timerStream1 = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
var timerStream2 = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));

//timerStream1をつなぎ込む
var disposable = timerStream1.Multicast(subject).Connect();

//Multicastの停止(OnCompletedは発行されない)
disposable.Dispose();

//subjectへのSubscrieは継続したままtimerStream2に挿し替える
timerStream2.Multicast(subject).Connect();

Subscribeは継続したままストリームの根源を挿し替える必要があった時や、初期化のタイミングの問題でSubscribeだけを先に行う必要がある時に使うと便利です。