#ReactiveCommandを含むクラスをConsoleアプリから動かそうとする
ReactiveCommandを使ったViewModelをUnitTestに使おうとすると
System.InvalidOperationException: SynchronizationContext.Current is null
が発生したりするのだけど、そこからはじめてSynchronizationContextのついていろいろと。
#例外の起こるコード
using Reactive.Bindings;
using System;
using System.Reactive.Linq;
using System.Windows.Input;
namespace ConsoleApplication1
{
class ViewModel
{
ReactiveCommand m_command;
public ICommand Command
{
get
{
if(m_command== null)
{
m_command = new ReactiveCommand();
m_command.Subscribe(_ =>
{
Console.WriteLine("Command");
});
}
return m_command;
}
}
}
class Program
{
static void Main(string[] args)
{
var vm = new ViewModel();
var subscription =
Observable.Interval(TimeSpan.FromSeconds(2))
.Take(1)
.Subscribe(x =>
{
vm.Command.Execute(null);
})
;
// 終わるまで待ってる・・・
Console.ReadLine();
}
}
}
実行すると
vm.Command.Execute(null);
のところで
System.InvalidOperationException: SynchronizationContext.Current is null
と言われます。
#とりあえず
.Subscribe(x =>
{
// 安直にSetSynchronizationContext
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
vm.Command.Execute(null);
})
これでエラーは出なくなる。
しかし、やりたいのはこれではない。
static void Main(string[] args)
{
// set SynchronizationContext.Current
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
var vm = new ViewModel();
var subscription =
Observable.Interval(TimeSpan.FromSeconds(2))
.Take(1)
// SynchronizationContext.Currentでカレントスレッドに戻れる?
.ObserveOn(SynchronizationContext.Current)
.Subscribe(x =>
{
vm.Command.Execute(null);
})
;
Console.ReadLine();
}
しかし
System.InvalidOperationException: SynchronizationContext.Current is null
発生…。何故か。
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
var subscription =
Observable.Interval(TimeSpan.FromSeconds(2))
.Take(1)
.Do(x =>
{
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
})
//
.ObserveOn(SynchronizationContext.Current)
.Do(x =>
{
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
})
.Subscribe(x =>
{
vm.Command.Execute(null);
})
;
という風にしてみると
ThreadID: 8
ThreadID: 10 // Interval
ThreadID: 11 // SynchronizationContext
8, 10, 8となってほしいのですけどそうはいかず。
実は、
new SynchronizationContext()
はスレッドプールに投げる実装なのでカレントスレッドに戻してくれたりはしない。
ならばと
ObserveOn(CurrentThreadScheduler.Instance)
としてみたが
ThreadID: 8
ThreadID: 10 // Interval
ThreadID: 10 // CurrentThreadSchedulerはスレッドを乗り換えない
字義通りにカレントスレッドで実行するというのとはちょっと意味が違うようだ。
ImmediateSchedulerとの対比から言うとQueueingScheduler的なものでスレッドの乗り換えはしない、つまりカレントスレッドということらしい。
#カレントスレッドに乗り換えるSynchronizationContextを作りたい
Await, SynchronizationContext, and Console Apps
にやり方があった。
上記の記事からSingleThreadSynchronizationContextを頂いてくる。
/// <summary>Provides a SynchronizationContext that's single-threaded.</summary>
class SingleThreadSynchronizationContext : SynchronizationContext
{
/// <summary>The queue of work items.</summary>
private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> m_queue =
new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();
/// <summary>The processing thread.</summary>
private readonly Thread m_thread = Thread.CurrentThread;
/// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
/// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
/// <param name="state">The object passed to the delegate.</param>
public override void Post(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
m_queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
}
/// <summary>Not supported.</summary>
public override void Send(SendOrPostCallback d, object state)
{
throw new NotSupportedException("Synchronously sending is not supported.");
}
/// <summary>Runs an loop to process all queued work items.</summary>
public void RunOnCurrentThread()
{
foreach (var workItem in m_queue.GetConsumingEnumerable())
workItem.Key(workItem.Value);
}
/// <summary>Notifies the context that no more work will arrive.</summary>
public void Complete() { m_queue.CompleteAdding(); }
}
で呼び出し方をこっちに合うように調整。
static void Main(string[] args)
{
// contextを作ってセットする
var context = new SingleThreadSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(context);
var vm = new ViewModel();
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
var subscription =
Observable.Interval(TimeSpan.FromSeconds(2))
.Take(1)
.Do(x =>
{
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
})
// CurrentThreadでObserveだ
.ObserveOn(context)
.Do(x =>
{
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
})
.Subscribe(x =>
{
vm.Command.Execute(null);
}
, ex=>
{
}
, ()=>
{
// 終了通知
context.Complete();
})
;
// エンキューされたアクションを消化する
context.RunOnCurrentThread();
Console.ReadLine();
}
実行してみると
ThreadID: 8
ThreadID: 10
ThreadID: 8
Command
となりうまくいった。
もうちょっとSynchronizationContextの嬉しさがでるようにしてみる。
#EventLoop風にしてみる
static void Main(string[] args)
{
// contextを作ってセットする
var context = new SingleThreadSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(context);
// 本体
_Main(args);
Observable.Start(() =>
{
// エンターで停止・・・
Console.ReadLine();
})
.Subscribe(x =>
{
// コンテキストを終了させる
context.Complete();
});
// ブロッキング
context.RunOnCurrentThread();
}
static void _Main(string[] args)
{
var vm = new ViewModel();
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
var subscription =
Observable.Interval(TimeSpan.FromSeconds(2))
.Take(1)
.Do(x =>
{
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
})
// SetSynchronizationContext等が隠蔽された
.ObserveOn(SynchronizationContext.Current)
.Do(x =>
{
Console.WriteLine("ThreadID: " + Thread.CurrentThread.ManagedThreadId);
})
.Subscribe(x =>
{
vm.Command.Execute(null);
}
, ex=>
{
}
, ()=>
{
})
;
}
ここまでやると本体ではObserveOnするだけになって細かいところを隠蔽できた。
最終版の動くコード
Scheduler(ObserveOnの引数)とSynchronizationContextは何なのか
Scheduler(ImmediateScheduler以外)は処理をエンキューしてどこかでデキューして実行する。
SynchronizaionContextはasync/awaitのawaitの続きをどこで実行するのかを設定するのに使われるもので、
WPFやWinForm等のメインループを内包するフレームワーク側が裏で提供する想定のようだ。
どちらも処理をPostされて、なんらかの方法でPostされたものを後で(ImmediateSchedulerを除く)処理するものを抽象化している。
いろんなスレッドからエンキュー(Post)されるが、特定のスレッドでデキューするループが働いているという理解でよさそう。
Rx入門 (15) - スケジューラの利用
にRxのスケジューラの一覧がありました。
下記にスケジューラの種類とそれぞれに関してのメモ。細かいところは間違っているかもしれないので注意・・・
Shceduler | SynchronizationContext | memo |
---|---|---|
ControlScheduler | WindowsFormsSynchronizationContext | WindowsFormのメッセージポンプに仕込まれたキュー |
DispatcherScheduler | DispatcherSynchronizationContext | WPFに仕込まれたキュー |
ImmediateScheduler | 待たずにすぐに実行する。キューですらない | |
CurrentThreadScheduler | キュー。たぶんRxで他のスケジューラが指定されなかったときに使われる。Subscribeすると生成される。明示的に指定するのは無意味なのでは? | |
NewThreadScheduler | 新しいスレッドをどんどん作る? | |
EventLoopScheduler | 新しいスレッドでEventLoopを開始する。ThreadPoolに対して、シングルスレッドが売り? | |
ThreadPoolScheduler | ThreadPoolで処理する |
ObserveOn(Thread.CurrentThread)
とかできたら便利そうだが、デキューが不在になるので無理ですね。