はじめに
非同期メソッドにCancellationTokenSource
からCancellationToken
を渡して実行し、イベントの発生時にCancellationTokenSource
をキャンセルすればいいわけですが、OnDestroy
以外だと面倒ですよね。😥
こういうの.cs
Subject<Unit> _onDead;
public IObservable<Unit> OnDead => _onDead;
CancellationTokenSource _cancelOnDead; // これをOnDeadでキャンセルしないといけない
CancellationToken CancellationTokenOnDead => _cancelOnDead.Token;
そこで、GetCancellationTokenOnDestroy()
みたいなのが全部のイベントにあれば楽に実装できるのでは!?
こう書ける説.cs
Subject<Unit> _onDead;
public IObservable<Unit> OnDead => _onDead;
CancellationToken CancellationTokenOnDead => OnDead.GetCancellationToken();
というわけでIObservable<T>
に拡張メソッドGetCancellationToken()
を生やしてみました。
拡張メソッド
public static class ObservableExtensions
{
/// <summary>
/// OnNextかOnCompletedかOnErrorでキャンセルされるCancellationTokenを返す。
/// TakeUntilかMergeで複数のイベントを指定できる。
/// </summary>
public static CancellationToken GetCancellationToken<T>(this IObservable<T> source)
{
var cts = new CancellationTokenSource();
source.FirstOrDefault().SubscribeWithState(cts, (_, cts) => cts.Cancel(), (_, cts) => cts.Cancel(), cts => cts.Cancel());
return cts.Token;
}
}
使い方
// こういう非同期メソッドがあるとして
async UniTask SomeAsync(int value, CancellationToken ct)
{
await UniTask.Yield(ct);
}
Subject<int> _onHoge = new();
public IObservable<int> OnHoge => _onHoge;
// 「OnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
SomeAsync(0, OnHoge.GetCancellationToken());
// OnHogeに値が流れるたび、「流れてきた値」と「次にOnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
OnHoge.Subscribe(x => SomeAsync(x, OnHoge.GetCancellationToken());
GCが😰
GetCancellationToken()
を呼ぶたびにCancellationTokenSource
とFirstOrDefault
とActionObserver
とAction
3つをnew
しちゃう😰
キャッシュする版
というわけでキャッシュ😣
using System;
using System.Collections.Generic;
using System.Threading;
using UniRx;
public static class GetCancellationToken
{
static Dictionary<object, CancellationTokenSource> cache = new();
/// <summary>
/// 呼ぶとストリームが終了するまで購読が残り続けるので、FromEventなどの終わらないストリームから取得する時はTakeUntilなどを挟む必要がある。
/// </summary>
public static CancellationToken GetCancellationTokenWithCache<T>(this IObservable<T> source)
{
CancellationTokenSource cts;
if (cache.TryGetValue(source, out cts))
{
if (cts.IsCancellationRequested)
{
cache[source] = cts = new();
}
return cts.Token;
}
cts = new();
cache.Add(source, cts);
source.SubscribeWithState(source,
(_, source) =>
{
cache[source].Cancel();
},
(_, source) =>
{
cache[source].Cancel();
cache.Remove(source);
},
(source) =>
{
cache[source].Cancel();
cache.Remove(source);
});
return cts.Token;
}
}
先ほどとの違い
- 同じObservableに対して呼ばれたらキャッシュからトークンを返す
- OnNextされても購読は解除しない
終わらないストリームに使うとメモリリークしますが、使い方は同じです。
Dictionary舐めるの効率悪くない?😰
というわけで新しいインターフェイスIObservable_<T>
とクラスSubject_<T>
IObservable_.cs
using System;
namespace System.Threading
{
public interface IObservable_<T> : IObservable<T>
{
CancellationToken GetCancellationToken();
}
}
Subject_.cs
using System;
using System.Threading;
namespace UniRx
{
/// <summary>
/// GetCancellationToken()を低コストで行えるSubject<T>。
/// </summary>
public class Subject_<T> : ISubject<T>, IDisposable, IOptimizedObservable<T>, IObservable_<T>
{
readonly Subject<T> subject = new();
CancellationTokenSource cts = new();
bool getCancellationTokenCalled;
public CancellationToken GetCancellationToken()
{
getCancellationTokenCalled = true;
return cts.Token;
}
public void OnNext(T value)
{
if (getCancellationTokenCalled && !cts.IsCancellationRequested)
{
cts.Cancel();
cts = new();
getCancellationTokenCalled = false;
}
subject.OnNext(value);
}
public void OnCompleted()
{
subject.OnCompleted();
cts.Cancel();
}
public void OnError(Exception error)
{
subject.OnError(error);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
public void Dispose()
{
subject.Dispose();
cts.Cancel();
}
public bool IsRequiredSubscribeOnCurrentThread()
{
return false;
}
}
}
使い方
readonly Subject_<int> _onHoge = new();
public IObservable_<int> OnHoge => _onHoge;
// 以下は先ほどと同じです。
// 「OnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
SomeAsync(0, OnHoge.GetCancellationToken());
// OnHogeに値が流れるたび、「流れてきた値」と「次にOnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
OnHoge.Subscribe(x => SomeAsync(x, OnHoge.GetCancellationToken());
おわりに
こんな方法もあるみたいです
みんなどうしてるのかな?