はじめに
本記事はRx初学者の筆者が、Rxをただただ使ってみて学びや気づきなどをまとめたものです。
Reactive Extensions(Rx)とは
ざっくり表現すると、LINQで非同期やイベント関連の処理も扱えるようにする仕組みのことです。
詳細はこちらの記事を参照ください。
非同期処理を使ってみる
C#で非同期処理を実現する際によく利用されるTaskとRxを使って非同期処理を書いてみます。
以下が実際のコードです。
重い処理を非同期処理で実施しつつ別の処理を行って、あるタイミングで非同期処理の完了を待つといったありがちなケースをTaskとRxの両方を使って書いています。
- Taskの場合
/// <summary>
/// サンプル用プログラム
/// </summary>
class Program
{
static async Task Main(string[] args)
{
// Taskで非同期処理を実行
var taskAsyncFunc = Task.Run(() =>
{
// 重い処理を実行している想定
Thread.Sleep(1000);
Console.WriteLine("Taskによる非同期処理完了");
});
// 非同期処理中に別処理を実施
Console.WriteLine("Taskによる非同期処理中の処理");
// 非同期処理の完了を待つ
await taskAsyncFunc;
}
}
- Rxの場合
/// <summary>
/// サンプル用プログラム
/// </summary>
class Program
{
static async Task Main(string[] args)
{
// Rxで非同期処理を実行
var rxAsyncFunc = Observable.ToAsync(() =>
{
// 重い処理を実行している想定
Thread.Sleep(1000);
Console.WriteLine("Rxによる非同期処理完了");
}).Invoke();
// 非同期処理中に別処理を実施
Console.WriteLine("Rxによる非同期処理中の処理");
// 非同期処理の完了を待つ
await rxAsyncFunc;
}
}
確かに非同期処理は書けますが、上記のコードではRxならではの利点が分からないです。
Rxの特徴は非同期処理、イベントをシーケンスとして表現できる点にあるというのが私の理解です。
なので、例えば非同期処理完了後にやらなければならない処理を登録したいという場合にはRxの場合、非常に簡単に実現できます。
具体的には以下なコードで実現できます。
/// <summary>
/// サンプル用プログラム
/// </summary>
class Program
{
static async Task Main(string[] args)
{
// Rxで非同期処理を実行
var rxAsyncFunc = Observable.ToAsync(() =>
{
// 重い処理を実行している想定
Thread.Sleep(1000);
Console.WriteLine("Rxによる非同期処理完了");
}).Invoke();
// この部分を追加
// 非同期処理完了後の処理を設定
rxAsyncFunc.Subscribe(unit =>
{
Console.WriteLine("非同期処理完了後の後処理");
});
// 非同期処理中に別処理を実施
Console.WriteLine("Rxによる非同期処理中の処理");
// 非同期処理の完了を待つ
await rxAsyncFunc;
}
}
上記コードではObservable.ToAsyncメソッドで作成した非同期処理をInvokeメソッドで実行しています。
実行時には変数rxAsyncFunc
でIObservableオブジェクトを受け取っています。IObservableオブジェクトは非同期処理の状態を観測します。IObservableオブジェクトにSubscribeメソッドでハンドラを設定することで、IObservableオブジェクトが非同期処理の完了を観測した時点で設定したハンドラを呼びだします。
このようにRxを使うと非同期処理の完了を補足して追加で処理を行わせるといったことが簡単にできます。
さいごに
記事を書くことで「Rxは非同期処理やイベントをシーケンスとして扱い、そのシーケンスを流れるイベントや非同期処理の完了を登録したハンドラに伝えている」というイメージが少し湧きました。
今後もRxについての記事を(理解のために)書かねばと思いました。