はじめに
手元の UniRx をアップデートしてUniRx.Asyncを導入。
UniTask
を触ってみたがキャンセルが辛かったので、少しでも楽をするためにいろいろ調べた。
UniTask
のキャンセル
そもそもキャンセルというのは「通常のシーケンスとは異なる割り込み(異常)が発生した結果、以後の処理を行わない」と言い換えられる。
と考えるとasync/awaitなしのコードと同様、適切なタイミングで例外をスローすることによって実現できる1。
キャンセルにより処理が止まったという事実はOperationCanceledException
をスローすることで表現し、必要があればtry~catch
で捕捉して破棄処理などを実行する。
外からキャンセル
実行中のUniTask
インスタンスは、そのメソッド呼び出しなどによって外部から処理をキャンセルすることはできない。
では外からキャンセルできないのかというとそうではなく、次のような手順になる。
〜UniTask
の外側〜
1. CancellationTokenSourceのインスタンス(以下cts
)を生成
2. UniTask実行時にCancellationTokenのインスタンスを渡す(cts.Token
で得られる。以下ct
)2
3. キャンセルしたいタイミングでcts.Cancel()
を呼び出す
〜UniTask
の内側〜
キャンセルを受け付けるタイミングでキャンセルが要求されているかct
を見て確認し、Operation~
例外をスローする
using System;
using System.Threading;
using UniRx.Async;
using UnityEngine;
public class OuterCancel : MonoBehaviour
{
private CancellationTokenSource _cancellationTokenSource;
void Start()
{
// 1. CancellationTokenSourceのインスタンスを生成
_cancellationTokenSource = new CancellationTokenSource();
// 2. UniTask実行時にCancellationTokenのインスタンスを渡す
CancelableAsync(_cancellationTokenSource.Token).Forget(e => { }); // 呼び出し階層のトップでOperationCanceledExceptionを握りつぶす
}
void Update()
{
if (Input.anyKeyDown)
{
// 3. CancellationTokenSourceのCancel()を呼び出す
_cancellationTokenSource.Cancel();
}
}
async UniTask CancelableAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await UniTask.Delay(TimeSpan.FromSeconds(1));
Debug.Log("waited 1sec");
if (cancellationToken.IsCancellationRequested)
{
// a.
// キャンセルされていたらOperationCanceledExceptionをスロー
throw new OperationCanceledException(cancellationToken);
}
await UniTask.Delay(TimeSpan.FromSeconds(2));
Debug.Log("waited 2sec");
// b.
// キャンセルされていたらOperationCanceledExceptionをスロー
cancellationToken.ThrowIfCancellationRequested();
// c.
// キャンセルされていたらOperationCanceledExceptionをスロー
await UniTask.Delay(TimeSpan.FromSeconds(3), cancellationToken: cancellationToken);
Debug.Log("waited 3sec");
try
{
await UniTask.Delay(TimeSpan.FromSeconds(5), cancellationToken: cancellationToken);
}
catch (OperationCanceledException)
{
// キャンセル時に実行したい処理があればtry~catchで捕捉
Debug.LogWarning("Canceled!");
throw;
}
Debug.LogWarning("Done!");
}
}
通常CancellationToken
を渡されたメソッドはそのCancelltionToken
キャンセル時にOperation~
例外を吐くように実装されており(後述)、自分が実装する場合もそうすべき。
なのでなるべく c. のようにUniTask
実行時にCancellationToken
を渡すようにして、途中で伝播が止まらないように呼び出し階層のトップでOperation~
例外を握りつぶす。
また、a,bの記述は次のような場合などに使用する。
-
await
とawait
の間で時間がかかる処理をしていて、その間キャンセルを受け付けたい -
UniTask
を返すメソッドが引数でCancellationToken
を受け取るようになっていない
* * *
引数でCancallationToken
を受け取るようにするとシグネチャが長くなってしまうのが悩ましい。受け取らなくてもいいようにデフォルト引数を指定するとさらに長くなる(辛い)。
Rxの購読キャンセルはオペレータの合間で処理が切れてくれたが、Task
/UniTask
の場合はどこで停止すべきかを自分で決め、CancellationToken
を呼び出しの奥深くまで脈々と受け継いでいく形になる(辛い)。
UniRx.Asyncに実装されているメソッドのキャンセル
引数でCancellationToken
を受け取るメソッド(引数省略)
-
UniTask.Yield()
... 特定のUnityイベントのタイミングまで待つ -
UniTask.Delay()
... 指定時間経過するまで待つ -
UniTask.DelayFrame()
... 指定フレーム数経過するまで待つ -
UniTask.WaitUntil()
... 引数に与えたFunc<bool>
がtrueを返すまで待つ -
UniTask.WaitWhile()
... 引数に与えたFunc<bool>
がfalseを返すまで待つ
などは、渡したCancellationToken
がキャンセルされるとOperation~
例外をスローし、停止する。
また、UniTask
のインスタンスに拡張メソッドとして生えているメソッド(引数省略)
Timeout()
WithCancellation()
も引数で渡したCancellationToken
がキャンセルされるとOperation~
例外やTimeoutException
をスローするので以後の処理は実行されなくなる3が、これらはレシーバの UniTask を停止するわけではない。
大事なことなので繰り返す。
これらはレシーバの UniTask を停止するわけではない。
なので、これらを副作用を含むUniTask
に対して呼び出してはいけない。副作用を含むUniTask
のキャンセル(停止)は、必ずUniTask
を返すメソッド呼び出し時にCancellationToken
を渡すことで実現する。
WithCancellation()
などは主にAsync~Trigger
やObservable~Trigger
などのイベント発行待ちのキャンセル用に使用する。
なお、この挙動は次のバージョンで改善される様子。
今また試して確かめたけど、.WithCancellation(cancellationToken)はCTがキャンセルされたら例外を吐くだけで、レシーバのUniTaskをキャンセル/停止するわけじゃない。
— su10@Shoot.io (@su10_dev) 2018年8月6日
これはかなり直感に反してて、UniTask noob達はこの仕様の厳しい洗礼を受けることと思う🤗
なので次のバージョンからは、CancellationTokenが「外側から」セットできる仕様になりました(ので止まります) https://t.co/W6eckEn2E8
— neuecc (@neuecc) 2018年8月6日
IObservable
のawait
キャンセル
GetAwaiter()
もしくはToUniTask()
にCancellationToken
を渡す。
// キャンセルされたら購読を停止してOperation~例外をスロー
await fooObservable.GetAwaiter(cancellationToken);
await barObservable.ToUniTask(cancellationToken);
WithCancellation()
などとは異なり、ちゃんと購読を破棄してくれる。
キャンセルを考慮したUniTask
→IObservable
変換
UniTask
→IObservable
への変換にはToObservable()
が使えるが、これだとストリームの購読キャンセルをUniTask
に伝えることができない。
購読のキャンセルがUniTask
に伝わるようにIObservable
変換するには、Observable.StartAsync()
もしくはObservable.FromAsync()
を使用する4。
これらは現状のUniRx.Asyncで実装されていないので、.NETのv2.2.4の実装を参考に自分で実装してみた。
// SEE: https://github.com/dotnet/reactive/blob/v2.2.4/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs
using System;
using System.Threading;
using UniRx.Async;
namespace UniRx
{
partial class Observable
{
public static IObservable<TResult> StartAsync<TResult>(Func<UniTask<TResult>> functionAsync)
{
if (functionAsync == null)
throw new ArgumentNullException("functionAsync");
var task = default(UniTask<TResult>);
try
{
task = functionAsync();
}
catch (Exception exception)
{
return Throw<TResult>(exception);
}
return task.ToObservable();
}
public static IObservable<TResult> StartAsync<TResult>(Func<CancellationToken, UniTask<TResult>> functionAsync)
{
if (functionAsync == null)
throw new ArgumentNullException("functionAsync");
var cancellable = new CancellationDisposable();
var task = default(UniTask<TResult>);
try
{
task = functionAsync(cancellable.Token);
}
catch (Exception exception)
{
return Throw<TResult>(exception);
}
var result = task.ToObservable();
return Create<TResult>(observer =>
{
var subscription = result.Subscribe(observer);
return new CompositeDisposable(cancellable, subscription);
});
}
public static IObservable<Unit> StartAsync(Func<UniTask> actionAsync)
{
if (actionAsync == null)
throw new ArgumentNullException("actionAsync");
var task = default(UniTask);
try
{
task = actionAsync();
}
catch (Exception exception)
{
return Throw<Unit>(exception);
}
return task.ToObservable();
}
public static IObservable<Unit> StartAsync(Func<CancellationToken, UniTask> actionAsync)
{
if (actionAsync == null)
throw new ArgumentNullException("actionAsync");
var cancellable = new CancellationDisposable();
var task = default(UniTask);
try
{
task = actionAsync(cancellable.Token);
}
catch (Exception exception)
{
return Throw<Unit>(exception);
}
var result = task.ToObservable();
return Create<Unit>(observer =>
{
var subscription = result.Subscribe(observer);
return new CompositeDisposable(cancellable, subscription);
});
}
public static IObservable<TResult> FromAsync<TResult>(Func<UniTask<TResult>> functionAsync)
{
return Defer(() => StartAsync(functionAsync));
}
public static IObservable<TResult> FromAsync<TResult>(Func<CancellationToken, UniTask<TResult>> functionAsync)
{
return Defer(() => StartAsync(functionAsync));
}
public static IObservable<Unit> FromAsync(Func<UniTask> actionAsync)
{
return Defer(() => StartAsync(actionAsync));
}
public static IObservable<Unit> FromAsync(Func<CancellationToken, UniTask> actionAsync)
{
return Defer(() => StartAsync(actionAsync));
}
}
}
BestHTTPのHTTPRequestをUniTask
で
BestHTTPのHTTPRequest
クラスに、UniTask<string>
を返すSendTaskAsync()
を拡張メソッドで追加してみた。HTTPRequest
によるHTTP通信はAbort()
による途中キャンセルが可能なのでキャンセルにも対応。
ついでにObservable.FromAsync()
を使ってIObservable<string>
を返すSendAsync()
も実装。
using System;
using System.Net;
using System.Threading;
using BestHTTP;
namespace UniRx.Async
{
public static class HTTPRequestExtensions
{
public static IObservable<string> SendAsync(this HTTPRequest request)
{
return Observable.FromAsync(cancel => request.SendTaskAsync(cancel));
}
public static async UniTask<string> SendTaskAsync(
this HTTPRequest request,
CancellationToken cancellationToken = default(CancellationToken)
)
{
try
{
await request.Send().ConfigureAwait(PlayerLoopTiming.Update, cancellationToken);
}
catch (OperationCanceledException)
{
request.Abort();
throw;
}
switch (request.State)
{
case HTTPRequestStates.Finished:
var response = request.Response;
return (response.IsSuccess)
? response.DataAsText
: throw new HttpProtocolException((HttpStatusCode) response.StatusCode, response);
case HTTPRequestStates.Error:
throw request.Exception;
case HTTPRequestStates.Aborted:
// NOTE: 外部からのAbort()呼び出しまたはHTTPManager.AbortAll()
throw new OperationCanceledException();
case HTTPRequestStates.ConnectionTimedOut:
case HTTPRequestStates.TimedOut:
throw new TimeoutException();
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
public class HttpProtocolException : Exception
{
public readonly HttpStatusCode StatusCode;
public readonly HTTPResponse Response;
public HttpProtocolException(HttpStatusCode statusCode, HTTPResponse response) : this(statusCode)
{
this.Response = response;
}
public HttpProtocolException(HttpStatusCode statusCode) : base($"{(int) statusCode} {statusCode}")
{
this.StatusCode = statusCode;
}
}
SendAync()
については購読のキャンセル以外でAbort()
された場合はOperation~
例外がスローされてしまうので、横着せずにObservableHTTPRequest
ラッパークラスを作るべきかもしれない。
UnityWebRequest
も送信中のリクエストをAbort()
でキャンセル可能なので、厳密に実装したいのであれば上記同様SendWebRequestAsync()
を実装すると良さげ。
CancellationToken
キャンセル時のコールバック
Register(Action callback)
メソッドでキャンセル時に呼び出されるコールバックを登録可能。
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken ct = cts.Token;
CancellationTokenRegistration registration = ct.Register(() => Debug.Log("Canceled!"));
cts.Cancel(); // "Canceled!"
registration.Dispose(); // コールバックを削除
ToUniTask()
ToUniTask()
メソッドでCancellationToken
をUniTask
に変換可能。これで得られたUniTask
は、
CancellationToken
が
・ すでにキャンセル済みだった場合、Operation~
例外をスロー
・ 未キャンセルの場合、キャンセル時に実行完了
という挙動になる。
参考
-
単純に途中で処理を終わらせる場合も、通常と同じく
return
する ↩ -
CancellationTokenSource
はreadonlyなCancellationToken
の状態を外から変更するためのもので、JSでいうところのDeferredとPromiseみたいな関係 ↩ -
ちなみに
TimeoutWithoutException()
,WithCancellationWihtoutException()
という、キャンセルされたら例外を吐かずに待ちが完了するバージョンのメソッドもある ↩ -
FromAsync()
はStartAsync()
をDefer()
でラップしたメソッドで、再Subscribe
時に都度処理が走ってほしい場合はFromAsync()
、キャッシュされた実行結果がほしい場合はStartAsync()
を使用 ↩