12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Task→Observable 変換でハマったこと

Posted at

.NET の Task<T> は、Reactive Extensions が提供する拡張メソッド ToObservable()IObservable<T> に変換できます。

なにも考えずに ToObservable() を連発していたら、盛大にハマったのでメモ。

Task.Run().ToObservable() とか、意味ないっしょ

ダメなコード。

var i = 0;
IObservable<int> incrementObservable = Task.Run (() => {
	i++;
	Debug.WriteLine($"increment! - {i}");
	return i;
})
.ToObservable ();

Debug.WriteLine("Ready...");

incrementObservable // インクリメント
	.Repeat(3) // 3回繰り返す
	.Subscribe(
		x  => Debug.WriteLine($"OnNext({x})"),
		ex => Debug.WriteLine($"OnError({ex.ToString()})"),
		() => Debug.WriteLine("OnCompleted"));

incrementObservable は、副作用ありありですが、外部変数 i を +1 して後続に流す IObservable<int> です。
 これを .Repeat(3) して .Subscribe してますから、
 

Ready...
increment! - 1
OnNext(1)
increment! - 2
OnNext(2)
increment! - 3
OnNext(3)
OnCompleted

という出力を期待してました。
が、実際の出力はこう。

increment! - 1
Ready...
OnNext(1)
OnNext(1)
OnNext(1)
OnCompleted

Subscribe する前に Task が実行されてるし、 repeat してるのに increment されない。。。

「・・・ん? Task.Run().ToObservable() って、タスクを実行した結果を IObservable 化してるだけじゃね?」

コード見たまんまなんですが、これに気づくのに1時間かかりました。。。

期待通り動くのはこう↓。

var i = 0;
IObservable<int> incrementObservable = Observable.FromAsync(()=>Task.Run(() => {
	i++;
	Debug.WriteLine($"increment! - {i}");
	return i;
}));

Debug.WriteLine("Ready...");

incrementObservable // インクリメント
	.Repeat(3) // 3回繰り返す
	.Subscribe(
		x  => Debug.WriteLine($"OnNext({x})"),
		ex => Debug.WriteLine($"OnError({ex.ToString()})"),
		() => Debug.WriteLine("OnCompleted"));

Observable.FromAsync で Task の実行そのものを IObservable 化します。
これの結果は正しくこう↓なりました。

Ready...
increment! - 1
OnNext(1)
increment! - 2
OnNext(2)
increment! - 3
OnNext(3)
OnCompleted

Task は1回しか実行できない

ところで、 Task<T> は一度実行すると、2度目は実行できません。(Furure や Promise もそうだっけ)

var i = 0;
Task<int> incrementTask = new Task<int>(() => {
	i++;
	Debug.WriteLine($"increment! - {i}");
	return i;
});

incrementTask.RunSynchronously();
incrementTask.RunSynchronously(); 

このコードは2回目の RunSynchronously() で例外がでます。

となると、 incrementTask.ToObservable() したとしても、期待通り動いてくれなさそうです。
(そもそも Task は Start などしないと実行されないので、Observable のチェインの中でいつ呼ぶの?)

というわけで、 Task.ToObservable() は、どういう時に使えばいいのかよくわかりませんでした。だれか教えて下さい。(汗)

12
7
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?