はじめに
Schedulerは、ReactiveExtensionsを構成する三大要素のうちの1つであり、Rxの動作に欠かせないものです。
(ちなみにRxの三大要素は「Observable」「Operator」「Scheduler」)
Rxを踏襲しているUniRxにも当然Schedulerは登場しますし、到るところで使われていたりします。
今回はそのSchedulerの説明をします。
前提知識:Observableのメッセージが処理されるスレッドはどこなのか
OnNextメッセージを処理するスレッドはメッセージの発行元スレッドと同じになるという挙動をまず覚えておく必要があります。
同じObservableでも異なるスレッド上で処理が実行される可能性があります。
using System.Threading;
using UniRx;
using UnityEngine;
public class ThreadSample : MonoBehaviour
{
void Start()
{
var subject = new Subject<string>();
Debug.Log("メインスレッドのID:" + Thread.CurrentThread.ManagedThreadId);
// OnNextを受け取ったらメッセージ内容と、その時の処理中のスレッドIDを表示する
subject.Subscribe(message => Debug.Log(message + ":" + Thread.CurrentThread.ManagedThreadId));
//---Unityメインスレッドから送信---
subject.OnNext("メインスレッドから送信");
//---スレッドを新しく作って送信---
new Thread(() =>
{
subject.OnNext("別スレッドから送信");
}).Start();
//---ThreadPoolから送信---
ThreadPool.QueueUserWorkItem(_ => subject.OnNext("ThreadPoolから送信"));
}
}
メインスレッドのID:1
メインスレッドから送信:1
別スレッドから送信:79
ThreadPoolから送信:68
上記のコードは1つのSubjectに対していろいろなスレッドからメッセージを発行した例です。
Subscribeで登録した関数の実行スレッドがそれぞれバラバラであることがわかるかと思います。
このように、Observable上のメッセージの実行スレッドは発行元に依存する(Subscribe側のスレッドは一切関係ない)ということを覚えておいてください。
Schedulerとは何なのか
というわけで改めてSchedulerの解説を行います。
Schedulerとは、Observableにおける各種メッセージをどのスレッド上で、どのタイミングで実行するかを管理する存在です。
先程の解説の通り、メッセージの処理スレッドは発行元に依存するため、特定のスレッドで処理を実行したいとなった時にこのままでは不便です。
そこで用いられるのがSchedulerであり、面倒くさいスレッドの切り替え処理や、処理の実行タイミングの調整を引き受けてくれます。
このSchedulerですが、ユーザが自前で新しく定義する必要はありません。
ほとんどの場合において、UniRxが標準で用意してくれているSchedulerを指定するだけで済みます。
Schedulerの使い所
1.時間が関係した処理を行う部分では必ず必要になる
SchedulerはUniRx上で時間が関係する処理を行う場合は必ず指定する必要があります。
(「今までSchedulerなんて指定したことがないよ」という人は、単にSchedulerが未指定の場合にUniRxが最適なものを選んで使ってくれているため気づいていないだけです。)
なお、Schedulerによって時間の計測方法が異なり、選択したSchedulerによっては挙動が大きく変化することもあります。
Observable.Timerの例
void Start()
{
// Observable.TimerはSchedulerが未指定の場合は
// 自動的にUniRxがMainThreadSchedulerを選択してくれる
Observable.Timer(TimeSpan.FromSeconds(1))
.Subscribe();
// 明示的にMainThreadSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThread)
.Subscribe();
// ThreadPoolSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
.Subscribe();
}
MainThreadSchedulerはコルーチンを用いて時間の計測を行いますが、ThreadPoolSchedulerはThread.Sleepを用いて時間の計測を行います。
そのため両者のSchedulerでは精度が大きく異なります。
MainThreadSchedulerの方が精度が悪いのですが、メッセージをそのままメインスレッド上で継続して処理できるというメリットがあるため、 Observable.TimerのデフォルトSchedulerはMainThreadSchedulerになっています。
Observable.Rangeの例
ちなみに、ファクトリメソッド系もSchedulerを指定する必要があります。
void Start()
{
// Observable.Rangeは指定した数値から連番を発行するファクトリメソッド
// デフォルトではCurrentThreadSchedulerが利用される(同一フレーム内でまとめて発行される)
Observable.Range(0, 10)
.Subscribe();
// MainThreadSchedulerを指定した場合は、1フレームにつき1メッセージずつの発行になる
// この場合は0から10まで1フレームずつ発行する
Observable.Range(0, 10, Scheduler.MainThread)
.Subscribe(_ => Debug.Log(Time.frameCount));
}
2.実行スレッドを切り替える時に利用する
ObserveOnやSubscribeOnでSchedulerを指定することで、処理の実行スレッドを明示的に変更することができます。
例:処理の途中でThreadPoolに移動し、終わったらメインスレッドに戻す
ObserveOnを使うことで、それ以降のメッセージの処理を指定したSchedulerで実行することができます。
// WebSocketから受信したメッセージをThreadPoolでDeserializeする
// ThreadPoolSchedulerを使っているためメッセージの順序が入れ替わる可能性がある
webSocketClient.OnReceiveMessageAsObservable
.ObserveOn(Scheduler.ThreadPool)
.Select(x => DeserializeJson(x))
.ObserveOnMainThread()
.Subscribe(x => Debug.Log(x));
(webSocketClientは適当に定義したWebScoketのクライアントだと思ってください。)
例:Observable.Createの実行スレッドを切り替える
SubscribeOnを使うことで、Observableの初期化処理を指定したSchedulerで実行することができます。
Observable.CreateはSubscribeした時のスレッドに依存して処理が実行されるため、SubscribeOn(Scheduler.ThreadPool) を挟むことで実行スレッドをそのままThreadPoolに切り替えることができます。
using System;
using System.IO;
using UniRx;
using UnityEngine;
public class SubscribeonSample: MonoBehaviour
{
void Start()
{
// Unityメインスレッド上でファイル読み込みをやる場合
ReadFileAsync("data.txt")
.Subscribe(x => Debug.Log(x));
// TrheadPoolでファイルを読み込んで、終わったらメインスレッドに戻す場合
ReadFileAsync("data.txt")
.SubscribeOn(Scheduler.ThreadPool)
.ObserveOnMainThread()
.Subscribe(x => Debug.Log(x));
}
// 指定されたファイルを読み込む
IObservable<string> ReadFileAsync(string path)
{
return Observable.Create<String>(observer =>
{
using (var r = new StreamReader(path))
{
observer.OnNext(r.ReadToEnd());
}
observer.OnCompleted();
return Disposable.Empty;
});
}
}
Schedulerの種類
UniRxにはいくつかのSchedulerが用意されています。
| Scheduler | 説明 |
|---|---|
| ImmediateScheduler | 現在のスレッドにて直ちに処理を実行する。 |
| CurrentThreadScheduler | 現在のスレッドにて処理を実行する。メッセージは一度キューに詰まれてから順次実行する。 |
| ThreadPoolScheduler | スレッドプール上にて処理を実行する。 |
| MainThreadScheduler | Unityメインスレッドにて処理を実行する(Time.timeScaeの影響を受ける)。 |
| MainThreadIgnoreTimeScaleScheduler | Unityメインスレッドにて処理を実行する(Time.timeScaleの影響を受けない)。 |
| MainThreadFixedUpdateScheduler | Unityメインスレッドにて処理を実行する(Time.fixedTimeを基準にする)。 |
| MainThreadEndOfFrameScheduler | Unityメインスレッドにて処理を実行する(EndOfFrameを基準にする)。 |
ImmediateScheduler / CurrentThreadScheduler
ImmediateSchedulerおよびCurrentThreadSchedulerは、現在のスレッド上でメッセージを処理します。
ImmediateSchedulerとCurrentThreadSchedulerの違いは即座に実行するか、一度キューに詰んでから実行するかです。
// Schedulerを指定しない場合はMainThreadSchedulerを用いて1秒計測する
// そのためタイマ計測終了後はメインスレッドに行く
Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe();
new Thread(() =>
{
// 別スレッド上でTimerを実行するとメインスレッドで時間の計測を行い、結果もメインスレッドに飛んでしまう
Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe();
}).Start();
new Thread(() =>
{
// CurrentThreadSchedulerを指定することで、このスレッド上でタイマ計測が行われ、結果もこのスレッド上で処理される
// (スレッドをブロックして時間の計測を行う)
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread).Subscribe();
}).Start();
}
ThreadPoolScheduler
ThreadPoolSchedulerは名前のとおり、ThreadPool上で処理を実行するSchedulerです。
メインスレッドで処理中の内容を途中でThreadPoolに逃したい場合などに用いることができます。
なお、注意点として**ThreadPoolSchedulerではメッセージの順序が保障されていません。**
ThreadPoolSchedulerを通した結果、メッセージの順序が入れ替わる可能性がある点に注意しましょう。
場合によってはOnNextメッセージより先にOnCompletedメッセージが先着してしまい、値を取りこぼすといったこともありえます。
なお、こちらの問題についてはUniRxのissueに登録されてはいます。
ObserveOn needs scheduling control
処理を途中でThreadPoolに移動させる例
// WebSocketから受信したメッセージをThreadPoolでDeserializeする
// ThreadPoolSchedulerを使っているためメッセージの順序が入れ替わる可能性がある
webSocketClient.OnReceiveMessageAsObservable
.ObserveOn(Scheduler.ThreadPool)
.Select(x => DeserializeJson(x))
.ObserveOnMainThread()
.Subscribe(x => Debug.Log(x));
OnNextとOnCompletedがの順序が入れ替わってしまう例
次のコードは一見問題なさそうですが、実際に動かすとOnCompletedしか出力されない場合があります。
理由としては、ThreadPoolにOnNextとOnCompletedメッセージが順番に入力されるが、OnNextメッセージの方が処理内容が重いため処理が終わらず、先にOnCompletedメッセージの方がThreadPoolから出てきてしまうからです。
ObservableWWW.Get("https://unity3d.com/jp/")
.ObserveOn(Scheduler.ThreadPool) //以下の処理をThreadPoolで実行する
.Select(html => Regex.Replace(html, "<[^>]*?>", "")) // HTMLタグを除去
.Select(html => Regex.Replace(html, @"[ \s]+", " ")) // 2つ以上のスペースを除去
.ObserveOnMainThread() //Unityメインスレッドに切り替え
.Subscribe(x => Debug.Log(x),()=>Debug.Log("OnCompleted"));
もしこういったことがやりたいのであれば、Observable.Startで処理をThreadPoolに移した上で、SelectManyまたはContinueWithを使って元のObservableに結合するしかないかなと思います。(メッセージの頻度が多い場合はコストが大きそうですけど)
ObservableWWW.Get("https://unity3d.com/jp/")
.ContinueWith(html => Observable.Start(() =>
Regex.Replace(Regex.Replace(html, "<[^>]*?>", ""), @"[ \s]+", " ")))
.ObserveOnMainThread() //Unityメインスレッドに切り替え
.Subscribe(x => Debug.Log(x), () => Debug.Log("OnCompleted"));
(ContinueWithはSelectManyの軽量版で、発行されるOnNextメッセージの数が1つ限定の場合のみ使えます。)
MainThreadScheduler
MainThreadSchedulerはUnityのメインスレッドを使って処理を行うSchedulerです。
別スレッドで実行した処理をメインスレッドに移したい場合に利用ができます。
時間順にActionが登録できるキューを用意し、そのキューをUnity上のMainTrehadDispatcherと呼ばれるGameObjectが内容をチェックし、処理できるActionがあれば取り出して実行する、という仕組みになっています。
MainThreadSchedulerにはいくつか種類がありますが、これはそれぞれMainTrehadDispatcherがどのタイミングでキューの中身をチェックするかの違いとなります。
MainThreadScheduler
MainThreadSchedulerはMainTrehadDispatcher上で実行されるコルーチンを用いて時間の計測と処理を実行します。
そのため、MainThreadSchedulerを用いて時間を計測した場合のタイマの精度はWaitForSecondsと同じになります。
(どんなに細かい時間を指定しても、一番近いフレームの実行タイミングに丸められます。)
また、Time.timeScaleの影響を受ける点にも注意が必要です。
// 10msの精度を指定しても、60fpsで動作している場合は約16msの倍数でしか計測ができない
Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
.TimeInterval(Scheduler.Immediate) //OnNext同士の時間を計測する
.Subscribe(x => Debug.Log(x.Interval.TotalMilliseconds))
.AddTo(this);
17.0453
16.5441
17.0453
17.0456
16.5436
...
MainThreadIgnoreTimeScaleScheduler
MainThreadIgnoreTimeScaleはMainThreadSchedulerと同じ挙動ですが、こちらはTime.timeScaleの影響を受けません。
MainThreadFixedUpdateScheduler
MainThreadFixedUpdateはTime.fixedTimeを用いて時間を計測します。それ以外の挙動はMainThreadSchedulerと同じです。
処理の実行タイミングはFixedUpdateとなります。
MainThreadEndOfFrameScheduler
MainThreadEndOfFrameはWaitForEndOfFrameを用いたコルーチンで処理を実行します。
まとめ
UniRxを使う場合は、いまどのSchedulerを使っているかをちゃんと意識した方が事故なく運用できるかと思います。
時間を計測する系のオペレータの大半はデフォルトでMainThreadSchedulerを使うようになっているため、そこもちゃんと意識しておくとよいかと思います。
おまけ
「MainThreadSchedulerを使う」と「UnityメインスレッドでCurretnThreadSchedulerを使う」の違い
MainThreadSchedulerをそのまま使うのと、UnityメインスレッドでCurretnThreadSchedulerを使うのでは、同じように見えますが挙動が全く異なる点に注意が必要です。
特に大きく違う点は「時間の計測方法」です。MainTreadSchedulerではコルーチンを使って時間を計測するのに対し、CurretnThreadSchedulerでは現行スレッドをThread.Sleepして時間の計測を行います。
そのため次のようなコードを書いてしまっただけで、ゲームがフリーズしてしまうため注意が必要です。
void Start()
{
// Unityメインスレッドを止めて時間を調整するためゲームが5秒間フリーズする
Observable.Timer(TimeSpan.FromSeconds(5), Scheduler.CurrentThread)
.Subscribe();
}
もしフリーズするのが嫌ならばThreadPoolSchedulerを使いましょう。
void Start()
{
// どうしてもMainTreadSchedulerを使いたくないならThreadPoolSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(5), Scheduler.ThreadPool)
.Subscribe();
}