33
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Unity】MessagePipeを触ってみる2 ~さまざまなIPublisher/ISubscriber編~

Last updated at Posted at 2022-10-08

この記事は続きです。前回はこちら。

【Unity】MessagePipeを触ってみる1 ~概要・導入編~

この記事ではVContainerの使用を前提としています。

記事に登場するコード類はこちらのリポジトリで公開しています。


さまざまなIPublisher/ISubscriber

MessagePipeではIPublisher<T>/ISubscriber<T>を用いてメッセージの送信と受信を行います(長いのでIPub/ISubと略します)。

このIPub/ISubですがいくつかバリエーションが用意されており、それぞれで挙動が異なります。
今回はそのバリエーションを紹介します。

IPublisher<T>/ISubscriber<T>

概要

まずはもっともベーシックなものです。
指定した型Tのデータを同期でそのまま即時伝達を行います。

Normal.jpg

送信用データ
using System;

namespace MessagePipeSample.PubSub.Normal
{
    /// <summary>
    /// 送信データ(適当に定義した型)
    /// </summary>
    public readonly struct SendData : IEquatable<SendData>
    {
        public int Id { get; }
        public int Value { get; }

        public SendData(int id, int value)
        {
            Id = id;
            Value = value;
        }
        
        public bool Equals(SendData other)
        {
            return Id == other.Id && Value == other.Value;
        }

        public override bool Equals(object obj)
        {
            return obj is SendData other && Equals(other);
        }

        public override int GetHashCode()
        {
            return HashCode.Combine(Id, Value);
        }
    }
}
送受信を行うクラス
using System;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Normal
{
    public class ReadWriteSample1 : IInitializable, IDisposable, ITickable
    {
        private readonly IPublisher<SendData> _publisher;
        private readonly ISubscriber<SendData> _subscriber;
        private IDisposable _disposable;

        // それぞれをDIしてもらう
        public ReadWriteSample1(
            IPublisher<SendData> publisher,
            ISubscriber<SendData> subscriber)
        {
            _publisher = publisher;
            _subscriber = subscriber;
        }

        public void Initialize()
        {
            // 同じクラス内でPub/Subするのはナンセンスだけど、
            // 使い方のサンプルということでゆるして

            // SendDataを購読し、受信したらログにだす
            _disposable = _subscriber.Subscribe(data =>
            {
                // 受信データのうち、Idが一致するもののみ処理する
                if (data.Id == 0)
                {
                    Debug.Log($"Received: Id={data.Id} Value={data.Value}");
                }
            });
        }

        public void Tick()
        {
            // 毎フレーム送信する
            _publisher.Publish(new SendData(id: 0, UnityEngine.Random.Range(0, 100)));
        }

        public void Dispose()
        {
            // 購読中止
            _disposable?.Dispose();
        }
    }
}
DI
using MessagePipe;
using VContainer;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Normal
{
    public class GameLifetimeScope : LifetimeScope
    {

        protected override void Configure(IContainerBuilder builder)
        {
            // MessagePipeの設定
            var options = builder.RegisterMessagePipe();

            // SendDataを伝達できるように設定する
            builder.RegisterMessageBroker<SendData>(options);

            // ReadWriteSample1を起動
            builder.RegisterEntryPoint<ReadWriteSample1>(Lifetime.Singleton);
            
        }
    }
}

実行結果

毎フレーム、ログが繰り返し表示され続けます。

Received: Id=0 Value=2
Received: Id=0 Value=59
Received: Id=0 Value=21
Received: Id=0 Value=86
...(以下実行を停止するまで続く)

IPublisher<T>/ISubscriber<T>はすごくシンプルに、Publish()されたときのデータを即時・同期ですべてのSubscriberへと伝達します。これにより「非常にシンプルな形でのメッセージのグローバル配信」が可能となります。

考察:型が同じだが用途が異なる場合に、Pub/Subを別けて扱えるのか

IPublisher<T>/ISubscriber<T>は型を指定して送受信はできるが、これを「グループ分けして使えるのか」という疑問です。
(「グループAにはそれ専用のIPub/ISub」を、「グループBにはそれ専用のそれを」といった区別ができるのか)

結論:IPublisher<TKey, T>/ISubscriber<TKey, T>を使おう

IPublisher<TKey, T>/ISubscriber<TKey, T>という、Keyを指定して送受信できるものが用意されているのでこちらを使いましょう。

使い方については後述します。

別解A: データの中に識別子を入れる

何らかの理由でIPublisher<TKey, T>/ISubscriber<TKey, T>が使えない場合にどうするか。簡単なやり方としては、送信するデータの中に識別用の何かを入れて、受信側でハンドリングするというやり方です。

受け取ってからフィルタリング
_disposable = _subscriber.Subscribe(data =>
{
    // 受信データのうち、Idが自分と一致するもののみ処理する
    if (data.Id == _myId)
    {
        Debug.Log($"Received: Id={data.Id} Value={data.Value}");
    }
});

この場合はSubscriber側ですべてのデータを受けて、そこから不要なデータは捨てるという挙動になります。そのためSubscriberの数が膨大な場合は無駄な負荷が発生します。

別解B: 用途ごとに型を変える

前提からひっくり返すパターン。しかし設計上、「別の意味・用途をもつデータ構造なら概念ごと別ける」というは重要だったりします。なので場合によってはこれが最適解のパターンになることもありそう。

型をそもそも別ける
// 用途が違うならそもそも型から別けてしまうべき
builder.RegisterMessageBroker<SendData1>(options);
builder.RegisterMessageBroker<SendData2>(options);

IPublisher<TKey, T>/ISubscriber<TKey, T>

概要

挙動はIPublisher<T>/ISubscriber<T>とだいたい同じなのですが、TKeyを指定して送受信のグルーピングが可能になっています。

Publish時に指定したTKeyと同じ値でSubscribeしている対象にのみメッセージが届くように、MessageBrokerの内部でフィルタリングが実行されます。このため、関係ないSubscriberに不必要にデータを送ることがなくなり負荷軽減にも繋がります。

Key.jpg
(同じインタフェースを用いてPub/Subするが…)

Key2.jpg
MessageBrokerの内部でグループ分けされるため、異なるTKeySubscriberにはメッセージ送信はされない)

なお内部実装的にはTKeyをKeyとしたDictionaryが用いられています。そのためTKeyとして扱うオブジェクトには適切にGetHashCodeを実装することを忘れないでください。

Key
using System;

namespace MessagePipeSample.PubSub.Key
{
    /// <summary>
    /// 識別子
    /// </summary>
    public readonly struct KeyData : IEquatable<KeyData>
    {
        public int _raw { get; }

        public KeyData(int raw)
        {
            _raw = raw;
        }

        public bool Equals(KeyData other)
        {
            return _raw == other._raw;
        }

        public override bool Equals(object obj)
        {
            return obj is KeyData other && Equals(other);
        }

        public override int GetHashCode()
        {
            return _raw;
        }
    }
}
メッセージ本体
using System;

namespace MessagePipeSample.PubSub.Key
{
    /// <summary>
    /// 送信データ(適当に定義した型)
    /// </summary>
    public readonly struct SendData : IEquatable<SendData>
    {
        public string Value { get; }

        public SendData(string value)
        {
            Value = value;
        }

        public bool Equals(SendData other)
        {
            return Value == other.Value;
        }

        public override bool Equals(object obj)
        {
            return obj is SendData other && Equals(other);
        }

        public override int GetHashCode()
        {
            return (Value != null ? Value.GetHashCode() : 0);
        }
    }
}
送信側
using MessagePipe;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Key
{
    /// <summary>
    /// 順繰りにデータを送る
    /// </summary>
    public sealed class RoundRobinSender : ITickable
    {
        private readonly IPublisher<KeyData, SendData> _publisher;

        private int _roundRobinIndex;

        public RoundRobinSender(IPublisher<KeyData, SendData> publisher)
        {
            _publisher = publisher;
        }

        public void Tick()
        {
            // 0,1,2 を順繰りにデータ送信する
            _roundRobinIndex = (_roundRobinIndex + 1) % 3;

            var key = new KeyData(_roundRobinIndex);
            var sendData = new SendData($"Key={_roundRobinIndex},Value={UnityEngine.Random.Range(0, 100)}");

            // Keyを指定して送信
            _publisher.Publish(key, sendData);
        }
    }
}
受信側
using System;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Key
{
    public sealed class Receiver : IInitializable, IDisposable
    {
        private readonly ISubscriber<KeyData, SendData> _subscriber;
        private IDisposable _disposable;

        public Receiver(ISubscriber<KeyData, SendData> subscriber)
        {
            _subscriber = subscriber;
        }

        public void Initialize()
        {
            // まとめて破棄するやつ
            var bag = DisposableBag.CreateBuilder();

            // --- Key = 0 のみを購読 ---
            _subscriber.Subscribe(new KeyData(0), data =>
                {
                    Debug.Log($"MyKey=0, ReceivedData=[{data.Value}]");
                })
            .AddTo(bag);

            // --- Key = 1 のみを購読 ---
            _subscriber.Subscribe(new KeyData(1), data =>
                {
                    Debug.Log($"MyKey=1, ReceivedData=[{data.Value}]");
                })
            .AddTo(bag);

            // --- Key = 2 は購読しない ---

            _disposable = bag.Build();
        }

        public void Dispose()
        {
            _disposable?.Dispose();
        }
    }
}
DI
using MessagePipe;
using VContainer;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Key
{
    public class GameLifetimeScope : LifetimeScope
    {
        protected override void Configure(IContainerBuilder builder)
        {
            // MessagePipeの設定
            var options = builder.RegisterMessagePipe();
            
            // KeyDataでグループ別けしてSendDataを送受信する
            builder.RegisterMessageBroker<KeyData, SendData>(options);

            builder.RegisterEntryPoint<RoundRobinSender>();
            builder.RegisterEntryPoint<Receiver>();
        }
    }
}

実行結果

KeyDataを用いてフィルタリングされた形でメッセージの受信が実行されます。

MyKey=1, ReceivedData=[Key=1,Value=92]
MyKey=0, ReceivedData=[Key=0,Value=28]
MyKey=1, ReceivedData=[Key=1,Value=13]
MyKey=0, ReceivedData=[Key=0,Value=78]
MyKey=1, ReceivedData=[Key=1,Value=2]
MyKey=0, ReceivedData=[Key=0,Value=44]
...(以下、Key=0/1を交互に繰り返し)

考察: Subscriberが居ないTKeyへのPublishはどうなるのか

「誰も購読していないTKey」を指定してPublishを行うとどうなるのでしょうか。

結論: 送信すらされない

IPublisher<TKey, T>/ISubscriber<TKey, T>では、TKeyに応じて別けてSubscriberを管理しています。そしてSubscriberが存在しないTKeyについては、そもそも送信処理が実行されず無視されるようになっています。

IBufferedPublisher<T>/IBufferedSubscriber<T>

概要

IBufferedPublisher<T>/IBufferedSubscriber<T>は「直前にPublishされた値を1つだけ保持する」という挙動をします。つまり「Subscribe時に最新の値があればそれを発行し、以降は通常のIPub/ISubと同じ」という動作をします。

なおTclassであり、かつnullPublishされている(または1度もPublishしていない)場合はSubscribe時の最新値の発行は行われません。

送信データ
using System;

namespace MessagePipeSample.PubSub.Buffered
{
    /// <summary>
    /// 送信データ
    /// </summary>
    public readonly struct SendData : IEquatable<SendData>
    {
        public string Value { get; }

        public SendData(string value)
        {
            Value = value;
        }

        public bool Equals(SendData other)
        {
            return Value == other.Value;
        }

        public override bool Equals(object obj)
        {
            return obj is SendData other && Equals(other);
        }

        public override int GetHashCode()
        {
            return (Value != null ? Value.GetHashCode() : 0);
        }
    }
}
送信側
using System;
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Buffered
{
    public sealed class BufferSender : IInitializable
    {
        private readonly IBufferedPublisher<SendData> _bufferedPublisher;

        public BufferSender(IBufferedPublisher<SendData> bufferedPublisher)
        {
            _bufferedPublisher = bufferedPublisher;
        }

        public void Initialize()
        {
            // 2回送信する
            _bufferedPublisher.Publish(new SendData("1"));
            // この2度目がBufferに積まれた状態でしばらく置かれる
            _bufferedPublisher.Publish(new SendData("2"));
            
            Debug.Log("Sender: Message sent.");
            
            UniTask.Void(async () =>
            {
                Debug.Log("Wait for a second...");
                await UniTask.Delay(TimeSpan.FromSeconds(1));

                // しばらくたったから次の値をPublish
                _bufferedPublisher.Publish(new SendData("3"));
            });
        }
    }
}
受信側
using System;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Buffered
{
    public sealed class BufferReceiver : IStartable, IDisposable
    {
        private readonly IBufferedSubscriber<SendData> _bufferedSubscriber;
        private IDisposable _disposable;

        public BufferReceiver(IBufferedSubscriber<SendData> bufferedSubscriber)
        {
            _bufferedSubscriber = bufferedSubscriber;
        }

        // Sender.Initialize()よりこっちの方が実行開始が遅い
        public void Start()
        {
            Debug.Log("BufferReceiver: Subscribe start.");
            _disposable = _bufferedSubscriber.Subscribe(data =>
            {
                Debug.Log($"Received! :{data.Value}");
            });
        }

        public void Dispose()
        {
            _disposable?.Dispose();
        }
    }
}
DI
using MessagePipe;
using VContainer;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Buffered
{
    public class GameLifetimeScope : LifetimeScope
    {
        protected override void Configure(IContainerBuilder builder)
        {
            // MessagePipeの設定
            var options = builder.RegisterMessagePipe();
            
            // KeyDataでグループ別けしてSendDataを送受信する
            builder.RegisterMessageBroker<SendData>(options);

            builder.RegisterEntryPoint<BufferSender>();
            builder.RegisterEntryPoint<BufferReceiver>();
        }
    }
}

実行結果

// この前に発行された「1」は無視される
Sender: Message sent.    // この時点で「2」がBufferに積まれている
Wait for a second... 
BufferReceiver: Subscribe start.
Received! :2            // Bufferされていた「2」が出力
Received! :3            // 後にPublishされた「3」

IAsyncPublisher<T>/IAsyncSubscriber<T>

概要

IAsyncPublisher<T>/IAsyncSubscriber<T>(長いのでIAP<T>/IAS<T>と書きます)は、IPub<T>/ISub<T>の非同期版です。何が非同期なのかというと、「メッセージの送信時に、受信側の処理を完了するのを待つ」の部分が非同期になっています。

AsyncPublishStrategy

IAP<T>/IAS<T>AsyncPublishStrategyを指定することで非同期処理の戦略を変更することができます。

AsyncPublishStrategy.Parallel

Parallelを指定した場合は並行動作となります。

PublishAsyncしたタイミングで各Subscriberへ一斉にメッセージを発送し、Subscriber側の非同期処理がすべて終わるのを待機します。
(一斉に非同期処理を実行してUniTask.WhenAllで待ち受けるイメージ)

Parallel.jpg

AsyncPublishStrategy.Sequential

Sequentialを指定した場合は直列動作となります。

PublishAsyncしたタイミングで各Subscriberへ1つずつ順番にメッセージを発送します。発送先のSubscriberで非同期処理が終わるのを待機し、完了したら次のSubscriberへ発送するという処理を繰り返します。

Sequential.jpg

実装例

実際の動きを見たほうがわかりやすいと思うので実装例を。

  • シーン上にCubeが配置されている
  • 「リセットボタン」でCubeの位置がランダムに初期化される
  • 「Sequential」を押すとAsyncPublishStrategy.SequentialでCubeをゴールに移動させるメッセージを送る
  • 「Parallel」を押すとAsyncPublishStrategy.ParallelでCubeをゴールに移動させるメッセージを送る
  • 移動中は各ボタン操作を無効化する

という挙動をIAP<T>/ISP<T>を用いて実装してみます。

move.gif

送信メッセージ

今回は「ゴール先の座標送信」と「位置のリセット」で2つメッセージを用意します。

送信データ(ゴール先情報)
using System;
using UnityEngine;

namespace MessagePipeSample.PubSub.Async
{
    /// <summary>
    /// 目的地情報
    /// </summary>
    public readonly struct TargetPosition : IEquatable<TargetPosition>
    {
        public Vector3 Position { get; }

        public TargetPosition(Vector3 position)
        {
            Position = position;
        }

        public bool Equals(TargetPosition other)
        {
            return Position.Equals(other.Position);
        }

        public override bool Equals(object obj)
        {
            return obj is TargetPosition other && Equals(other);
        }

        public override int GetHashCode()
        {
            return Position.GetHashCode();
        }
    }
}
送信データ(リセットイベント)
namespace MessagePipeSample.PubSub.Async
{
    // 状態リセットイベント
    public readonly struct ResetEvent
    {
        public static readonly ResetEvent Default = new ResetEvent();
    }
}

移動するCube

Cube側に貼り付けるコンポーネントです。
IAsyncSubscriber<TargetPosition>からゴール先の座標を受け取ったら、非同期処理として移動処理を実行します。

移動するCubeのコンポーネント
using System.Threading;
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using VContainer;

namespace MessagePipeSample.PubSub.Async
{
    public sealed class MovingCube : MonoBehaviour
    {
        [Inject] private IAsyncSubscriber<TargetPosition> _asyncTargetSubscriber;
        [Inject] private ISubscriber<ResetEvent> _resetEventSubscriber;

        private readonly float _speed = 5.0f;

        private void Start()
        {
            _asyncTargetSubscriber.Subscribe(async (target, ct) =>
                {
                    await MoveToTargetAsync(target.Position, ct);
                })
                .AddTo(this.GetCancellationTokenOnDestroy());

            _resetEventSubscriber.Subscribe(_ => ResetPosition())
                .AddTo(this.GetCancellationTokenOnDestroy());
        }

        /// <summary>
        /// 指定座標に向かってゆっくり移動する
        /// </summary>
        private async UniTask MoveToTargetAsync(Vector3 target, CancellationToken ct)
        {
            while (!ct.IsCancellationRequested)
            {
                // 目標地点との差分
                var delta = (target - transform.position);

                // 1.0m以内なら到着
                if (delta.sqrMagnitude < 1.0f)
                {
                    return;
                }

                // 離れているなら目標に向かって移動する
                transform.position += _speed * (delta.normalized) * Time.deltaTime;

                await UniTask.Yield();
            }
        }

        private void ResetPosition()
        {
            transform.position =
                new Vector3(Random.Range(-5f, 5f), 0, Random.Range(-5f, 5f));
        }
    }
}

UI操作部分

uGUIButtonと連動してメッセージを発行します。
GetAsyncClickEventHandlerの使い方はこちらを参考に

ここで「移動ボタン」が押されたときに各Cubeへメッセージを送り、それぞれの移動が終わるのを待つという処理をIAP<T>/IAS<T>で実現しています。

uGUIと連動してメッセージ送信
using System.Threading;
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using UnityEngine.UI;
using VContainer;
using Random = UnityEngine.Random;

namespace MessagePipeSample.PubSub.Async
{
    /// <summary>
    /// uGUIに連動してCubeに命令を出す
    /// </summary>
    public sealed class CubeController : MonoBehaviour
    {
        [Inject] private IAsyncPublisher<TargetPosition> _asyncPublisher;
        [Inject] private IPublisher<ResetEvent> _resetPublisher;

        // 各種ボタン
        [SerializeField] private Button _moveParallelButton;
        [SerializeField] private Button _moveSequentialButton;
        [SerializeField] private Button _resetButton;

        [SerializeField] private Transform _goalMarkerObject;

        private void Start()
        {
            var ct = this.GetCancellationTokenOnDestroy();
            
            // ボタンイベントの購読開始
            WaitForResetButtonAsync(ct).Forget();
            WaitForMoveParallelButtonAsync(ct).Forget();
            WaitForMoveSequentialButtonAsync(ct).Forget();
        }

        /// <summary>
        /// リセットボタンの処理
        /// </summary>
        private async UniTaskVoid WaitForResetButtonAsync(CancellationToken ct)
        {
            var asyncHandler = _resetButton.GetAsyncClickEventHandler(ct);

            while (!ct.IsCancellationRequested)
            {
                // ボタンがクリックされたらリセットメッセージを発行
                await asyncHandler.OnClickAsync();
                _resetPublisher.Publish(ResetEvent.Default);
            }
        }

        /// <summary>
        /// 移動ボタン(並行処理)
        /// </summary>
        private async UniTaskVoid WaitForMoveParallelButtonAsync(CancellationToken ct)
        {
            var asyncHandler = _moveParallelButton.GetAsyncClickEventHandler(ct);

            while (!ct.IsCancellationRequested)
            {
                await asyncHandler.OnClickAsync();

                // PublishAsyncが完了するまでボタンを無効化
                SwitchButtonInteractable(false);
                
                // ランダムに移動先を決定
                var target = new UnityEngine.Vector3(Random.Range(-5f, 5f), 0, Random.Range(-5f, 5f));
                _goalMarkerObject.transform.position = target;

                // Cubeにメッセージを送信し、移動が終わるまで待つ
                // AsyncPublishStrategy.Parallel を指定しているので並行実行
                await _asyncPublisher.PublishAsync(new TargetPosition(target),
                    AsyncPublishStrategy.Parallel, ct);

                // ボタンを再有効化
                SwitchButtonInteractable(true);
            }
        }

        /// <summary>
        /// 移動ボタン(直列処理)
        /// </summary>
        private async UniTaskVoid WaitForMoveSequentialButtonAsync(CancellationToken ct)
        {
            var asyncHandler = _moveSequentialButton.GetAsyncClickEventHandler(ct);

            while (!ct.IsCancellationRequested)
            {
                await asyncHandler.OnClickAsync();

                // PublishAsyncが完了するまでボタンを無効化
                SwitchButtonInteractable(false);

                // ランダムに移動先を決定
                var target = new UnityEngine.Vector3(Random.Range(-5f, 5f), 0, Random.Range(-5f, 5f));
                _goalMarkerObject.transform.position = target;

                // Cubeにメッセージを送信し、移動が終わるまで待つ
                // AsyncPublishStrategy.Sequential を指定しているので直列実行
                await _asyncPublisher.PublishAsync(new TargetPosition(target),
                    AsyncPublishStrategy.Sequential, ct);
                
                // ボタンを再有効化
                SwitchButtonInteractable(true);
            }
        }

        private void SwitchButtonInteractable(bool isEnabled)
        {
            _moveParallelButton.interactable = isEnabled;
            _moveSequentialButton.interactable = isEnabled;
            _resetButton.interactable = isEnabled;
        }
    }
}

DI設定

DI
using MessagePipe;
using UnityEngine;
using VContainer;
using VContainer.Unity;

namespace MessagePipeSample.PubSub.Async
{
    public class GameLifetimeScope : LifetimeScope
    {
        [SerializeField] private MovingCube _cubePrefab;

        protected override void Configure(IContainerBuilder builder)
        {
            // MessagePipeの設定
            var options = builder.RegisterMessagePipe();

            // デフォルトの非同期処理のやり方
            // PublishAsync時に個別指定もできる
            options.DefaultAsyncPublishStrategy = AsyncPublishStrategy.Parallel;

            builder.RegisterMessageBroker<TargetPosition>(options);
            builder.RegisterMessageBroker<ResetEvent>(options);

            // DIしながらInstantiate
            builder.RegisterBuildCallback(resolver =>
            {
                // 3つ生成する
                for (int i = 0; i < 3; i++)
                {
                    resolver.Instantiate(_cubePrefab);
                }
            });
        }
    }
}

実行結果

こちらで実際に動かせます

  • 並行実行時

AsyncPublishStrategy.Parallelでメッセージ送信した場合の挙動です。
すべてのCubeへ一斉にメッセージを渡して移動処理を行い、すべてが終わるまで待ちます。
Parallel.gif

  • 直列実行時

AsyncPublishStrategy.Sequentialでメッセージ送信した場合の挙動です。
1つずつ順番にCubeへとメッセージを渡して移動処理を行い、それが終わったら次のCubeへ…という処理を終わるまで待ちます。

sequential.gif

考察

IAP<T>/ISP<T>Observableでは表現できなかった機能なので、MessagePipeの目玉機能だと思います。(一応、UniRxでもAsyncMessageBrokerとかあったけど)

IAsyncPublisher<TKey, T>/IAsyncSubscriber<TKey, T>

IAP<T>/ISP<T>Tkeyが指定できる版です。IPublisher<TKey, T>/ISubscriber<TKey, T>が非同期になったのと同等なので説明は省略します。

IBufferedAsyncPublisher<T>/IBufferedAsyncSubscriber<T>

IBufferedPublisher<T>/IBufferedSubscriber<T>の非同期版です。

挙動はIBufferedPublisher<T>/IBufferedSubscriber<T>とだいたい同じですが、Subscribe自体がAsyncになっている点が異なります。


// SubscribeAsync()の戻り値はUniTask<IDisposable>
// Subscribe時に最新値があれば即発行されて非同期処理が実行される
var disposable = await _asyncBufferedTargetSubscriber.SubscribeAsync(
    async (target, ct) =>
    {
        await MoveToTargetAsync(target.Position, ct);
    });

IDistributedPublisher<TKey, TMessage>/IDistributedSubscriber<TKey, TMessage>

IDistributedPublisher<TKey, TMessage>/IDistributedSubscriber<TKey, TMessage>は分散化された(ネットワークを跨いだ)メッセージ伝達に持ちいることができます。

長くなるのでこれの解説は別記事で。

まとめ

どのパターンのPub/Subも便利なので、ユースケースに応じて適切なものを選択できるとよいでしょう。
個人的にはAsyncPub/AsyncSubの挙動がかなり面白くて気に入っているので、使える場面があればガンガン使って行きたいと思います。

33
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?