LoginSignup
14
14

More than 5 years have passed since last update.

Rxと絡めたSynchronizationContextについて

Last updated at Posted at 2015-08-28

ReactiveCommandを含むクラスをConsoleアプリから動かそうとする

ReactiveCommandを使ったViewModelをUnitTestに使おうとすると

System.InvalidOperationException: SynchronizationContext.Current is null

が発生したりするのだけど、そこからはじめてSynchronizationContextのついていろいろと。

例外の起こるコード

例えばMVVMなViewModelをUnitTestする場合とか
using Reactive.Bindings;
using System;
using System.Reactive.Linq;
using System.Windows.Input;

namespace ConsoleApplication1
{
    class ViewModel
    {
        ReactiveCommand m_command;
        public ICommand Command
        {
            get
            {
                if(m_command== null)
                {
                    m_command = new ReactiveCommand();
                    m_command.Subscribe(_ =>
                    {
                        Console.WriteLine("Command");
                    });
                }
                return m_command;
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var vm = new ViewModel();

            var subscription =
                Observable.Interval(TimeSpan.FromSeconds(2))
                .Take(1)
                .Subscribe(x =>
                {
                    vm.Command.Execute(null);
                })
                ;

            // 終わるまで待ってる・・・
            Console.ReadLine();
        }
    }
}

実行すると

vm.Command.Execute(null);

のところで

System.InvalidOperationException: SynchronizationContext.Current is null

と言われます。

とりあえず

                .Subscribe(x =>
                {
                    // 安直にSetSynchronizationContext
                    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

                    vm.Command.Execute(null);
                })

これでエラーは出なくなる。
しかし、やりたいのはこれではない。

Rx的にはこうじゃないのか
        static void Main(string[] args)
        {
            // set SynchronizationContext.Current
            SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

            var vm = new ViewModel();

            var subscription =
                Observable.Interval(TimeSpan.FromSeconds(2))
                .Take(1)
                // SynchronizationContext.Currentでカレントスレッドに戻れる?
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(x =>
                {
                    vm.Command.Execute(null);
                })
                ;

            Console.ReadLine();
        }

しかし

System.InvalidOperationException: SynchronizationContext.Current is null

発生…。何故か。

            Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
            var subscription =
                Observable.Interval(TimeSpan.FromSeconds(2))
                .Take(1)
                .Do(x =>
                {
                    Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                })
                // 
                .ObserveOn(SynchronizationContext.Current)
                .Do(x =>
                {
                    Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                })
                .Subscribe(x =>
                {
                    vm.Command.Execute(null);
                })
                ;

という風にしてみると

ThreadID: 8
ThreadID: 10 // Interval
ThreadID: 11 // SynchronizationContext

8, 10, 8となってほしいのですけどそうはいかず。
実は、

new SynchronizationContext()

はスレッドプールに投げる実装なのでカレントスレッドに戻してくれたりはしない。

ならばと

ObserveOn(CurrentThreadScheduler.Instance)

としてみたが

ThreadID: 8
ThreadID: 10 // Interval
ThreadID: 10 // CurrentThreadSchedulerはスレッドを乗り換えない

字義通りにカレントスレッドで実行するというのとはちょっと意味が違うようだ。
ImmediateSchedulerとの対比から言うとQueueingScheduler的なものでスレッドの乗り換えはしない、つまりカレントスレッドということらしい。

カレントスレッドに乗り換えるSynchronizationContextを作りたい

Await, SynchronizationContext, and Console Apps
にやり方があった。

上記の記事からSingleThreadSynchronizationContextを頂いてくる。

SingleThreadSynchronizationContext
    /// <summary>Provides a SynchronizationContext that's single-threaded.</summary>
    class SingleThreadSynchronizationContext : SynchronizationContext
    {
        /// <summary>The queue of work items.</summary>
        private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> m_queue =
            new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        /// <summary>The processing thread.</summary>
        private readonly Thread m_thread = Thread.CurrentThread;

        /// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
        /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
        /// <param name="state">The object passed to the delegate.</param>
        public override void Post(SendOrPostCallback d, object state)
        {
            if (d == null) throw new ArgumentNullException("d");
            m_queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
        }

        /// <summary>Not supported.</summary>
        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("Synchronously sending is not supported.");
        }

        /// <summary>Runs an loop to process all queued work items.</summary>
        public void RunOnCurrentThread()
        {
            foreach (var workItem in m_queue.GetConsumingEnumerable())
                workItem.Key(workItem.Value);
        }

        /// <summary>Notifies the context that no more work will arrive.</summary>
        public void Complete() { m_queue.CompleteAdding(); }
    }

で呼び出し方をこっちに合うように調整。

        static void Main(string[] args)
        {
            // contextを作ってセットする
            var context = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(context);

            var vm = new ViewModel();

            Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
            var subscription =
                Observable.Interval(TimeSpan.FromSeconds(2))
                .Take(1)
                .Do(x =>
                {
                    Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                })
                // CurrentThreadでObserveだ
                .ObserveOn(context)
                .Do(x =>
                {
                    Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                })
                .Subscribe(x =>
                {
                    vm.Command.Execute(null);
                }
                , ex=>
                {
                }
                , ()=>
                {
                    // 終了通知
                    context.Complete();
                })
                ;

            // エンキューされたアクションを消化する
            context.RunOnCurrentThread();

            Console.ReadLine();
        }

実行してみると

ThreadID: 8
ThreadID: 10
ThreadID: 8
Command

となりうまくいった。
もうちょっとSynchronizationContextの嬉しさがでるようにしてみる。

EventLoop風にしてみる

        static void Main(string[] args)
        {
            // contextを作ってセットする
            var context = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(context);

            // 本体
            _Main(args);

            Observable.Start(() =>
            {
                // エンターで停止・・・
                Console.ReadLine();
            })
            .Subscribe(x =>
            {
                // コンテキストを終了させる
                context.Complete();
            });

            // ブロッキング
            context.RunOnCurrentThread();
        }

        static void _Main(string[] args)
        {
            var vm = new ViewModel();

            Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
            var subscription =
                Observable.Interval(TimeSpan.FromSeconds(2))
                .Take(1)
                .Do(x =>
                {
                    Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                })
                // SetSynchronizationContext等が隠蔽された
                .ObserveOn(SynchronizationContext.Current)
                .Do(x =>
                {
                    Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                })
                .Subscribe(x =>
                {
                    vm.Command.Execute(null);
                }
                , ex=>
                {
                }
                , ()=>
                {
                })
                ;
        }

ここまでやると本体ではObserveOnするだけになって細かいところを隠蔽できた。
最終版の動くコード

Scheduler(ObserveOnの引数)とSynchronizationContextは何なのか

Scheduler(ImmediateScheduler以外)は処理をエンキューしてどこかでデキューして実行する。
SynchronizaionContextはasync/awaitのawaitの続きをどこで実行するのかを設定するのに使われるもので、
WPFやWinForm等のメインループを内包するフレームワーク側が裏で提供する想定のようだ。
どちらも処理をPostされて、なんらかの方法でPostされたものを後で(ImmediateSchedulerを除く)処理するものを抽象化している。
いろんなスレッドからエンキュー(Post)されるが、特定のスレッドでデキューするループが働いているという理解でよさそう。

Rx入門 (15) - スケジューラの利用
にRxのスケジューラの一覧がありました。

下記にスケジューラの種類とそれぞれに関してのメモ。細かいところは間違っているかもしれないので注意・・・

Shceduler SynchronizationContext memo
ControlScheduler WindowsFormsSynchronizationContext WindowsFormのメッセージポンプに仕込まれたキュー
DispatcherScheduler DispatcherSynchronizationContext WPFに仕込まれたキュー
ImmediateScheduler 待たずにすぐに実行する。キューですらない
CurrentThreadScheduler キュー。たぶんRxで他のスケジューラが指定されなかったときに使われる。Subscribeすると生成される。明示的に指定するのは無意味なのでは?
NewThreadScheduler 新しいスレッドをどんどん作る?
EventLoopScheduler 新しいスレッドでEventLoopを開始する。ThreadPoolに対して、シングルスレッドが売り?
ThreadPoolScheduler ThreadPoolで処理する
ObserveOn(Thread.CurrentThread)

とかできたら便利そうだが、デキューが不在になるので無理ですね。

14
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
14