Unity
ReactiveExtensions
UniRx


背景

もう旬もすぎて当たり前になった感がありますが...

ReactiveExtensionを全く触ったことがない人に、Rxの概念やUniRxの使い方を教えるのに苦労していませんか?

僕はしています。

まずどこから、何から伝えるべきか?というのを迷います。

毎回悩むのはばからしいので、ステップを踏んで丁寧に記事にしてみようという試みです。


目的

この記事では、全くReactiveExtensionを知らないところから、簡単なRxのロジックを理解でき書けるレベルに引き上げることを目的としています。

理解できることを優先しているので、示しているコードは完璧ではありません.

実際のUniRxの処理を知りたい方は githubから本家のソースを呼んでみると良さそうです.


内容


1. Observerパターン

まずはEvent通知の仕組みについて理解しておくのが良いと考えます。

古典的なイベント監視のデザインパターンにObserverパターンがあります。

Observerパターンとは?

- プログラム内のオブジェクトの状態を監視するデザインパターン

- https://ja.wikipedia.org/wiki/Observer_%E3%83%91%E3%82%BF%E3%83%BC%E3%83%B3

簡単なのでcodeを書いてみます.

まず必要なのは2つのinterface、監視する人 (Observer) と 監視される対象 (Subject) です.

// 監視者

public interface IObserver
{
// 通知がきた
void OnNotified();
}

// 監視対象
public interface ISubject
{
// 監視者に通知する
void NotifyObservers();

// 監視者が購読する
void Subscribe(IObserver observer);

// 監視者が購読をやめる
void Unsubscribe(IObserver observer);
}

例えば、回転寿司を考えてみます。

回転寿司にはレーン (監視対象) があり、客 (監視者) はレーンに寿司が流れてくるイベントを待ちます。

// 監視対象 (寿司が流れてくるレーン)

public class SushiLaneSubject : ISubject
{
// 流れてきた寿司
public string Neta;

// 寿司を待つ人々
private List<IObserver> observers = new List<IObserver>();

// 寿司が来たことを通知
public void NotifyObservers()
{
foreach (var observer in this.observers)
{
observer.OnNotified();
}
}

// 寿司を監視する (イベントを購読する)
public void Subscribe(IObserver observer)
{
this.observers.Add(observer);
}

// 寿司の監視をやめる (イベントを非購読する)
public void Unsubscribe(IObserver observer)
{
this.observers.Remove(observer);
}
}

// 監視者 (寿司を食べる人)
public class SushiObserver : IObserver
{
public SushiLaneSubject Subject;

// 寿司が来た
public void OnNotified()
{
Debug.LogFormat("{0}おいしいです (^q^)", this.Subject.Neta);
}
}

実行してみます.

var subject = new SushiLaneSubject();

var observer1 = new SushiObserver() {Subject = subject};
var observer2 = new SushiObserver() {Subject = subject};

subject.Subscribe(observer1);
subject.Subscribe(observer2);

subject.Neta = "まぐろ";
subject.NotifyObservers();

// output:
// まぐろおいしいです (^q^)
// まぐろおいしいです (^q^)

まぐろが美味しそうです.

寿司が来たというイベントを2人の監視者が監視しています.


2. Push型 Observer パターン

基本的なイベントの通知パターンが実装できました。

ただこの実装には問題があります。

寿司が来たということはわかるのですが、具体的にどんなネタが来たのががわかりません。

通知された人が通知した人 (Subject) にデータを参照しにいかなければならないという問題です。

(OnNotifiedで this.Subject.Neta を参照している)

これをPull型のObserverパターンと呼ぶようです。

監視者が監視対象を知らないければならないので密結合になり、監視者側が監視対象を意識しなくてはならないので拡張性にかけます。

この問題を解決した Push型のObserverパターンというのが存在します。

監視者が監視対象を参照せず、データのみ受け取るようなInterfaceです。

// 監視者

public interface IObserver<TValue>
{
// 通知がきた
void OnNotified(TValue value);
}

// 監視対象
public interface ISubject<TValue>
{
// 監視者に通知する
void NotifyObservers(TValue value);

// 監視者が購読する
void Subscribe(IObserver<TValue> observer);

// 監視者が購読をやめる
void Unsubscribe(IObserver<TValue> observer);
}

こうしておくと実際のデータが通知完了時に受け取れ (OnNotified)、そのときに監視対象のobjectへの依存を切り、データのみを受け渡すことができます

// 監視対象 (寿司が流れてくるレーン)

public class SushiLaneSubject : ISubject<string>
{
// 寿司を待つ人々
private List<IObserver<string>> observers = new List<IObserver<string>>();

public void NotifyObservers(string neta)
{
foreach (var observer in this.observers)
{
observer.OnNotified(neta);
}
}

// 寿司を監視する (イベントを購読する)
public void Subscribe(IObserver<string> observer)
{
this.observers.Add(observer);
}

// 寿司の監視をやめる (イベントを非購読する)
public void Unsubscribe(IObserver<string> observer)
{
this.observers.Remove(observer);
}
}

// 監視者 (寿司を食べる人)
public class SushiObserver : IObserver<string>
{
public void OnNotified(string neta)
{
Debug.LogFormat("{0}おいしいです (^q^)", neta);
}
}

実行してみます.

var subject = new SushiLaneSubject();

var observer1 = new SushiObserver();
var observer2 = new SushiObserver();

subject.Subscribe(observer1);
subject.Subscribe(observer2);

subject.NotifyObservers("サーモン");

// output:
// サーモンおいしいです (^q^)
// サーモンおいしいです (^q^)

サーモンがおいしそうです.

Pull型とは異なり、Push型では IObserverと ISubjectの依存を分離することが可能になっていることがわかります.


3. OnNext, OnError, OnComplete

Push型の実装をさらに改良して一般化してみましょう.

3つのイベントがあると便利そうです。


  1. データが来た (Next)

  2. エラーが起きた (Error)

  3. データは全て来た (Complete)

実装に落としてみます.

// 監視者

public interface IObserver<TValue>
{
// データがきた
void OnNext(TValue value);

// エラーが起きた
void OnError(Exception error);

// データはもう来ない
void OnComplete();
}

// 監視対象
public interface ISubject<TValue>
{
// データを通知
void NotifyNext(TValue value);

// エラーを通知
void NotifyError(Exception error);

// データ終了を通知
void NotifyComplete();

// 監視者が購読する
void Subscribe(IObserver<TValue> observer);

// 監視者が購読をやめる
void Unsubscribe(IObserver<TValue> observer);
}

実装です

// ちょっと一般化

public class Subject<TNext> : ISubject<TNext>
{
private List<IObserver<TNext>> observers = new List<IObserver<TNext>>();

public void NotifyNext(TNext next)
{
foreach (var observer in this.observers)
{
observer.OnNext(next);
}
}

public void NotifyError(Exception error)
{
foreach (var observer in this.observers)
{
observer.OnError(error);
}
}

public void NotifyComplete()
{
foreach (var observer in this.observers)
{
observer.OnComplete();
}
}

public void Subscribe(IObserver<TNext> observer)
{
this.observers.Add(observer);
}

public void Unsubscribe(IObserver<TNext> observer)
{
this.observers.Remove(observer);
}
}

public class SushiLaneSubject : Subject<string>
{
}

public class SushiObserver : IObserver<string>
{
public void OnNext(string neta)
{
Debug.LogFormat("{0}おいしいです (^q^)", neta);
}

public void OnError(Exception exception)
{
Debug.LogFormat("{0} ( ̄Д ̄;)", exception.Message);
}

public void OnComplete()
{
Debug.Log("ごちそうさま (^O^)");
}
}

実行してみます

var subject = new SushiLaneSubject();

var observer1 = new SushiObserver();
var observer2 = new SushiObserver();

subject.Subscribe(observer1);
subject.NotifyNext("ほたて");
subject.NotifyComplete();
subject.Unsubscribe(observer1);

subject.Subscribe(observer2);
subject.NotifyError(new Exception("ほたてはもうありません"));
subject.Unsubscribe(observer2);

// output:
// ほたておいしいです (^q^)
// ごちそうさま (^O^)
// ほたてはもうありません ( ̄Д ̄;)

Nextでデータ(ほたて)が来たことを通知。

Completeでデータがもう来ないことを通知。

Errorで監視対象にエラーがおきたことを通知できています.

かなりRxの形に近づいてきました.


4. Disposable

ここまでの実装では、Unsubscribeによって購読の停止管理をしていました。

ただ監視をやめたいだけなのに、ISubjectのclassを参照しなくてはならないのは拡張性にかけます。

interface内の関数が多いのも制約が強いですよね.

C#には IDisposableというリソース開放のためのinterfaceが備わっているので、これを用いて購読停止をするのが良さそうです。

IDisposable

- https://msdn.microsoft.com/ja-jp/library/system.idisposable(v=vs.110).aspx

IDisposableを具象化した購読管理クラスをSubscriptionとして実装してみます.

// 監視対象

public interface ISubject<TValue>
{
// データを通知
void NotifyNext(TValue value);

// エラーを通知
void NotifyError(Exception error);

// データ終了を通知
void NotifyComplete();

// 監視者が購読する
IDisposable Subscribe(IObserver<TValue> observer);
}

public class Subject<TNext> : ISubject<TNext>
{
private List<IObserver<TNext>> observers = new List<IObserver<TNext>>();

public void NotifyNext(TNext next)
{
foreach (var observer in this.observers)
{
observer.OnNext(next);
}
}

public void NotifyError(Exception error)
{
foreach (var observer in this.observers)
{
observer.OnError(error);
}

this.observers.Clear();
}

public void NotifyComplete()
{
foreach (var observer in this.observers)
{
observer.OnComplete();
}

this.observers.Clear();
}

public IDisposable Subscribe(IObserver<TNext> observer)
{
this.observers.Add(observer);
// 購読管理のclassを返す
return new Subscription(this, observer);
}

private void Unsubscribe(IObserver<TNext> observer)
{
this.observers.Remove(observer);
}

// 購読管理をするclass. Dispose()を呼ぶことで購読をやめる
class Subscription : IDisposable
{
private IObserver<TNext> observer;
private Subject<TNext> subject;

public Subscription(Subject<TNext> subject, IObserver<TNext> observer)
{
this.subject = subject;
this.observer = observer;
}

public void Dispose()
{
this.subject.Unsubscribe(this.observer);
}
}
}

実装してみます.

var subject = new SushiLaneSubject();

var observer = new SushiObserver();

var disposable = subject.Subscribe(observer);
subject.NotifyNext("えんがわ");

// 購読停止
disposable.Dispose();

// これは流れない
subject.NotifyNext("いか");

// output:
// えんがわおいしいです (^q^)

interfaceを減らして汎用化しつつ、購読管理を別クラスに委譲できました.


5. Observable

イベント通知の一般化を更に勧めてみます.

ここで、ISubjectの NotifyNext(value),NotifyError(error),NotifyComplete() は、interfaceの関数の入出力がIObserverの OnNext(value),OnError(error),OnComplete() と同一なことに気がつくでしょうか?

NotifyNext(next) はすぐに OnNext(value) を呼び出します。

監視対象の ISubjectはイベントの実行タイミングで考えると、IObserverと同等です。

つまり Subjectは 監視者(IObserver) であり監視対象 (IObservable) であるとみなせます.

先程定義した ISubjectの IDisposable Subscribe(IObserver<TValue> observer) を、監視可能なもの(IObservable)のinterfaceとして外出してみます.

// 監視者

public interface IObserver<TValue>
{
// データがきた
void OnNext(TValue value);

// エラーが起きた
void OnError(Exception error);

// データはもう来ない
void OnComplete();
}

// 監視可能であることを示す
public interface IObservable<TValue>
{
// 監視者が購読する
IDisposable Subscribe(IObserver<TValue> observer);
}

// 監視対象
public interface ISubject<TValue> : IObserver<TValue>, IObservable<TValue>
{
// 3つのイベントは IObserverと同一のinterfaceなので委譲した
// データを通知: NofityNext(value) => OnNext(value)
// エラーを通知: NotifyError(error) => OnError(error)
// データ終了を通知: NotifyComplete() => OnComplete()

// 監視可能であることは IObservableに委譲した
// IDisposable Subscribe(IObserver<TValue> observer);
}

ISubjectは、IObserver(監視者)であり、IObservable(監視可能)であることがわかりました.

Rxの文脈では、Subjectのようにすぐに来たイベントを流してしまうような IObservableのことを Hotであると呼称します.


6. Cold Observable

実は先程までの回転寿司には、釣ったばかりのネタをそのまま捌いて提供していたのです.(HotObservable)

でも都合よく客が来店したときに、釣ったばかりの魚があるとは限らないですよね.

釣った魚を冷凍しておいて、客が来たときに解凍して提供してあげればより利便性が上がります.

// 講読時に監視するためのclass

public class Observable<TValue> : IObservable<TValue>
{
private Func<IObserver<TValue>, IDisposable> creator;

private Observable(Func<IObserver<TValue>, IDisposable> creator)
{
this.creator = creator;
}

public IDisposable Subscribe(IObserver<TValue> observer)
{
// Subscribeした瞬間に関数を実行するのが特徴
return this.creator(observer);
}

// Observableを直接渡したくないため、Createメソッドを作っておく.
public static IObservable<TValue> Create(Func<IObserver<TValue>, IDisposable> creator)
{
return new Observable<TValue>(creator);
}
}

// 購読解除するつもりがないときに返す Disposable
public class EmptyDisposable : IDisposable
{
public void Dispose()
{
}
}

このように監視をSubscribe時に開始するclassをつくることで、ネタの鮮度を維持しつつ、客が来たときに振る舞うことが可能になります.

// 冷凍寿司

var observable = Observable<string>.Create(_observer =>
{
// ネタが解凍できたら寿司を握って提供する
Debug.Log("ネタを解凍します");
_observer.OnNext("ぶり");
_observer.OnComplete();
return new EmptyDisposable();
});

var observer = new SushiObserver();
Debug.Log("冷凍されたぶりが届きました!");
observable.Subscribe(observer);

// output:
// 冷凍されたぶりが届きました!
// ネタを解凍します
// ぶりおいしいです (^q^)
// ごちそうさま (^O^)

実行結果を見てみると、監視するときに処理内容を関数として保持しておくことで、講読(Subscribe)が始まるまで処理が実行されていないことが確認できます.

Rxの文脈では、このようなObservableを Cold であるといいます.

Hot Observableに対して、Cold Observableは即時実行でない点が特徴です.

この性質は、Rxの非同期な利用を許す上で重要な概念になっています.


7. Operator: Where

回転寿司では、いろんなネタが流れてきます.

人それぞれ好みはありますよね。でもこのままでは来たネタすべて食べなきゃいけない! /(^o^)\

マグロ好きはマグロだけ食べていたいものです.

その問題を解決する概念がOperatorです.

まずはWhereを定義してみます.

// イベント関数をそのまま実行するだけの Observer

public class Observer<TNext> : IObserver<TNext>
{
private Action<TNext> next;
private Action<Exception> error;
private Action complete;

private Observer(Action<TNext> next, Action<Exception> error, Action complete)
{
this.next = next;
this.error = error;
this.complete = complete;
}

public void OnNext(TNext value)
{
this.next(value);
}

public void OnError(Exception error)
{
this.error(error);
}

public void OnComplete()
{
this.complete();
}

public static IObserver<TNext> Create(Action<TNext> next, Action<Exception> error, Action complete)
{
return new Observer<TNext>(next, error, complete);
}
}

// Nextの値によって通知するかしないかを変更する
public class WhereObservable<TNext> : IObservable<TNext>
{
private Func<TNext, bool> operation;
private IObservable<TNext> observable;

public WhereObservable(IObservable<TNext> observable, Func<TNext, bool> operation)
{
this.operation = operation;
this.observable = observable;
}

public IDisposable Subscribe(IObserver<TNext> observer)
{
var disposable = this.observable.Subscribe(Observer<TNext>.Create(
next =>
{
// nextの値によって、次に処理を流すかどうかを決定. operationはboolを返却する
if (this.operation(next)) observer.OnNext(next);
},
error => observer.OnError(error),
() => observer.OnComplete()
));

return disposable;
}
}

// IObserverをメソッドチェインするためのExtension
public static class IObservableExtension
{
// 条件をしぼるObservable
public static IObservable<TNext> Where<TNext>(this IObservable<TNext> observable, Func<TNext, bool> operation)
{
return new WhereObservable<TNext>(observable, operation);
}
}

ポイントは、IObservableの拡張関数として新しいObservableを返却するように定義している点です.

observable.Where(...).Where(...).Where(...).Subscribe() という形で関数を鎖でつなげていくように、IObservableから新しいIObservableへ変換していけています.

実際に利用してみます.

var subject = new SushiLaneSubject();

var observer = new SushiObserver();

// マグロだけ食べたい
subject
.Where(neta => neta == "まぐろ")
.Subscribe(observer);

subject.OnNext("すずき");
subject.OnNext("まぐろ");
subject.OnNext("たこ");
subject.OnNext("まぐろ");
subject.OnComplete();

// output:
// まぐろおいしいです (^q^)
// まぐろおいしいです (^q^)
// ごちそうさま (^O^)

無事にまぐろだけ食べることに成功しました。

Rxの文脈では、このようにIObservableからIObservableへの変換を行う機能をOperatorといいます.


8. Operator: Select

Operatorには様々な種類があります.他にも幾つか実装してみましょう.

Selectはある値を別の値に変換するOperatorです.

// Nextの値を別の値に変更する Operator

public class SelectObservable<TNext1, TNext2> : IObservable<TNext2>
{
private Func<TNext1, TNext2> operation;
private IObservable<TNext1> observable;

public SelectObservable(IObservable<TNext1> observable, Func<TNext1, TNext2> operation)
{
this.operation = operation;
this.observable = observable;
}

public IDisposable Subscribe(IObserver<TNext2> observer)
{
var disposable = this.observable.Subscribe(Observer<TNext1>.Create(
next =>
{
// nextの値をoperatorによって別の値に変更する
var next2 = this.operation(next);
observer.OnNext(next2);
},
error => observer.OnError(error),
() => observer.OnComplete()
));

return disposable;
}
}

// IObserverをメソッドチェインするためのExtension
public static class IObservableExtension
{
// 条件をしぼる Operator
public static IObservable<TNext> Where<TNext>(this IObservable<TNext> observable, Func<TNext, bool> operation)
{
return new WhereObservable<TNext>(observable, operation);
}

// 別の値に変換する Operator
public static IObservable<TNext2> Select<TNext1, TNext2>(
this IObservable<TNext1> observable, Func<TNext1, TNext2> operation)
{
return new SelectObservable<TNext1, TNext2>(observable, operation);
}
}

今日のマグロはいいものが入ったので、店長オススメにしてみましょう。

var subject = new SushiLaneSubject();

var observer = new SushiObserver();

// 店長おすすめのネタに変換する
subject
.Where(neta => neta == "まぐろ")
.Select(neta => string.Format("店長オススメの{0}", neta))
.Subscribe(observer);

subject.OnNext("すずき");
subject.OnNext("まぐろ");
subject.OnNext("たこ");
subject.OnNext("まぐろ");
subject.OnComplete();

// output:
// 店長オススメのまぐろおいしいです (^q^)
// 店長オススメのまぐろおいしいです (^q^)
// ごちそうさま (^O^)


9. Operator: SelectMany

Operatorのイメージが掴めてきたでしょうか?

もう一つ、面白いOperatorがあるので紹介しておきます.

SelectMany (他のRxでは、FlatMapに相当)は、1個の値を受け取ってN個の値を流すOperatorです.

// 複数の講読をいっぺんに解除するための Disposable

public class CollectionDisposable : IDisposable
{
private IList<IDisposable> disposables;

public CollectionDisposable(IList<IDisposable> disposables)
{
this.disposables = this.disposables;
}

public void Dispose()
{
foreach (var disposable in this.disposables)
{
disposable.Dispose();
}
}
}

// 1つの値を受け取って、N個(0以上)の値を流す Observable
public class SelectManyObservable<TNext1, TNext2> : IObservable<TNext2>
{
private Func<TNext1, IObservable<TNext2>> operation;
private IObservable<TNext1> observable;

public SelectManyObservable(IObservable<TNext1> observable, Func<TNext1, IObservable<TNext2>> operation)
{
this.observable = observable;
this.operation = operation;
}

public IDisposable Subscribe(IObserver<TNext2> observer)
{
var disposables = new List<IDisposable>();
var disposable1 = this.observable.Subscribe(Observer<TNext1>.Create(
next1 =>
{
// nextの値を流すと、observableが帰ってくる. それを購読して次へ伝える
var disposable2 = this.operation(next1).Subscribe(Observer<TNext2>.Create(
next2 => observer.OnNext(next2),
error => observer.OnError(error),
() => observer.OnComplete()
));
disposables.Add(disposable2);
},
error => observer.OnError(error),
() => observer.OnComplete()
));
disposables.Add(disposable1);
return new CollectionDisposable(disposables);
}
}

// IObserverをメソッドチェインするためのExtension
public static class IObservableExtension
{
// ...

// 値を1つ受け取って、IObservableに変換する Operator
public static IObservable<TNext2> SelectMany<TNext1, TNext2>(
this IObservable<TNext1> observable, Func<TNext1, IObservable<TNext2>> operation)
{
return new SelectManyObservable<TNext1, TNext2>(observable, operation);
}

// 値を1つ受け取って、複数の値に分割して流す Operator
public static IObservable<TNext2> SelectMany<TNext1, TNext2>(
this IObservable<TNext1> observable, Func<TNext1, IEnumerable<TNext2>> operation)
{
return new SelectManyObservable<TNext1, TNext2>(observable, next1 =>
{
return Observable<TNext2>.Create(observer =>
{
var next2enumerable = operation(next1);

foreach (var next2 in next2enumerable)
{
observer.OnNext(next2);
}

return new EmptyDisposable();
});
});
}
}

SelectManyのすごい点は、1つの値を受取り、新しいIObservableを返すという点です。

これにより、値を受け取って条件によって流さないということも可能ですし、値によってエラーを流すということもできます.

SelectManyさえあれば、大抵のOperatorは実装できてしまうくらい強いoperatorです.

また、別の意味でのSelectManyというOperatorも存在します.

1つ値を受け取って、リストや配列などN個の値を含んだenumerableを返却すると、それらの値をばらして流してくれるというショートカットです.

Selectは1:1、SelectManyは1:Nという意味をそのまま捉えると、こちらのほうが意味的には直感的でしょうか.

var subject = new SushiLaneSubject();

var observer = new SushiObserver();

// 例: 1 => N変換
// まぐろをさばいて、大トロ, 中トロに変換する
subject
.Where(neta => neta == "まぐろ")
.Select(neta => "トロ")
.SelectMany(neta => new[] {"大" + neta, "中" + neta})
.Subscribe(observer);

// 例: 1 => Observable変換
// ネタがきてもすべて無視する
subject
.SelectMany(neta => Observable<string>.Create(_observer =>
{
// _observer.OnNext(neta); とかしない
return new EmptyDisposable();
}))
.Subscribe(observer);

subject.OnNext("まぐろ");
subject.OnComplete();

// output:
// 大トロおいしいです (^q^)
// 中トロおいしいです (^q^)
// ごちそうさま (^O^)

UniRxではどちらのoperatorもSelectManyとして利用することが可能でとても便利です.

実際のRxのOperatorはまだまだ他にもたくさんあります.

Throttle,ThrottleFirst,Zip,Merge,CombineLatest,Repeat,Retry,...

Operator一覧に関しては記事を書いている人も多いので、探してみるといいでしょう。


10. Unityでの利用

Rxの基本的な仕組みはだいたい説明できたように思います。

ここまでの流れで、高度に汎用的なイベント通知の仕組みが出来上がったことがわかるでしょうか?

では、それらがどのくらい汎用的なのか、これを実際のUnityの世界で利用してみましょう.

例えば、ボタンのクリックイベントを監視可能にしてみましょう.

// 何もないことを示すイベントのデータclass

public class Unit
{
public static readonly Unit Default = new Unit();

private Unit()
{
}
}

// ボタンが押されたら、イベントを通知する Observable
public class ButtonObservable : IObservable<Unit>
{
private Button button;

public ButtonObservable(Button button)
{
this.button = button;
}

public IDisposable Subscribe(IObserver<Unit> observer)
{
this.button.onClick.AddListener(() =>
{
observer.OnNext(Unit.Default);
});

// 本当は講読管理をしっかり考えなければならない
return new EmptyDisposable();
}
}

public static class ButtonExtension
{
// ボタンのOnClickを受けて、イベントを流す拡張関数
public static IObservable<Unit> OnClickAsObservable(this Button button)
{
return new ButtonObservable(button);
}
}

すごく雑ですが、Buttonはこんな感じでイベントを定義できます.

ボタンが押されたらOnNextを流すだけのObservableです.

UnitはUniRxでよく利用されるデータで、イベントのタイミングだけ知らせたいや通知するデータが何もないときに利用するclassです.

ついでにSubscribeにIObserverのclassをいちいち実装するのが少し面倒だったので、便利になるようにevent関数のみを引数に取れるようにしました.

// IObserverをメソッドチェインするためのExtension

public static class IObservableExtension
{
// ...

// 面倒なので IObserverクラスをいちいち作らなくて良いようにする.
public static IDisposable Subscribe<TNext>(
this IObservable<TNext> observable,
Action<TNext> next,
Action<Exception> error,
Action complete)
{
return observable.Subscribe(Observer<TNext>.Create(next, error, complete));
}

// next, errorのみでもOK
public static IDisposable Subscribe<TNext>(
this IObservable<TNext> observable,
Action<TNext> next,
Action<Exception> error)
{
return observable.Subscribe(next, error, () => { });
}

// nextのみでもOK
public static IDisposable Subscribe<TNext>(
this IObservable<TNext> observable,
Action<TNext> next)
{
return observable.Subscribe(next, error => { }, () => { });
}
}

あとは利用するだけです.

using UnityEngine;

using UnityEngine.UI;

[RequireComponent(typeof(Button))]
public class ButtonExample : MonoBehaviour
{
void Start()
{
var button = this.GetComponent<Button>();

// ボタンがおされたら、ログを流す.
button.OnClickAsObservable()
.Subscribe(_ => Debug.Log("Clicked!"));
}
}

ね、便利そうでしょう?

例えば、ThrottleFirstという一定時間はNextを流さないoperatorを定義しておけば、ボタン連打防止なんかも簡単にできそうです.

button.OnClickAsObservable().ThrottleFirst(TimeSpan.FromSeconds(1)).Subscribe(...);

他にも、MonoBehaviourのUpdateが起きたらNextを流すなんてこともできそうです.

public class EveryUpdateObservable : IObservable<Unit>

{
private Component component;

public EveryUpdateObservable(Component component)
{
this.component = component;
}

public IDisposable Subscribe(IObserver<Unit> observer)
{
var updator = this.component.GetComponent<EveryUpdateComponent>();

if (updator == null)
{
updator = this.component.gameObject.AddComponent<EveryUpdateComponent>();
}

updator.Observer = observer;
return updator;
}
}

public class EveryUpdateComponent : MonoBehaviour, IDisposable
{
public IObserver<Unit> Observer { get; set; }

private void Update()
{
// 毎frameでイベント送信する
if (this.Observer != null)
{
this.Observer.OnNext(Unit.Default);
}
}

private void OnDestroy()
{
this.Dispose();
}

public void Dispose()
{
if (this.Observer != null)
{
this.Observer.OnComplete();
}
}
}

public static class ComponentExtension
{
// フレームごとにイベント送信する
public static IObservable<Unit> OnEveryUpdate(this Component component)
{
return new EveryUpdateObservable(component);
}
}

こんな感じでUpdateの監視がRxになりました。

public class EveryUpdateExample : MonoBehaviour

{
void Start()
{
// 10フレームごとにログを吐く
this.OnEveryUpdate()
.Select(_ => Time.frameCount)
.Where(frame => frame % 10 == 0)
.Subscribe(frame => Debug.LogFormat("{0} frame passed", frame));
}
}

実際のUniRxでは、他にもUnityのライフサイクルに合わせて購読停止してくれる IDisposable.AddTo(component) 関数や、衝突したときのイベントを取ってくれる ObservableなどUnityならではのRx拡張が多々用意されています.


終わりに

長文を最後まで読んでいただきありがとうございました.

Rxの考え方や仕組みがなんとなくは理解できたでしょうか?

あとは実践あるのみです。

ここではまだまだ紹介しきれなかったoperatorなど多々あるので、まずは簡単なイベントからRxを使って書いてみるといいかもしれません.

全体の実装はgithubに上げたので、理解の一助になれば幸いです

追記:

- 次のステップとして、Quiz形式でUniRxをより知れる UniRx Quiz というのをつくりました。 興味あれば解いてみてください!