はじめに
Scheduler
は、ReactiveExtensionsを構成する三大要素のうちの1つであり、Rxの動作に欠かせないものです。
(ちなみにRxの三大要素は「Observable
」「Operator
」「Scheduler
」)
Rxを踏襲しているUniRxにも当然Scheduler
は登場しますし、到るところで使われていたりします。
今回はそのScheduler
の説明をします。
前提知識:Observable
のメッセージが処理されるスレッドはどこなのか
OnNext
メッセージを処理するスレッドはメッセージの発行元スレッドと同じになるという挙動をまず覚えておく必要があります。
同じObservable
でも異なるスレッド上で処理が実行される可能性があります。
using System.Threading;
using UniRx;
using UnityEngine;
public class ThreadSample : MonoBehaviour
{
void Start()
{
var subject = new Subject<string>();
Debug.Log("メインスレッドのID:" + Thread.CurrentThread.ManagedThreadId);
// OnNextを受け取ったらメッセージ内容と、その時の処理中のスレッドIDを表示する
subject.Subscribe(message => Debug.Log(message + ":" + Thread.CurrentThread.ManagedThreadId));
//---Unityメインスレッドから送信---
subject.OnNext("メインスレッドから送信");
//---スレッドを新しく作って送信---
new Thread(() =>
{
subject.OnNext("別スレッドから送信");
}).Start();
//---ThreadPoolから送信---
ThreadPool.QueueUserWorkItem(_ => subject.OnNext("ThreadPoolから送信"));
}
}
メインスレッドのID:1
メインスレッドから送信:1
別スレッドから送信:79
ThreadPoolから送信:68
上記のコードは1つのSubject
に対していろいろなスレッドからメッセージを発行した例です。
Subscribe
で登録した関数の実行スレッドがそれぞれバラバラであることがわかるかと思います。
このように、Observable
上のメッセージの実行スレッドは発行元に依存する(Subscribe
側のスレッドは一切関係ない)ということを覚えておいてください。
Schedulerとは何なのか
というわけで改めてScheduler
の解説を行います。
Scheduler
とは、Observable
における各種メッセージをどのスレッド上で、どのタイミングで実行するかを管理する存在です。
先程の解説の通り、メッセージの処理スレッドは発行元に依存するため、特定のスレッドで処理を実行したいとなった時にこのままでは不便です。
そこで用いられるのがScheduler
であり、面倒くさいスレッドの切り替え処理や、処理の実行タイミングの調整を引き受けてくれます。
このScheduler
ですが、ユーザが自前で新しく定義する必要はありません。
ほとんどの場合において、UniRxが標準で用意してくれているScheduler
を指定するだけで済みます。
Schedulerの使い所
1.時間が関係した処理を行う部分では必ず必要になる
Scheduler
はUniRx上で時間が関係する処理を行う場合は必ず指定する必要があります。
(「今までScheduler
なんて指定したことがないよ」という人は、単にScheduler
が未指定の場合にUniRxが最適なものを選んで使ってくれているため気づいていないだけです。)
なお、Scheduler
によって時間の計測方法が異なり、選択したScheduler
によっては挙動が大きく変化することもあります。
Observable.Timer
の例
void Start()
{
// Observable.TimerはSchedulerが未指定の場合は
// 自動的にUniRxがMainThreadSchedulerを選択してくれる
Observable.Timer(TimeSpan.FromSeconds(1))
.Subscribe();
// 明示的にMainThreadSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThread)
.Subscribe();
// ThreadPoolSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
.Subscribe();
}
MainThreadScheduler
はコルーチンを用いて時間の計測を行いますが、ThreadPoolScheduler
はThread.Sleep
を用いて時間の計測を行います。
そのため両者のSchedulerでは精度が大きく異なります。
MainThreadScheduler
の方が精度が悪いのですが、メッセージをそのままメインスレッド上で継続して処理できるというメリットがあるため、 Observable.Timer
のデフォルトScheduler
はMainThreadScheduler
になっています。
Observable.Range
の例
ちなみに、ファクトリメソッド系もScheduler
を指定する必要があります。
void Start()
{
// Observable.Rangeは指定した数値から連番を発行するファクトリメソッド
// デフォルトではCurrentThreadSchedulerが利用される(同一フレーム内でまとめて発行される)
Observable.Range(0, 10)
.Subscribe();
// MainThreadSchedulerを指定した場合は、1フレームにつき1メッセージずつの発行になる
// この場合は0から10まで1フレームずつ発行する
Observable.Range(0, 10, Scheduler.MainThread)
.Subscribe(_ => Debug.Log(Time.frameCount));
}
2.実行スレッドを切り替える時に利用する
ObserveOn
やSubscribeOn
でScheduler
を指定することで、処理の実行スレッドを明示的に変更することができます。
例:処理の途中でThreadPoolに移動し、終わったらメインスレッドに戻す
ObserveOn
を使うことで、それ以降のメッセージの処理を指定したScheduler
で実行することができます。
// WebSocketから受信したメッセージをThreadPoolでDeserializeする
// ThreadPoolSchedulerを使っているためメッセージの順序が入れ替わる可能性がある
webSocketClient.OnReceiveMessageAsObservable
.ObserveOn(Scheduler.ThreadPool)
.Select(x => DeserializeJson(x))
.ObserveOnMainThread()
.Subscribe(x => Debug.Log(x));
(webSocketClient
は適当に定義したWebScoketのクライアントだと思ってください。)
例:Observable.Create
の実行スレッドを切り替える
SubscribeOn
を使うことで、Observable
の初期化処理を指定したScheduler
で実行することができます。
Observable.Create
はSubscribe
した時のスレッドに依存して処理が実行されるため、SubscribeOn(Scheduler.ThreadPool)
を挟むことで実行スレッドをそのままThreadPool
に切り替えることができます。
using System;
using System.IO;
using UniRx;
using UnityEngine;
public class SubscribeonSample: MonoBehaviour
{
void Start()
{
// Unityメインスレッド上でファイル読み込みをやる場合
ReadFileAsync("data.txt")
.Subscribe(x => Debug.Log(x));
// TrheadPoolでファイルを読み込んで、終わったらメインスレッドに戻す場合
ReadFileAsync("data.txt")
.SubscribeOn(Scheduler.ThreadPool)
.ObserveOnMainThread()
.Subscribe(x => Debug.Log(x));
}
// 指定されたファイルを読み込む
IObservable<string> ReadFileAsync(string path)
{
return Observable.Create<String>(observer =>
{
using (var r = new StreamReader(path))
{
observer.OnNext(r.ReadToEnd());
}
observer.OnCompleted();
return Disposable.Empty;
});
}
}
Schedulerの種類
UniRxにはいくつかのScheduler
が用意されています。
Scheduler | 説明 |
---|---|
ImmediateScheduler | 現在のスレッドにて直ちに処理を実行する。 |
CurrentThreadScheduler | 現在のスレッドにて処理を実行する。メッセージは一度キューに詰まれてから順次実行する。 |
ThreadPoolScheduler | スレッドプール上にて処理を実行する。 |
MainThreadScheduler | Unityメインスレッドにて処理を実行する(Time.timeScae の影響を受ける)。 |
MainThreadIgnoreTimeScaleScheduler | Unityメインスレッドにて処理を実行する(Time.timeScale の影響を受けない)。 |
MainThreadFixedUpdateScheduler | Unityメインスレッドにて処理を実行する(Time.fixedTimeを基準にする)。 |
MainThreadEndOfFrameScheduler | Unityメインスレッドにて処理を実行する(EndOfFrameを基準にする)。 |
ImmediateScheduler / CurrentThreadScheduler
ImmediateScheduler
およびCurrentThreadScheduler
は、現在のスレッド上でメッセージを処理します。
ImmediateScheduler
とCurrentThreadScheduler
の違いは即座に実行するか、一度キューに詰んでから実行するかです。
// Schedulerを指定しない場合はMainThreadSchedulerを用いて1秒計測する
// そのためタイマ計測終了後はメインスレッドに行く
Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe();
new Thread(() =>
{
// 別スレッド上でTimerを実行するとメインスレッドで時間の計測を行い、結果もメインスレッドに飛んでしまう
Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe();
}).Start();
new Thread(() =>
{
// CurrentThreadSchedulerを指定することで、このスレッド上でタイマ計測が行われ、結果もこのスレッド上で処理される
// (スレッドをブロックして時間の計測を行う)
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread).Subscribe();
}).Start();
}
ThreadPoolScheduler
ThreadPoolScheduler
は名前のとおり、ThreadPool
上で処理を実行するScheduler
です。
メインスレッドで処理中の内容を途中でThreadPool
に逃したい場合などに用いることができます。
なお、注意点として**ThreadPoolScheduler
ではメッセージの順序が保障されていません。**
ThreadPoolScheduler
を通した結果、メッセージの順序が入れ替わる可能性がある点に注意しましょう。
場合によってはOnNextメッセージ
より先にOnCompleted
メッセージが先着してしまい、値を取りこぼすといったこともありえます。
なお、こちらの問題についてはUniRxのissueに登録されてはいます。
ObserveOn needs scheduling control
処理を途中でThreadPoolに移動させる例
// WebSocketから受信したメッセージをThreadPoolでDeserializeする
// ThreadPoolSchedulerを使っているためメッセージの順序が入れ替わる可能性がある
webSocketClient.OnReceiveMessageAsObservable
.ObserveOn(Scheduler.ThreadPool)
.Select(x => DeserializeJson(x))
.ObserveOnMainThread()
.Subscribe(x => Debug.Log(x));
OnNext
とOnCompleted
がの順序が入れ替わってしまう例
次のコードは一見問題なさそうですが、実際に動かすとOnCompleted
しか出力されない場合があります。
理由としては、ThreadPool
にOnNext
とOnCompleted
メッセージが順番に入力されるが、OnNext
メッセージの方が処理内容が重いため処理が終わらず、先にOnCompleted
メッセージの方がThreadPool
から出てきてしまうからです。
ObservableWWW.Get("https://unity3d.com/jp/")
.ObserveOn(Scheduler.ThreadPool) //以下の処理をThreadPoolで実行する
.Select(html => Regex.Replace(html, "<[^>]*?>", "")) // HTMLタグを除去
.Select(html => Regex.Replace(html, @"[ \s]+", " ")) // 2つ以上のスペースを除去
.ObserveOnMainThread() //Unityメインスレッドに切り替え
.Subscribe(x => Debug.Log(x),()=>Debug.Log("OnCompleted"));
もしこういったことがやりたいのであれば、Observable.Start
で処理をThreadPool
に移した上で、SelectMany
またはContinueWith
を使って元のObservable
に結合するしかないかなと思います。(メッセージの頻度が多い場合はコストが大きそうですけど)
ObservableWWW.Get("https://unity3d.com/jp/")
.ContinueWith(html => Observable.Start(() =>
Regex.Replace(Regex.Replace(html, "<[^>]*?>", ""), @"[ \s]+", " ")))
.ObserveOnMainThread() //Unityメインスレッドに切り替え
.Subscribe(x => Debug.Log(x), () => Debug.Log("OnCompleted"));
(ContinueWith
はSelectMany
の軽量版で、発行されるOnNext
メッセージの数が1つ限定の場合のみ使えます。)
MainThreadScheduler
MainThreadScheduler
はUnityのメインスレッドを使って処理を行うScheduler
です。
別スレッドで実行した処理をメインスレッドに移したい場合に利用ができます。
時間順にAction
が登録できるキューを用意し、そのキューをUnity上のMainTrehadDispatcher
と呼ばれるGameObjectが内容をチェックし、処理できるAction
があれば取り出して実行する、という仕組みになっています。
MainThreadScheduler
にはいくつか種類がありますが、これはそれぞれMainTrehadDispatcher
がどのタイミングでキューの中身をチェックするかの違いとなります。
MainThreadScheduler
MainThreadScheduler
はMainTrehadDispatcher
上で実行されるコルーチンを用いて時間の計測と処理を実行します。
そのため、MainThreadScheduler
を用いて時間を計測した場合のタイマの精度はWaitForSeconds
と同じになります。
(どんなに細かい時間を指定しても、一番近いフレームの実行タイミングに丸められます。)
また、Time.timeScale
の影響を受ける点にも注意が必要です。
// 10msの精度を指定しても、60fpsで動作している場合は約16msの倍数でしか計測ができない
Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
.TimeInterval(Scheduler.Immediate) //OnNext同士の時間を計測する
.Subscribe(x => Debug.Log(x.Interval.TotalMilliseconds))
.AddTo(this);
17.0453
16.5441
17.0453
17.0456
16.5436
...
MainThreadIgnoreTimeScaleScheduler
MainThreadIgnoreTimeScale
はMainThreadScheduler
と同じ挙動ですが、こちらはTime.timeScale
の影響を受けません。
MainThreadFixedUpdateScheduler
MainThreadFixedUpdate
はTime.fixedTime
を用いて時間を計測します。それ以外の挙動はMainThreadScheduler
と同じです。
処理の実行タイミングはFixedUpdate
となります。
MainThreadEndOfFrameScheduler
MainThreadEndOfFrame
はWaitForEndOfFrame
を用いたコルーチンで処理を実行します。
まとめ
UniRxを使う場合は、いまどのScheduler
を使っているかをちゃんと意識した方が事故なく運用できるかと思います。
時間を計測する系のオペレータの大半はデフォルトでMainThreadScheduler
を使うようになっているため、そこもちゃんと意識しておくとよいかと思います。
おまけ
「MainThreadSchedulerを使う」と「UnityメインスレッドでCurretnThreadSchedulerを使う」の違い
MainThreadScheduler
をそのまま使うのと、UnityメインスレッドでCurretnThreadScheduler
を使うのでは、同じように見えますが挙動が全く異なる点に注意が必要です。
特に大きく違う点は「時間の計測方法」です。MainTreadScheduler
ではコルーチンを使って時間を計測するのに対し、CurretnThreadScheduler
では現行スレッドをThread.Sleep
して時間の計測を行います。
そのため次のようなコードを書いてしまっただけで、ゲームがフリーズしてしまうため注意が必要です。
void Start()
{
// Unityメインスレッドを止めて時間を調整するためゲームが5秒間フリーズする
Observable.Timer(TimeSpan.FromSeconds(5), Scheduler.CurrentThread)
.Subscribe();
}
もしフリーズするのが嫌ならばThreadPoolScheduler
を使いましょう。
void Start()
{
// どうしてもMainTreadSchedulerを使いたくないならThreadPoolSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(5), Scheduler.ThreadPool)
.Subscribe();
}