LoginSignup
2
5

More than 1 year has passed since last update.

MessagePipeのFilterでUniRxと同じような機能を作ってみる

Last updated at Posted at 2023-04-15

MessagePipeとは

MessagePipeはUniTaskで有名なCysharp様が提供しているライブラリの一つです。
UniRxと同系列の機能で、MessageのPub/Subを管理します。
開発者さまのブログに非常にわかりやすい説明があったので引用します

MessagePipeのIPublisher/ISusbcriberがRxのSubjectと異なるのは、OnErrorとOnCompletedが存在しないことです。
つまり、OnNextのみが存在するIObservableとみなせます。
このことによって「終わらないことの保証」と「エラーで購読が切れないことの保証」ができます。
イベントのハンドリングという観点では、OnError/OnCompletedにより購読が終了する可能性が存在することは、
再購読の必要性の判断が生じたりと、考慮事項が非常に増えます。
「終わらない」という、Rxよりも表現力の低い制約のある状態にすることで、扱いやすさを向上させています。
~『MessagePipe – .NET と Unityのためのハイパフォーマンスメッセージングライブラリ』 より

ただし、MessagePipeにはUniRxのオペレーターのような機能がついていません。
オペレーターのような機能を使うためには、Subscribe時にUniRxに変換するか、Filterを自作する必要があります。

この記事で説明したいこと

この記事では、UniRxのSelect,Where,ThrottleFirstのようなフィルターの実例を紹介します!
MessagePipeそのものの使い方については説明しませんが、前提としてフィルターそのものの使い方から説明していきます。

フィルターを使う方法

フィルターの実例を見る前に、フィルターの使い方について説明していきます!

フィルターを作る

まずはフィルターそのものを用意します。
フィルターは MessageHandlerFilter<TValue> を継承したクラスとして作成します!
例えば以下のようなクラスです。

    public class HogeHogeFilter<TValue> : MessageHandlerFilter<TValue>
    {
        public override void Handle(TValue message, Action<TValue> next)
        {
            //ここでメッセージの編集など
            next(message);
        )
    }

非同期版の場合は継承するクラスが少し異なります。
AsyncMessageHandlerFilter<TValue>を継承しますので、以下のようになります!

    public class HogeHogeAsyncFilter<TValue> : AsyncMessageHandlerFilter<TValue>
    {
        public override async UniTask HandleAsync
        (TValue message, CancellationToken cancellationToken, Func<TValue, CancellationToken, UniTask> next)
        {
            //ここでメッセージの編集など
            await next(message , cancellationToken);
        }
    }

フィルターを使う

次に、作ったフィルターをSubscribe時に使用します。
フィルターは複数登録することができるため、Subscibe時には配列で渡す仕様になっています。
以下のように第二引数として渡します!

    public class SamplePresenter : VContainer.Unity.IInitializable
    {
        // DIパターンでSubscriberをインジェクション
        private ISubscriber<uint> _sampleSubscriber;
        public SamplePresenter(ISubscriber<uint> sampleSubscriber)
        {
            _sampleSubscriber = sampleSubscriber;
        }

        // 初期化時にSubscribeする
        public void Initialize()
        {
            // 発行するMessageと型をあわせてFilterの配列を作る
            var sampleFilters = new MessageHandlerFilter<uint>[]
            {
                new HogeHogeFilter<uint>(),
                new FugaFugaFilter<uint>()
            };

            // Subscribe時に第二引数としてFiltersを渡す
            _sampleSubscriber.Subscribe(x => Console.WriteLine(x), sampleFilters).AddTo(_disposables);
        }
    }

非同期処理の場合は、IAsyncSubscriberをInjectし、
AsyncMessageHandlerFilterの配列を作って渡してください。

ContainerへのRegister時にフィルターを適用する方法もありますが、割愛します。
興味がある方は、MessagePipeOptionsを調べてみてください。

使い方について以上です!

Filterの例

それでは早速フィルターの例を挙げていきます!
以下のフィルター例を上記の方法で使うと、UniRxのオペレーターのような機能が実現できます!

Whereと同じ働きをするFilter

    public class WhereFilter<TValue> : MessageHandlerFilter<TValue>
    {
        private Func<TValue,bool> _predicate;
        
        public WhereFilter(Func<TValue,bool> predicate)
        {
            _predicate = predicate;
        }

        public override void Handle(TValue message, Action<TValue> next)
        {
            if(!_predicate(message)) return;
            next(message );
        }
    }

    //作成例 messageとして入ってきたuintが1の場合のみ処理
    new WhereFilter<uint>(x => x == 1);

Whereと同じ働きをするFilter(非同期版)

    public class WhereAsyncFilter<TValue> : AsyncMessageHandlerFilter<TValue>
    {
        private Func<TValue,bool> _predicate;
        
        public WhereAsyncFilter(Func<TValue,bool> predicate)
        {
            _predicate = predicate;
        }
        
        public override async UniTask HandleAsync
        (TValue message, CancellationToken cancellationToken, Func<TValue, CancellationToken, UniTask> next)
        {
            if(!_predicate(message)) return;
            await next(message , cancellationToken);
        }
    }

    //作成例 messageとして入ってきたuintが1の場合のみ処理
    new WhereAsyncFilter<uint>(x => x == 1);

Selectと同じ働きをするFilter

    public class SelectFilter<TValue> : MessageHandlerFilter<TValue>
    {
        private Func<TValue,TValue> _selectFunc;
        public SelectFilter(Func<TValue,TValue> selectFunc)
        {
            _selectFunc = selectFunc;
        }
        
        public override void Handle(TValue message, Action<TValue> next)
        {
            var selected = _selectFunc(message);
            next(selected);
        }
    }
    
    //作成例 message に2を加算してから処理する
    new SelectFilter<uint>(x=> x + 2);

Selectと同じ働きをするFilter(非同期版)

    public class SelectAsyncFilter<TValue> : AsyncMessageHandlerFilter<TValue>
    {
        private Func<TValue,TValue> _selectFunc;
        public SelectAsyncFilter(Func<TValue,TValue> selectFunc)
        {
            _selectFunc = selectFunc;
        }
        
        public override async UniTask HandleAsync
        (TValue message, CancellationToken cancellationToken, Func<TValue, CancellationToken, UniTask> next)
        {
            var selected = _selectFunc(message);
            await next(selected,cancellationToken);
        }
    }

    //作成例 message に2を加算してから処理する
    new SelectAsyncFilter<uint>(x=> x + 2);

ThrottleFirstと同じ働きをするFilter

    public class ThrottleFirstFilter<TValue> : MessageHandlerFilter<TValue>
    {
        private readonly TimeSpan _interval;
        private readonly object _lock = new object();
        private DateTime _lastInvocation;

        public ThrottleFirstFilter(TimeSpan interval)
        {
            _interval = interval;
            _lastInvocation = DateTime.MinValue;
        }

        public override void Handle(TValue message, Action<TValue> next)
        {
            lock (_lock)
            {
                var now = DateTime.UtcNow;
                if ((now - _lastInvocation) >= _interval)
                {
                    _lastInvocation = now;
                    next(message);
                }
            }
        }
    }

    //作成例 一度メッセージが発光されたら0.1秒以内の追加発行を無視するフィルター
    new ThrottleFirstFilter<uint>(TimeSpan.FromSeconds(0.1))

ThrottleFirstと同じ働きをするFilter(非同期版)

    public class ThrottleFirstAsyncFilter<TValue> : AsyncMessageHandlerFilter<TValue>
    {
        private readonly TimeSpan _interval;
        private readonly SemaphoreSlim _semaphore = new (1, 1);
        private DateTime _lastInvocation;

        public ThrottleFirstAsyncFilter(TimeSpan interval)
        {
            _interval = interval;
            _lastInvocation = DateTime.MinValue;
        }
        
        public override async UniTask HandleAsync
        (TValue message, CancellationToken cancellationToken, Func<TValue, CancellationToken, UniTask> next)
        {
            await _semaphore.WaitAsync();

            try
            {
                var now = DateTime.UtcNow;
                if ((now - _lastInvocation) >= _interval)
                {
                    _lastInvocation = now;
                    await next(message,cancellationToken);
                }
            }
            finally
            {
                _semaphore.Release();
            }
            
        }
    }

    //作成例 一度メッセージが発光されたら0.1秒以内の追加発行を無視するフィルター
    new ThrottleFirstAsyncFilter<uint>(TimeSpan.FromSeconds(0.1))

おわりに

自分がMessagePipeの乗り換えを検討するときに、
一番気になったのがFilter周りだったので、そこにフォーカスした記事を書いてみました。

MessagePipeはDIとUniTaskが前提のライブラリで、学習コストは高めですが、
非常に強力で柔軟なPub/Subライブラリです。
利用を検討している方の一助になりましたら幸いです!

他にもFilterを作ったら追記して行こうと思います。

2
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
5