C#
ReactiveExtensions
非同期処理
Rx
C#Day 17

TaskとRxを組み合わせると最強に見える

More than 1 year has passed since last update.

これは C# Advent Calendar 2017 の17日目の記事です。

昨日は @nitacore さんの C#でのRPCのインターフェース表現の一例 でした。


TaskとReactiveExtensions(Rx)は共に非同期処理を実現するための機能として各所で紹介されています。

特にUnity3DにおけるC#だとUniRxのお陰で非同期処理といえばRxという印象が強いです。

しかし、Unity2017では.NET Framework4.6が使えるようになり、Taskの利用もできるようになりました。

そこで今回はTaskとRxを組み合わせ、より便利に使う方法を考えてみたいと思います。


はじめに

まずTaskとRxそれぞれの特徴を簡単に確認してみましょう。


  • Task


    • 非同期な処理の流れを簡単に記述できる。



  • ReactiveExtensions


    • 非同期にイベント又はメッセージを送信することができる。



語弊アリアリの状態で端的に言い表すとすれば、

Taskは処理の流れを記述し、Rxは処理のトリガーを作成できます。

勘違いされやすい(と個人的に感じている)のですが、

TaskとRxは択一的なものではなくそれぞれ別の役割のものなのです。

つまり、組み合わせると最強!


TaskとRxの相互運用

TaskとRxを組み合わせるにあたり、それぞれの相互変換を行う必要があります。

それを行ってくれるのがTaskObservableExtensionsです。

(UniRxにももちろんあります)


使用例:他のクラスのIObservableを利用する

他のクラスでのイベントがIObservableの状態で公開されていることは多々あると思います。

例えばデータの更新を待ちたい場合には以下のように記述できます。

(ctはCancellationToken)

var updatedData = await target.OnUpdated.FirstAsync().ToTask(ct);

IObservableでイベントを公開しているものとしてはUIクラスが多いでしょう。

例えばキャンセルボタンによる処理のキャンセルを行いたい場合は以下のように記述できます。

using (var heavyTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ct))

using (var buttonCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
{
var cancelButtonClickedTask = _cancelButton.OnClicked.FirstAsync().ToTask(buttonCts.Token);
await Task.WhenAny(HeavyTaskAsync(heavyTaskCts.Token), cancelButtonClickedTask);
if (cancelButtonClickedTask.Status == TaskStatus.RanToCompletion)
{
// キャンセルボタンが押された
heavyTaskCts.Cancel();
await ShowDialog("キャンセルされました", ct);
return;
}

// buttonCts.Dispose()だとタスク自体はキャンセルされないので明示的にCancel()を呼び出す必要がある
buttonCts.Cancel();
}


使用例:並列に実行できない処理をキューイングする

スレッドセーフでない処理を行いたい場合にlockやSemaphoreSlimで排他制御を行うことがあると思います。

しかし、何らかのRxのイベントをSubscribeした中でそれを行いたい場合など、

async/awaitな処理を記述するわけにはいきません。

(asyncなラムダを渡すことは可能だが、例外のハンドリングができなくなるため)

そんな場合には処理をキューイングする部分と、それを受け取って処理する処理ループに分割することで

同期的なデリゲートの中からでも非同期な処理をキューイングすることができます。

例えば以下のようなクラスを用意します。

using AsyncAction = Func<CancellationToken, Task>;

class Monopolizer
{
/// <summary>
/// 処理のキュー
/// </summary>
private readonly ConcurrentQueue<AsyncAction> _queue = new ConcurrentQueue<AsyncAction>();

/// <summary>
/// <see cref="Enqueue(AsyncAction)"/>されたことを通知する内部イベント
/// </summary>
private readonly Subject<object> _queued = new Subject<object>();

/// <summary>
/// 処理をキューイングする
/// </summary>
public void Enqueue(AsyncAction process)
{
_queue.Enqueue(process);
_queued.OnNext(null);
}

/// <summary>
/// 処理を取り出して実行する
/// </summary>
public async Task LoopAsync(CancellationToken ct)
{
// ThreadContextを破棄
await Task.Run(() => { }).ConfigureAwait(false);

while (!ct.IsCancellationRequested)
{
// 処理がキューイングするまで待つタスクを作成
var waitQueueTask = _queued.FirstAsync().ToTask(ct);
if (_queue.IsEmpty) // タスク作成中にキューイングされた時用にチェックする
{
await waitQueueTask; // 実際に待ち合わせを行う
}

// キューイングされているものを待つ
while (_queue.TryDequeue(out var t)) await t(ct);
}
}
}

使う側では以下のような感じで呼び出せば簡単に排他制御することができます。

observable.Subscribe(x => monopolizer.Enqueue(async ct => { await Task.Delay(x, ct); Console.WriteLine(x); }));


使用例:コールバックメソッドを待ち合わせる

ライブラリを使用する際にコールバック(デリゲート)を登録する必要があるものが存在します。

特にiOS上のプラグイン(ネイティブDLL)の呼び出し時などは

MonoPInvokeCallbackAttributeの指定とともにstaticメソッドである必要があるため

Taskの途中で処理を待ち合わせするのが困難です。

そこで、コールバックメソッドでイベントを発行してそれを待ち合わせすることにより

async/awaitな処理の記述をすることができます。

例えば以下のようなDLLがあったとします。

public delegate void InterlopLoginCallback(ulong userId, ulong data);

[DllImport("__Internal")]
public static extern void Login(ulong userId, ulong authToken, InterlopLoginCallback callback);

ここで渡すコールバックはstaticである必要がある場合は以下のようなコードを記述する必要があります。

/// <summary>

/// <see cref="Login(ulong, ulong, InterlopLoginCallback)"/>に渡すデリゲートインスタンスが
/// GCされるのを回避するために変数として保持しておく
/// </summary>
private static InterlopLoginCallback _loginCallback = LoginCallback;

/// <summary>
/// <see cref="LoginCallback(ulong, ulong)"/>が呼ばれたことを通知するイベント
/// </summary>
private static Subject<(ulong userId, ulong data)> _onLoggedIn = new Subject<(ulong userId, ulong data)>();

/// <summary>
/// <see cref="Login(ulong, ulong, InterlopLoginCallback)"/>用のコールバック
/// </summary>
[MonoPInvokeCallback]
private static void LoginCallback(ulong userId, ulong data) => _onLoggedIn.OnNext((userId, data));

/// <summary>
/// ログイン処理を行う
/// </summary>
public static Task<(ulong userId, ulong data)> LoginAsync(ulong userId, ulong authToken)
{
// 先にTaskを起動しておかないとMock等で即コールバックが呼ばれた時に取りこぼす
var waitLoginTask = _onLoggedIn.FirstAsync(x => x.userId == userId).ToTask();
Login(userId, authToken, LoginCallback);
return waitLoginTask;
}

使う側では以下のようにasync/awaitなコードが記述可能です。

...

await LoginAsync(userId, authToken);
...

ちなみに単純にデリゲートを渡すだけであればTaskCompletionSourceを使えばどうにでもなりますが、

GCによって解放されてしまう可能性がある点に注意する必要があります。


まとめ

このようにTaskとRxを組み合わせることにより、多彩な操作を行うことができます。

ぜひ組み合わせて使ってみてください。


明日は @usamik26 さんの Moq : Mocking Framework for .NET です。