Unity
UniRx
UniTask

【UniRx.Async】UniTaskのキャンセル覚書【Unity】

はじめに

手元の UniRx をアップデートしてUniRx.Asyncを導入。

UniTaskを触ってみたがキャンセルが辛かったので、少しでも楽をするためにいろいろ調べた。

UniTaskのキャンセル

そもそもキャンセルというのは「通常のシーケンスとは異なる割り込み(異常)が発生した結果、以後の処理を行わない」と言い換えられる。

と考えるとasync/awaitなしのコードと同様、適切なタイミングで例外をスローすることによって実現できる1

キャンセルにより処理が止まったという事実はOperationCanceledExceptionをスローすることで表現し、必要があればtry~catchで捕捉して破棄処理などを実行する。

外からキャンセル

実行中のUniTaskインスタンスは、そのメソッド呼び出しなどによって外部から処理をキャンセルすることはできない。

では外からキャンセルできないのかというとそうではなく、次のような手順になる。

UniTaskの外側〜
 1. CancellationTokenSourceのインスタンス(以下cts)を生成
 2. UniTask実行時にCancellationTokenのインスタンスを渡す(cts.Tokenで得られる。以下ct2
 3. キャンセルしたいタイミングでcts.Cancel()を呼び出す

UniTaskの内側〜
 キャンセルを受け付けるタイミングでキャンセルが要求されているかctを見て確認し、Operation~例外をスローする

using System;
using System.Threading;
using UniRx.Async;
using UnityEngine;

public class OuterCancel : MonoBehaviour
{
    private CancellationTokenSource _cancellationTokenSource;

    void Start()
    {
        // 1. CancellationTokenSourceのインスタンスを生成
        _cancellationTokenSource = new CancellationTokenSource();

        // 2. UniTask実行時にCancellationTokenのインスタンスを渡す
        CancelableAsync(_cancellationTokenSource.Token).Forget(e => { }); // 呼び出し階層のトップでOperationCanceledExceptionを握りつぶす
    }

    void Update()
    {
        if (Input.anyKeyDown)
        {
            // 3. CancellationTokenSourceのCancel()を呼び出す
            _cancellationTokenSource.Cancel();
        }
    }

    async UniTask CancelableAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        await UniTask.Delay(TimeSpan.FromSeconds(1));

        Debug.Log("waited 1sec");

        if (cancellationToken.IsCancellationRequested)
        {
            // a.
            // キャンセルされていたらOperationCanceledExceptionをスロー
            throw new OperationCanceledException(cancellationToken);
        }

        await UniTask.Delay(TimeSpan.FromSeconds(2));

        Debug.Log("waited 2sec");

        // b.
        // キャンセルされていたらOperationCanceledExceptionをスロー
        cancellationToken.ThrowIfCancellationRequested();

        // c.
        // キャンセルされていたらOperationCanceledExceptionをスロー
        await UniTask.Delay(TimeSpan.FromSeconds(3), cancellationToken: cancellationToken);

        Debug.Log("waited 3sec");

        try
        {
            await UniTask.Delay(TimeSpan.FromSeconds(5), cancellationToken: cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // キャンセル時に実行したい処理があればtry~catchで捕捉
            Debug.LogWarning("Canceled!");
            throw;
        }

        Debug.LogWarning("Done!");
    }
}

通常CancellationTokenを渡されたメソッドはそのCancelltionTokenキャンセル時にOperation~例外を吐くように実装されており(後述)、自分が実装する場合もそうすべき。

なのでなるべく c. のようにUniTask実行時にCancellationTokenを渡すようにして、途中で伝播が止まらないように呼び出し階層のトップでOperation~例外を握りつぶす。

また、a,bの記述は次のような場合などに使用する。

  • awaitawaitの間で時間がかかる処理をしていて、その間キャンセルを受け付けたい
  • UniTaskを返すメソッドが引数でCancellationTokenを受け取るようになっていない

 
* * *

 
引数でCancallationTokenを受け取るようにするとシグネチャが長くなってしまうのが悩ましい。受け取らなくてもいいようにデフォルト引数を指定するとさらに長くなる(辛い)。

Rxの購読キャンセルはオペレータの合間で処理が切れてくれたが、Task/UniTaskの場合はどこで停止すべきかを自分で決め、CancellationTokenを呼び出しの奥深くまで脈々と受け継いでいく形になる(辛い)。

UniRx.Asyncに実装されているメソッドのキャンセル

引数でCancellationTokenを受け取るメソッド(引数省略)

  • UniTask.Yield()    ... 特定のUnityイベントのタイミングまで待つ
  • UniTask.Delay()   ... 指定時間経過するまで待つ
  • UniTask.DelayFrame() ... 指定フレーム数経過するまで待つ
  • UniTask.WaitUntil()   ... 引数に与えたFunc<bool>がtrueを返すまで待つ
  • UniTask.WaitWhile()  ... 引数に与えたFunc<bool>がfalseを返すまで待つ

などは、渡したCancellationTokenがキャンセルされるとOperation~例外をスローし、停止する。

また、UniTaskのインスタンスに拡張メソッドとして生えているメソッド(引数省略)

  • Timeout()
  • WithCancellation()

も引数で渡したCancellationTokenがキャンセルされるとOperation~例外やTimeoutExceptionをスローするので以後の処理は実行されなくなる3が、これらはレシーバの UniTask を停止するわけではない

 
大事なことなので繰り返す。

 
これらはレシーバの UniTask を停止するわけではない

 
なので、これらを副作用を含むUniTaskに対して呼び出してはいけない。副作用を含むUniTaskのキャンセル(停止)は、必ずUniTaskを返すメソッド呼び出し時にCancellationTokenを渡すことで実現する。

WithCancellation()などは主にAsync~TriggerObservable~Triggerなどのイベント発行待ちのキャンセル用に使用する。

 
なお、この挙動は次のバージョンで改善される様子。

IObservableawaitキャンセル

GetAwaiter()もしくはToUniTask()CancellationTokenを渡す。

// キャンセルされたら購読を停止してOperation~例外をスロー
await fooObservable.GetAwaiter(cancellationToken);
await barObservable.ToUniTask(cancellationToken);

WithCancellation()などとは異なり、ちゃんと購読を破棄してくれる。

キャンセルを考慮したUniTaskIObservable変換

UniTaskIObservableへの変換にはToObservable()が使えるが、これだとストリームの購読キャンセルをUniTaskに伝えることができない。

購読のキャンセルがUniTaskに伝わるようにIObservable変換するには、Observable.StartAsync()もしくはObservable.FromAsync()を使用する4

これらは現状のUniRx.Asyncで実装されていないので、.NETのv2.2.4の実装を参考に自分で実装してみた。

Observable.Async.cs
// SEE: https://github.com/dotnet/reactive/blob/v2.2.4/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs
using System;
using System.Threading;
using UniRx.Async;

namespace UniRx
{
    partial class Observable
    {
        public static IObservable<TResult> StartAsync<TResult>(Func<UniTask<TResult>> functionAsync)
        {
            if (functionAsync == null)
                throw new ArgumentNullException("functionAsync");

            var task = default(UniTask<TResult>);
            try
            {
                task = functionAsync();
            }
            catch (Exception exception)
            {
                return Throw<TResult>(exception);
            }

            return task.ToObservable();
        }

        public static IObservable<TResult> StartAsync<TResult>(Func<CancellationToken, UniTask<TResult>> functionAsync)
        {
            if (functionAsync == null)
                throw new ArgumentNullException("functionAsync");

            var cancellable = new CancellationDisposable();

            var task = default(UniTask<TResult>);
            try
            {
                task = functionAsync(cancellable.Token);
            }
            catch (Exception exception)
            {
                return Throw<TResult>(exception);
            }

            var result = task.ToObservable();

            return Create<TResult>(observer =>
            {
                var subscription = result.Subscribe(observer);
                return new CompositeDisposable(cancellable, subscription);
            });
        }

        public static IObservable<Unit> StartAsync(Func<UniTask> actionAsync)
        {
            if (actionAsync == null)
                throw new ArgumentNullException("actionAsync");

            var task = default(UniTask);
            try
            {
                task = actionAsync();
            }
            catch (Exception exception)
            {
                return Throw<Unit>(exception);
            }

            return task.ToObservable();
        }

        public static IObservable<Unit> StartAsync(Func<CancellationToken, UniTask> actionAsync)
        {
            if (actionAsync == null)
                throw new ArgumentNullException("actionAsync");

            var cancellable = new CancellationDisposable();

            var task = default(UniTask);
            try
            {
                task = actionAsync(cancellable.Token);
            }
            catch (Exception exception)
            {
                return Throw<Unit>(exception);
            }

            var result = task.ToObservable();

            return Create<Unit>(observer =>
            {
                var subscription = result.Subscribe(observer);
                return new CompositeDisposable(cancellable, subscription);
            });
        }

        public static IObservable<TResult> FromAsync<TResult>(Func<UniTask<TResult>> functionAsync)
        {
            return Defer(() => StartAsync(functionAsync));
        }

        public static IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, UniTask<TResult>> functionAsync)
        {
            return Defer(() => StartAsync(functionAsync));
        }

        public static IObservable<Unit> FromAsync(Func<UniTask> actionAsync)
        {
            return Defer(() => StartAsync(actionAsync));
        }

        public static IObservable<Unit> FromAsync(Func<CancellationToken, UniTask> actionAsync)
        {
            return Defer(() => StartAsync(actionAsync));
        }
    }
}

BestHTTPのHTTPRequestをUniTask

BestHTTPのHTTPRequestクラスに、UniTask<string>を返すSendTaskAsync()を拡張メソッドで追加してみた。HTTPRequestによるHTTP通信はAbort()による途中キャンセルが可能なのでキャンセルにも対応。

ついでにObservable.FromAsync()を使ってIObservable<string>を返すSendAsync()も実装。

HTTPRequestExtensions.cs
using System;
using System.Net;
using System.Threading;
using BestHTTP;

namespace UniRx.Async
{
    public static class HTTPRequestExtensions
    {
        public static IObservable<string> SendAsync(this HTTPRequest request)
        {
            return Observable.FromAsync(cancel => request.SendTaskAsync(cancel));
        }

        public static async UniTask<string> SendTaskAsync(
            this HTTPRequest request,
            CancellationToken cancellationToken = default(CancellationToken)
        )
        {
            try
            {
                await request.Send().ConfigureAwait(PlayerLoopTiming.Update, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                request.Abort();
                throw;
            }

            switch (request.State)
            {
                case HTTPRequestStates.Finished:
                    var response = request.Response;

                    return (response.IsSuccess)
                        ? response.DataAsText
                        : throw new HttpProtocolException((HttpStatusCode) response.StatusCode, response);

                case HTTPRequestStates.Error:
                    throw request.Exception;
                case HTTPRequestStates.Aborted:
                    // NOTE: 外部からのAbort()呼び出しまたはHTTPManager.AbortAll()
                    throw new OperationCanceledException();
                case HTTPRequestStates.ConnectionTimedOut:
                case HTTPRequestStates.TimedOut:
                    throw new TimeoutException();
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }
    }
}

public class HttpProtocolException : Exception
{
    public readonly HttpStatusCode StatusCode;
    public readonly HTTPResponse Response;

    public HttpProtocolException(HttpStatusCode statusCode, HTTPResponse response) : this(statusCode)
    {
        this.Response = response;
    }

    public HttpProtocolException(HttpStatusCode statusCode) : base($"{(int) statusCode} {statusCode}")
    {
        this.StatusCode = statusCode;
    }
}

SendAync()については購読のキャンセル以外でAbort()された場合はOperation~例外がスローされてしまうので、横着せずにObservableHTTPRequestラッパークラスを作るべきかもしれない。
 
UnityWebRequestも送信中のリクエストをAbort()でキャンセル可能なので、厳密に実装したいのであれば上記同様SendWebRequestAsync()を実装すると良さげ。

CancellationToken

キャンセル時のコールバック

Register(Action callback)メソッドでキャンセル時に呼び出されるコールバックを登録可能。

CancellationTokenSource       cts = new CancellationTokenSource();
CancellationToken             ct = cts.Token;
CancellationTokenRegistration registration = ct.Register(() => Debug.Log("Canceled!"));

cts.Cancel(); // "Canceled!"
registration.Dispose();  // コールバックを削除

ToUniTask()

ToUniTask()メソッドでCancellationTokenUniTaskに変換可能。これで得られたUniTaskは、

CancellationToken
 ・ すでにキャンセル済みだった場合、Operation~例外をスロー
 ・ 未キャンセルの場合、キャンセル時に実行完了

という挙動になる。

参考


  1. 単純に途中で処理を終わらせる場合も、通常と同じくreturnする 

  2. CancellationTokenSourceはreadonlyなCancellationTokenの状態を外から変更するためのもので、JSでいうところのDefferedとPromiseみたいな関係 

  3. ちなみにTimeoutWithoutException(),WithCancellationWihtoutException()という、キャンセルされたら例外を吐かずに待ちが完了するバージョンのメソッドもある 

  4. FromAsync()StartAsync()Defer()でラップしたメソッドで、再Subscribe時に都度処理が走ってほしい場合はFromAsync()、キャッシュされた実行結果がほしい場合はStartAsync()を使用