LoginSignup
0
0

More than 1 year has passed since last update.

イベント発生時にキャンセルされる非同期処理の実装

Last updated at Posted at 2022-11-26

はじめに

非同期メソッドにCancellationTokenSourceからCancellationTokenを渡して実行し、イベントの発生時にCancellationTokenSourceをキャンセルすればいいわけですが、OnDestroy以外だと面倒ですよね。😥

こういうの.cs
Subject<Unit> _onDead;
public IObservable<Unit> OnDead => _onDead;
CancellationTokenSource _cancelOnDead; // これをOnDeadでキャンセルしないといけない
CancellationToken CancellationTokenOnDead => _cancelOnDead.Token;

そこで、GetCancellationTokenOnDestroy()みたいなのが全部のイベントにあれば楽に実装できるのでは!?

こう書ける説.cs
Subject<Unit> _onDead;
public IObservable<Unit> OnDead => _onDead;
CancellationToken CancellationTokenOnDead => OnDead.GetCancellationToken();

というわけでIObservable<T>に拡張メソッドGetCancellationToken()を生やしてみました。

拡張メソッド

public static class ObservableExtensions
{
    /// <summary>
    /// OnNextかOnCompletedかOnErrorでキャンセルされるCancellationTokenを返す。
    /// TakeUntilかMergeで複数のイベントを指定できる。
    /// </summary>
    public static CancellationToken GetCancellationToken<T>(this IObservable<T> source)
    {
        var cts = new CancellationTokenSource();
        source.FirstOrDefault().SubscribeWithState(cts, (_, cts) => cts.Cancel(), (_, cts) => cts.Cancel(), cts => cts.Cancel());
        return cts.Token;
    }
}

使い方

// こういう非同期メソッドがあるとして
async UniTask SomeAsync(int value, CancellationToken ct)
{
    await UniTask.Yield(ct);
}

Subject<int> _onHoge = new();
public IObservable<int> OnHoge => _onHoge;

// 「OnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
SomeAsync(0, OnHoge.GetCancellationToken());

// OnHogeに値が流れるたび、「流れてきた値」と「次にOnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
OnHoge.Subscribe(x => SomeAsync(x, OnHoge.GetCancellationToken());

GCが😰

GetCancellationToken()を呼ぶたびにCancellationTokenSourceFirstOrDefaultActionObserverAction3つをnewしちゃう😰

キャッシュする版

というわけでキャッシュ😣

using System;
using System.Collections.Generic;
using System.Threading;
using UniRx;

public static class GetCancellationToken
{
    static Dictionary<object, CancellationTokenSource> cache = new();
    /// <summary>
    /// 呼ぶとストリームが終了するまで購読が残り続けるので、FromEventなどの終わらないストリームから取得する時はTakeUntilなどを挟む必要がある。
    /// </summary>
    public static CancellationToken GetCancellationTokenWithCache<T>(this IObservable<T> source)
    {
        CancellationTokenSource cts;
        if (cache.TryGetValue(source, out cts))
        {
            if (cts.IsCancellationRequested)
            {
                cache[source] = cts = new();
            }
            return cts.Token;
        }
        cts = new();
        cache.Add(source, cts);
        source.SubscribeWithState(source,
            (_, source) =>
            {
                cache[source].Cancel();
                },
            (_, source) =>
            {
                cache[source].Cancel();
                cache.Remove(source);
            },
            (source) =>
            {
                cache[source].Cancel();
                cache.Remove(source);
            });
        return cts.Token;
    }
}

先ほどとの違い

  • 同じObservableに対して呼ばれたらキャッシュからトークンを返す
  • OnNextされても購読は解除しない

終わらないストリームに使うとメモリリークしますが、使い方は同じです。

Dictionary舐めるの効率悪くない?😰

というわけで新しいインターフェイスIObservable_<T>とクラスSubject_<T>

IObservable_.cs
using System;

namespace System.Threading
{
    public interface IObservable_<T> : IObservable<T>
    {
        CancellationToken GetCancellationToken();
    }
}
Subject_.cs
using System;
using System.Threading;

namespace UniRx
{
    /// <summary>
    /// GetCancellationToken()を低コストで行えるSubject<T>。
    /// </summary>
    public class Subject_<T> : ISubject<T>, IDisposable, IOptimizedObservable<T>, IObservable_<T>
    {
        readonly Subject<T> subject = new();
        CancellationTokenSource cts = new();
        bool getCancellationTokenCalled;

        public CancellationToken GetCancellationToken()
        {
            getCancellationTokenCalled = true;
            return cts.Token;
        }

        public void OnNext(T value)
        {
            if (getCancellationTokenCalled && !cts.IsCancellationRequested)
            {
                cts.Cancel();
                cts = new();
                getCancellationTokenCalled = false;
            }
            subject.OnNext(value);
        }

        public void OnCompleted()
        {
            subject.OnCompleted();
            cts.Cancel();
        }

        public void OnError(Exception error)
        {
            subject.OnError(error);
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }

        public void Dispose()
        {
            subject.Dispose();
            cts.Cancel();
        }

        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }
    }
}

使い方

readonly Subject_<int> _onHoge = new();
public IObservable_<int> OnHoge => _onHoge;

// 以下は先ほどと同じです。

// 「OnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
SomeAsync(0, OnHoge.GetCancellationToken());

// OnHogeに値が流れるたび、「流れてきた値」と「次にOnHogeに値が流れたときにキャンセルされるトークン」を渡してSomeAsyncを実行
OnHoge.Subscribe(x => SomeAsync(x, OnHoge.GetCancellationToken());

おわりに

こんな方法もあるみたいです

みんなどうしてるのかな?

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0