MessagePipe解読第3回目
1回目、2回目は以下
はじめに
以下の環境で実行しています
- MacOS
- Unity2021.2.0b2
- MessagePipe v1.6.1
- Zenject
(※ DIContainerはビルドインで用意されているので、Zenjectを入れていないプロジェクトでもUniTask、UniRxに加えてMessagePipeを入れましょう)
今回はMessagePipeの非同期処理(async/await)が使用できるということで、どのような使い方が出来るのかを記事にしました
しかし処理が大きく変わるというわけではなく非同期用のインターフェイスにするだけで使用感が変わるということはありませんでした
IPublisher → IAsyncPublusher
ISubscriber → IAsyncSubscriber
非同期処理についていくつかテストをしてみたいと思います
(今回は並列処理については書いていません。追記で書くかも)
- イベントを送信するクラスを Publisher
- イベントを受信するクラスを Subscriber
- イベントクラスを MyEvent
とする
※最後にテストコードを貼ってあるのでそれをUnityTestRunnerで実行してみてくださ
イベント送る処理が終了するまでawaitで待機する
まずはイベントを送る側(Publisher)にasync/awaitを適用させてみます。
Publisherのイベント送信処理が終了するまでawaitするケース
イベント受ける側のクラスは前回と同じ
// イベント受け取る方
public class Subscriber
{
[Inject] private ISubscriber<MyEvent> _subscriber;
public void Subscribe()
{
// イベントが来たら反応する
_subscriber
.Subscribe(ev =>
{
Debug.Log($"イベント受信完了");
});
}
}
ISubscriberで受けて、イベントが来たらSubscribeのラムダが反応します
// イベント送る方
public class Publisher
{
// IPublisherではなくIAsyncPublisherを使う
[Inject] private IAsyncPublisher<MyEvent> _publisher;
public async UniTask SendAsync(MyEvent ev)
{
// Subscriberの購読処理が終わるまで待つ
await _publisher.PublishAsync(ev);
// イベント処理終わった後になにか処理かける
}
}
IPublisher が IAsyncPublisher になっています。
前回はPublishでイベントを送っていましたが、PublishAsyncになっています。
そしてPublishAsyncにawaitがついていますが
PublishAsyncでイベントを送ることにより、イベント送信処理が終わるまで awaitされ続けます。
後はPublisherクラスを作成してSendAsyncメソッドを呼び出すだけです
// Zenjectだと以下のような
var publisher = _container.Instantiate<Publisher>();
await publisher.SendAsync(new MyEvent()); // ← イベント送信処理が終わるまで待機される
使い方は以前とほぼ変わっていません。インターフェイスを変えてawaitを書いただけ
これで待機処理にできました。
しかし今回の例、「イベント処理終わるまで待機...?」 というところに疑問を持たれると思います。
上記の例はイベント受け取るほう(Subscriber)が特に何も変わったことをしていないため、イベントを送ってSubscribeが反応して終わり。
つまりawaitも何もせずに先に進みます → 特に意味がない例。でした
しかしIPublisherをIPublishAsyncにするだけで async/await が使用できるということがわかったと思います。
イベントを受信したときに await させたい
次はイベントを受信する側(Subscriber)のクラスでasync/awaitを適用させるパターンを考えます。
イベントを受信したら1秒待機して、ログを出したいケース。
// イベント受け取る方
public class Subscriber
{
// ISubscirberからIAsyncSubscirberに変更
[Inject] private IAsyncSubscriber<MyEvent> _asyncSubscriber;
public void Subscribe()
{
_asyncSubscriber
.Subscribe(async (x, ctr) =>
{
var time = Time.realtimeSinceStartup;
// ここで非同期処理が可能
// 大体1秒待機する
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ctr);
time = Time.realtimeSinceStartup - time;
Debug.Log($"イベント受信完了 待機秒数: {time}秒");
});
}
}
イベント受け取る方を変えました。送る方はさっきと同じです。
ISubscriberをIAsyncSubscriberに変えました。
これだけで非同期処理が使用可能になってます
そして、_asyncSubscriberのSubscribeのラムダに async がついていることがわかると思います。
_asyncSubscriber
.Subscribe(async (x, ctr) => ← ここ
{
....
つまり ラムダの中で await が使用できるということ。
ラムダの中で UniTask.Delay を使用して1秒待機し、その後ログを出すようにしました。
流れとしては以下です
- PublisherAsyncでイベントを投げた
- Subscriberクラスがイベントを受け取った
- UniTask.Delayで1秒待機
- 待機後ログを出した。
- Subscribe終了
- PublishAsync の待機が終わり先に進む
ここでひっそりと6番目が追加されていますが、
新しくコードを追加したのではなく元々PublisherクラスはIAsyncPublisherを使用して**「イベントの送信処理がすべて終了した時先に進む」**というawait処理がありました。(最初の例)
今回はSubscribeの中で1秒待機してログを出すようにしています。
Subscribeがasyncで動いている中、PublishAsyncはawaitし続け、5番目の 「Subscribe終了」 が来たときにawaitが解除されて先に進むようになります
イベント受信側がasync/awaitが使用できること、そしてイベント送信側でawaitする理由は受信側でasync/awaitされた結果、その処理が終わるまで待つために使える
ということがわかりました。
※ IAsyncSubscriberのSubscribeは二番目の引数にCancellationToken
を受け取ります。
これを各種非同期処理に渡してあげるとSubscribeのasyncが途中で強制的に破棄された場合、渡した非同期処理も破棄してくれます(便利)
一度だけイベントをawaitで待ちたい
特定のイベントを一度だけ待機したいケースは割と多いんじゃないかと思います。
- 再生したアニメーションが終わるまで待機したい。
- 撃った弾が消えるまで待機したい
Subscirbe (() => .... );
でラムダの中に処理を書いても問題ありませんが awaitを使用してもう少しモダンに処理をさせたい。
そこで使用するのが IAsyncSubscriber.FirstAsync
// イベント受け取る方
public class Subscriber
{
[Inject] private IAsyncSubscriber<MyEvent> _asyncSubscriber;
public async UniTask WaitAsync(CancellationTokenSource cts)
{
// MyEventがくるまでawaitで待つ
var ev = await _asyncSubscriber.FirstAsync(cts.Token);
Debug.Log($"イベント受信完了!!");
}
}
↓ ここが今回のお話
var ev = await _asyncSubscriber.FirstAsync(cts.Token);
**Subscribeで購読処理を書くのではなくFirstAsyncを書くことで「MyEventが送られてくるまで待機する。」**というawait処理が可能になっています。
(イベントが来たらawait終わって先に進む)
一度だけ処理したい場合はこちらのほうが見やすく、直感的ではないでしょうか
しかしこの処理はイベントが来るまでawaitで待ち続けてしまいます。
これは途中で購読をやめたい時には不便です。
この為に引数で CancellationToken
を受け取るようになっています。
これによりトークンをいつでもCancelすることでawaitの待機処理を破棄することができます
このあたりも開発元であるCyshapのUniTaskとの連携によりとても使いやすいと感じます。
終わりに
MessagePipeの真骨頂はこのUniTaskと連動した非同期処理にあるものだと言っても良いと思いました
async/awaitが使用できることによりイベントの送受信だけに収まらず、待機処理まで一連の流れで行えることで処理が分散されません。
結果視認性が高くなり、更にはバグが出にくいコードにもなると考えます
async/awaitを使えることを知ったことでMessagePipe使用の視野が広がれば幸いです
MessagePipeを使っていくうち、今後はMessagePipeを入れないとゲーム開発ができない(したくない)体になっていきそう...
今回テストで作成したUnityTestコードも貼っておきます。 自分でも実際にコードを見た、書いたほうがわかりやすかったので↓のコードをコピペしてUnityTestRunnerで実行して結果を見たほうが理解が早いかもしれません。(PlayModeTestのほう) `GlobalMessagePipe`を使用してIAsyncPublisher, IAsyncSubscriberを作成する処理も書いています
using NUnit.Framework;
using UnityEngine;
using System;
using Cysharp.Threading.Tasks;
using System.Collections;
using UnityEngine.TestTools;
using Zenject;
using MessagePipe;
using System.Threading;
namespace XXXX
{
/// <summary>
/// Asyncテスト
/// </summary>
public class AsyncTest
{
#region 定数, class, enum
// イベント送る方
public class Publisher
{
[Inject] private IAsyncPublisher<MyEvent> _asyncPublisher;
public async UniTask SendAsync(MyEvent ev)
{
// Subscriberの購読処理が終わるまで待つ
await _asyncPublisher.PublishAsync(ev);
Debug.Log("イベント送信完了");
}
}
// イベント受け取る方
public class Subscriber
{
[Inject] private IAsyncSubscriber<MyEvent> _asyncSubscriber;
[Inject] private ISubscriber<MyEvent> _subscriber;
private CancellationTokenSource _cts = new CancellationTokenSource();
private IDisposable _disposable;
public void Subscribe()
{
var bag = DisposableBag.CreateBuilder();
// イベントが来たら反応する
_subscriber
.Subscribe(ev =>
{
Debug.Log($"イベント受信完了: {ev.Message}");
}).AddTo(bag);
_disposable = bag.Build();
}
/// <summary>
/// 最初のイベントが来るまで待機
/// </summary>
public async UniTask FirstAsync()
{
var ev = await _asyncSubscriber.FirstAsync(_cts.Token);
Debug.Log($"イベント受信完了: {ev.Message}");
}
/// <summary>
/// Subscribe内で待機
/// </summary>
public void SubscribeAsync(System.Action onFinished)
{
var bag = DisposableBag.CreateBuilder();
_asyncSubscriber
.Subscribe(async (x, ctr) =>
{
var time = Time.realtimeSinceStartup;
// ここで非同期処理が可能
// 大体1秒待機
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ctr);
time = Time.realtimeSinceStartup - time;
Debug.Log($"イベント受信完了: {x.Message} 待機秒数: {time}秒");
// 終わったらよびだし
onFinished?.Invoke();
}).AddTo(bag);
_disposable = bag.Build();
}
public void Close()
{
_disposable?.Dispose();
}
}
// 送るイベント
public class MyEvent
{
public string Message;
}
#endregion
#region public, protected 変数
#endregion
#region private 変数
private DiContainer _container;
#endregion
#region プロパティ
#endregion
#region コンストラクタ, デストラクタ
#endregion
#region public, protected 関数
[SetUp]
public void Setup()
{
_container = new DiContainer();
_container.BindMessageBroker<MyEvent>(_container.BindMessagePipe());
}
[TearDown]
public void TearDown()
{
_container.UnbindAll();
}
[UnityTest]
public IEnumerator 非同期送信テスト()
{
Debug.Log("イベント受信");
// イベントを受ける方
var subscriber = _container.Instantiate<Subscriber>();
subscriber.Subscribe();
var task2 = UniTask.Create(async () =>
{
Debug.Log("イベント送信");
// イベントを投げる方
var publisher = _container.Instantiate<Publisher>();
await publisher.SendAsync(new MyEvent { Message = "イベント" });
});
return UniTask.ToCoroutine(async () => await UniTask.WhenAll(task2));
}
[UnityTest]
public IEnumerator 非同期送受信テスト()
{
var task1 = UniTask.Create(async () =>
{
bool isFinished = false;
Debug.Log("イベント受信");
// イベントを受ける方
var subscriber = _container.Instantiate<Subscriber>();
subscriber
.SubscribeAsync(() =>
{
// 終わったらフラグをtrueにして終了させる
isFinished = true;
});
return UniTask.WaitUntil(() => isFinished);
});
var task2 = UniTask.Create(async () =>
{
Debug.Log("イベント送信");
// イベントを投げる方
var publisher = _container.Instantiate<Publisher>();
await publisher.SendAsync(new MyEvent { Message = "イベント" });
});
return UniTask.ToCoroutine(async () => await UniTask.WhenAll(task1, task2));
}
[UnityTest]
public IEnumerator 非同期送受信テスト_FirstAsync()
{
var task1 = UniTask.Create(async () =>
{
Debug.Log("イベント受信");
// イベントを受ける方
var subscriber = _container.Instantiate<Subscriber>();
await subscriber.FirstAsync();
});
var task2 = UniTask.Create(async () =>
{
Debug.Log("イベント送信");
// イベントを投げる方
var publisher = _container.Instantiate<Publisher>();
await publisher.SendAsync(new MyEvent { Message = "イベント" });
});
return UniTask.ToCoroutine(async () => await UniTask.WhenAll(task1, task2));
}
[UnityTest]
public IEnumerator 非同期送受信テスト_GlobalMessagePipe()
{
// GlobalMessagePipeを使用する前にSetProviderに設定する必要がある
GlobalMessagePipe.SetProvider(_container.AsServiceProvider());
// 非同期用のPublisher/Subscriberを生成する
var asyncSubscriber = GlobalMessagePipe.GetAsyncSubscriber<MyEvent>();
var asyncPublisher = GlobalMessagePipe.GetAsyncPublisher<MyEvent>();
var cts = new CancellationTokenSource();
var task1 = UniTask.Create(async () =>
{
Debug.Log("イベント受信");
// イベントを受ける方
var ev = await asyncSubscriber.FirstAsync(cts.Token);
Debug.Log($"イベント受信完了 : {ev.Message}");
});
var task2 = UniTask.Create(async () =>
{
Debug.Log("イベント送信");
// イベントを投げる方
await asyncPublisher.PublishAsync(new MyEvent { Message = "イベント" }, cts.Token);
Debug.Log("イベント送信完了");
});
return UniTask.ToCoroutine(async () => await UniTask.WhenAll(task1, task2));
}
#endregion
#region private 関数
#endregion
}
}