41
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【UniRx】Schedulerとは何なのか

Last updated at Posted at 2018-03-25

はじめに

Schedulerは、ReactiveExtensionsを構成する三大要素のうちの1つであり、Rxの動作に欠かせないものです。
(ちなみにRxの三大要素は「Observable」「Operator」「Scheduler」)

Rxを踏襲しているUniRxにも当然Schedulerは登場しますし、到るところで使われていたりします。
今回はそのSchedulerの説明をします。

前提知識:Observableのメッセージが処理されるスレッドはどこなのか

OnNextメッセージを処理するスレッドはメッセージの発行元スレッドと同じになるという挙動をまず覚えておく必要があります。
同じObservableでも異なるスレッド上で処理が実行される可能性があります。

using System.Threading;
using UniRx;
using UnityEngine;

public class ThreadSample : MonoBehaviour
{
    void Start()
    {
        var subject = new Subject<string>();

        Debug.Log("メインスレッドのID:" + Thread.CurrentThread.ManagedThreadId);

        // OnNextを受け取ったらメッセージ内容と、その時の処理中のスレッドIDを表示する
        subject.Subscribe(message => Debug.Log(message + ":" + Thread.CurrentThread.ManagedThreadId));



        //---Unityメインスレッドから送信---
        subject.OnNext("メインスレッドから送信");

        //---スレッドを新しく作って送信---
        new Thread(() =>
        {
            subject.OnNext("別スレッドから送信");
        }).Start();

        //---ThreadPoolから送信---
        ThreadPool.QueueUserWorkItem(_ => subject.OnNext("ThreadPoolから送信"));
    }
}

実行結果
メインスレッドのID:1
メインスレッドから送信:1
別スレッドから送信:79
ThreadPoolから送信:68

上記のコードは1つのSubjectに対していろいろなスレッドからメッセージを発行した例です。
Subscribeで登録した関数の実行スレッドがそれぞれバラバラであることがわかるかと思います。

このように、Observable上のメッセージの実行スレッドは発行元に依存する(Subscribe側のスレッドは一切関係ない)ということを覚えておいてください。

Schedulerとは何なのか

というわけで改めてSchedulerの解説を行います。

Schedulerとは、Observableにおける各種メッセージをどのスレッド上で、どのタイミングで実行するかを管理する存在です。
先程の解説の通り、メッセージの処理スレッドは発行元に依存するため、特定のスレッドで処理を実行したいとなった時にこのままでは不便です。
そこで用いられるのがSchedulerであり、面倒くさいスレッドの切り替え処理や、処理の実行タイミングの調整を引き受けてくれます。

このSchedulerですが、ユーザが自前で新しく定義する必要はありません。
ほとんどの場合において、UniRxが標準で用意してくれているSchedulerを指定するだけで済みます。

Schedulerの使い所

1.時間が関係した処理を行う部分では必ず必要になる

SchedulerUniRx上で時間が関係する処理を行う場合は必ず指定する必要があります。
(「今までSchedulerなんて指定したことがないよ」という人は、単にSchedulerが未指定の場合にUniRxが最適なものを選んで使ってくれているため気づいていないだけです。)

なお、Schedulerによって時間の計測方法が異なり、選択したSchedulerによっては挙動が大きく変化することもあります。

Observable.Timerの例

Observable.Timer
void Start()
{
    // Observable.TimerはSchedulerが未指定の場合は
    // 自動的にUniRxがMainThreadSchedulerを選択してくれる
    Observable.Timer(TimeSpan.FromSeconds(1))
        .Subscribe();

    // 明示的にMainThreadSchedulerを指定する
    Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThread)
        .Subscribe();

    // ThreadPoolSchedulerを指定する
    Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
        .Subscribe();
}

MainThreadSchedulerはコルーチンを用いて時間の計測を行いますが、ThreadPoolSchedulerThread.Sleepを用いて時間の計測を行います。
そのため両者のSchedulerでは精度が大きく異なります。
MainThreadSchedulerの方が精度が悪いのですが、メッセージをそのままメインスレッド上で継続して処理できるというメリットがあるため、 Observable.TimerのデフォルトSchedulerMainThreadSchedulerになっています。

Observable.Rangeの例

ちなみに、ファクトリメソッド系もSchedulerを指定する必要があります。

Observable.Range
void Start()
{
    // Observable.Rangeは指定した数値から連番を発行するファクトリメソッド
    // デフォルトではCurrentThreadSchedulerが利用される(同一フレーム内でまとめて発行される)
    Observable.Range(0, 10)
        .Subscribe();

    // MainThreadSchedulerを指定した場合は、1フレームにつき1メッセージずつの発行になる
    // この場合は0から10まで1フレームずつ発行する
    Observable.Range(0, 10, Scheduler.MainThread)
            .Subscribe(_ => Debug.Log(Time.frameCount));
}

2.実行スレッドを切り替える時に利用する

ObserveOnSubscribeOnSchedulerを指定することで、処理の実行スレッドを明示的に変更することができます。

例:処理の途中でThreadPoolに移動し、終わったらメインスレッドに戻す

ObserveOnを使うことで、それ以降のメッセージの処理を指定したSchedulerで実行することができます。

// WebSocketから受信したメッセージをThreadPoolでDeserializeする
// ThreadPoolSchedulerを使っているためメッセージの順序が入れ替わる可能性がある
webSocketClient.OnReceiveMessageAsObservable
    .ObserveOn(Scheduler.ThreadPool)
    .Select(x => DeserializeJson(x))
    .ObserveOnMainThread()
    .Subscribe(x => Debug.Log(x));

(webSocketClientは適当に定義したWebScoketのクライアントだと思ってください。)

例:Observable.Createの実行スレッドを切り替える

SubscribeOnを使うことで、Observableの初期化処理を指定したSchedulerで実行することができます。
Observable.CreateSubscribeした時のスレッドに依存して処理が実行されるため、SubscribeOn(Scheduler.ThreadPool) を挟むことで実行スレッドをそのままThreadPoolに切り替えることができます。

using System;
using System.IO;
using UniRx;
using UnityEngine;

public class SubscribeonSample: MonoBehaviour
{
    void Start()
    {
        // Unityメインスレッド上でファイル読み込みをやる場合
        ReadFileAsync("data.txt")        
            .Subscribe(x => Debug.Log(x));

        //  TrheadPoolでファイルを読み込んで、終わったらメインスレッドに戻す場合
        ReadFileAsync("data.txt")
            .SubscribeOn(Scheduler.ThreadPool) 
            .ObserveOnMainThread()             
            .Subscribe(x => Debug.Log(x));
    }

    // 指定されたファイルを読み込む
    IObservable<string> ReadFileAsync(string path)
    {
        return Observable.Create<String>(observer =>
        {
            using (var r = new StreamReader(path))
            {
                observer.OnNext(r.ReadToEnd());
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }
}

Schedulerの種類

UniRxにはいくつかのSchedulerが用意されています。

Scheduler 説明
ImmediateScheduler 現在のスレッドにて直ちに処理を実行する。
CurrentThreadScheduler 現在のスレッドにて処理を実行する。メッセージは一度キューに詰まれてから順次実行する。
ThreadPoolScheduler スレッドプール上にて処理を実行する。
MainThreadScheduler Unityメインスレッドにて処理を実行する(Time.timeScaeの影響を受ける)。
MainThreadIgnoreTimeScaleScheduler Unityメインスレッドにて処理を実行する(Time.timeScaleの影響を受けない)。
MainThreadFixedUpdateScheduler Unityメインスレッドにて処理を実行する(Time.fixedTimeを基準にする)。
MainThreadEndOfFrameScheduler Unityメインスレッドにて処理を実行する(EndOfFrameを基準にする)。

ImmediateScheduler / CurrentThreadScheduler

ImmediateSchedulerおよびCurrentThreadSchedulerは、現在のスレッド上でメッセージを処理します。
ImmediateSchedulerCurrentThreadSchedulerの違いは即座に実行するか、一度キューに詰んでから実行するかです。

例:実行スレッド上でそのまま実行したい場合
// Schedulerを指定しない場合はMainThreadSchedulerを用いて1秒計測する
// そのためタイマ計測終了後はメインスレッドに行く
Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe();

new Thread(() =>
{
    // 別スレッド上でTimerを実行するとメインスレッドで時間の計測を行い、結果もメインスレッドに飛んでしまう
    Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe();
}).Start();

new Thread(() =>
{
    // CurrentThreadSchedulerを指定することで、このスレッド上でタイマ計測が行われ、結果もこのスレッド上で処理される
    // (スレッドをブロックして時間の計測を行う)
    Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread).Subscribe();
}).Start();
}

ThreadPoolScheduler

ThreadPoolSchedulerは名前のとおり、ThreadPool上で処理を実行するSchedulerです。
メインスレッドで処理中の内容を途中でThreadPoolに逃したい場合などに用いることができます。

なお、注意点として**ThreadPoolSchedulerではメッセージの順序が保障されていません。**
ThreadPoolSchedulerを通した結果、メッセージの順序が入れ替わる可能性がある点に注意しましょう。
場合によってはOnNextメッセージより先にOnCompletedメッセージが先着してしまい、値を取りこぼすといったこともありえます。

なお、こちらの問題についてはUniRxのissueに登録されてはいます。

ObserveOn needs scheduling control

処理を途中でThreadPoolに移動させる例

// WebSocketから受信したメッセージをThreadPoolでDeserializeする
// ThreadPoolSchedulerを使っているためメッセージの順序が入れ替わる可能性がある
webSocketClient.OnReceiveMessageAsObservable
    .ObserveOn(Scheduler.ThreadPool)
    .Select(x => DeserializeJson(x))
    .ObserveOnMainThread()
    .Subscribe(x => Debug.Log(x));

OnNextOnCompletedがの順序が入れ替わってしまう例

次のコードは一見問題なさそうですが、実際に動かすとOnCompletedしか出力されない場合があります。
理由としては、ThreadPoolOnNextOnCompletedメッセージが順番に入力されるが、OnNextメッセージの方が処理内容が重いため処理が終わらず、先にOnCompletedメッセージの方がThreadPoolから出てきてしまうからです。

OnCompletedのみしか得られないことがある例
ObservableWWW.Get("https://unity3d.com/jp/")
    .ObserveOn(Scheduler.ThreadPool) //以下の処理をThreadPoolで実行する
    .Select(html => Regex.Replace(html, "<[^>]*?>", "")) // HTMLタグを除去
    .Select(html => Regex.Replace(html, @"[ \s]+", " ")) // 2つ以上のスペースを除去
    .ObserveOnMainThread() //Unityメインスレッドに切り替え
    .Subscribe(x => Debug.Log(x),()=>Debug.Log("OnCompleted"));

もしこういったことがやりたいのであれば、Observable.Startで処理をThreadPoolに移した上で、SelectManyまたはContinueWithを使って元のObservableに結合するしかないかなと思います。(メッセージの頻度が多い場合はコストが大きそうですけど)

OnCompletedが先着するのを防止しながらThreadPool上で処理をする
ObservableWWW.Get("https://unity3d.com/jp/")
    .ContinueWith(html => Observable.Start(() => 
        Regex.Replace(Regex.Replace(html, "<[^>]*?>", ""), @"[ \s]+", " ")))
    .ObserveOnMainThread() //Unityメインスレッドに切り替え
    .Subscribe(x => Debug.Log(x), () => Debug.Log("OnCompleted"));

ContinueWithSelectManyの軽量版で、発行されるOnNextメッセージの数が1つ限定の場合のみ使えます。)

MainThreadScheduler

MainThreadSchedulerはUnityのメインスレッドを使って処理を行うSchedulerです。
別スレッドで実行した処理をメインスレッドに移したい場合に利用ができます。

時間順にActionが登録できるキューを用意し、そのキューをUnity上のMainTrehadDispatcherと呼ばれるGameObjectが内容をチェックし、処理できるActionがあれば取り出して実行する、という仕組みになっています。
MainThreadSchedulerにはいくつか種類がありますが、これはそれぞれMainTrehadDispatcherがどのタイミングでキューの中身をチェックするかの違いとなります。

MainThreadScheduler

MainThreadSchedulerMainTrehadDispatcher上で実行されるコルーチンを用いて時間の計測と処理を実行します。
そのため、MainThreadSchedulerを用いて時間を計測した場合のタイマの精度はWaitForSecondsと同じになります。
(どんなに細かい時間を指定しても、一番近いフレームの実行タイミングに丸められます。)

また、Time.timeScaleの影響を受ける点にも注意が必要です。

// 10msの精度を指定しても、60fpsで動作している場合は約16msの倍数でしか計測ができない
Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
    .TimeInterval(Scheduler.Immediate) //OnNext同士の時間を計測する
    .Subscribe(x => Debug.Log(x.Interval.TotalMilliseconds))
    .AddTo(this);
実行結果
17.0453
16.5441
17.0453
17.0456
16.5436
...

MainThreadIgnoreTimeScaleScheduler

MainThreadIgnoreTimeScaleMainThreadSchedulerと同じ挙動ですが、こちらはTime.timeScaleの影響を受けません。

MainThreadFixedUpdateScheduler

MainThreadFixedUpdateTime.fixedTimeを用いて時間を計測します。それ以外の挙動はMainThreadSchedulerと同じです。
処理の実行タイミングはFixedUpdateとなります。

MainThreadEndOfFrameScheduler

MainThreadEndOfFrameWaitForEndOfFrameを用いたコルーチンで処理を実行します。

まとめ

UniRxを使う場合は、いまどのSchedulerを使っているかをちゃんと意識した方が事故なく運用できるかと思います。
時間を計測する系のオペレータの大半はデフォルトでMainThreadSchedulerを使うようになっているため、そこもちゃんと意識しておくとよいかと思います。

おまけ

「MainThreadSchedulerを使う」と「UnityメインスレッドでCurretnThreadSchedulerを使う」の違い

MainThreadSchedulerをそのまま使うのと、UnityメインスレッドでCurretnThreadSchedulerを使うのでは、同じように見えますが挙動が全く異なる点に注意が必要です。

特に大きく違う点は「時間の計測方法」です。MainTreadSchedulerではコルーチンを使って時間を計測するのに対し、CurretnThreadSchedulerでは現行スレッドをThread.Sleepして時間の計測を行います。
そのため次のようなコードを書いてしまっただけで、ゲームがフリーズしてしまうため注意が必要です。

ゲームがフリーズする例
void Start()
{
    // Unityメインスレッドを止めて時間を調整するためゲームが5秒間フリーズする
    Observable.Timer(TimeSpan.FromSeconds(5), Scheduler.CurrentThread)
        .Subscribe();
}

もしフリーズするのが嫌ならばThreadPoolSchedulerを使いましょう。

フリーズしない例
void Start()
{
    // どうしてもMainTreadSchedulerを使いたくないならThreadPoolSchedulerを指定する
    Observable.Timer(TimeSpan.FromSeconds(5), Scheduler.ThreadPool)
        .Subscribe();
}
41
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
41
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?