はじめに
今回はUnityにおけるMessagePipe
を学習がてら触りつつ、その内容を自分用のメモ兼解説書という形でまとめていきます。
記事に登場するコード類はこちらのリポジトリで公開しています。
MessagePipeとは
公式で解説記事があるので、詳しくはこちらを読んで下さい。
かいつまんでまとめると、MessagePipe
はこのようなライブラリです。
-
Pub/Sub
パターンの実装を提供するライブラリ - 加えて局所的な範囲で用いることができるメッセージングの仕組み
- さまざまな用途で利用が可能
- インメモリ駆動
- サーバ-クライアント間での通信(
MagicOnion
やSignalR
との連携) - プロセス間通信(
IPC
) - GUI実装における
View-ViewModel
間のメッセージング -
CQRS
向けのMediator
パターン実装 - Unityにおける
Zenject.SignalBus
やUniRx.MessageBroker
の代替
- パフォーマンスがめちゃくちゃよい
-
DIContainer
と併用が前提
要するに「メッセージの伝達」に特化したライブラリです。
Pub/Subパターンとは
Pub/Sub
パターンはPub/Sub
メッセージングモデルとも呼ばれます。
こちらは主に次の3つの概念から構成される、メッセージ(イベント)伝達のパターンです。
-
Publisher
: メッセージを発信する -
Subscriber
: メッセージを受信する -
Broker
: 発信メッセージを管理し、受信側へと伝達する機構
一般的なイベント実装と違い、Pub/Sub
パターンは「オブジェクト同時をより疎結合にできる」「メッセージ伝達の調整が行いやすい」という特徴があります。
疎結合化
アプリケーション内部において、一般的なイベント実装(event
やObservable
)ではメッセージの発行者を意識し、それを購読しにいくという仕組みのため密結合な状態になります。
もちろん、インタフェースを挟むなどすれば実装オブジェクト同士は疎結合にもできますが、特定のインタフェースを参照しなくてはいけないという点は残ります。
一方のPub/Sub
パターンでは、「Subscriber
さえ知っていればメッセージを受け取れる」という状況にすることができます。
つまり、途中の経路にどのような実装が潜んでいるかを把握することなく、目的のメッセージを簡単に発信/受信することが可能となります。
このように、Pub/Sub
パターンを用いることでオブジェクト間の関係性を意識せず、柔軟なメッセージ伝達が可能となります。
注意点
疎結合にできることはメリットでもある反面、次のような問題点も含んでいることに注意する必要もあります。
- レイヤやアーキテクチャといった「設計上重要な制約」を無視するようなコードを簡単に書けてしまう
- メッセージの送信元/受信先のオブジェクトを追跡しにくくなる(リークが起きる可能性あり)
メッセージ伝達の調整が行いやすい
Pub
/Sub
パターンでは、Publisher
とSubscriber
が特定のオブジェクトに依存しません。つまり「誰でもメッセージの発信ができる」「誰でもメッセージの購読ができる」ということです。
後からPublisher
が増減したり、Subscriber
が増減したり、スケールしやすいのがPub/Sub
パターンの特徴です。
またメッセージの伝達を中継するオブジェクト(Broker
)の実装を調整することで、メッセージ伝達の仕組みそのものを調整できるのもPub/Sub
パターンの特徴です。
(MessagePipe
ではBroker
自体は隠蔽されているものの、Publisher
/Subscriber
のインタフェースを選ぶことである程度挙動が変更可能になっています)
UniRxとの比較
Unityの既存のライブラリでMessagePipe
に似ているものとしては「UniRx
におけるObservable
(もしくはMessageBroker
)」が挙げられます。
Observable
は非常に高い表現力があり活用の幅は広いのですが、多くの場面においてはもっとシンプルなパターン済むことが多く、冗長すぎてしまう欠点を含んでいました。
つまりUniRx
には「シンプルに扱いたい場面を無駄に複雑化させてしまう」という大きな欠点がありました。
MessagePipe
はまさにそのUniRx
が苦手とする「シンプルにメッセージの伝達を行いたい」というニーズを満たすライブラリとなっています。
むしろ「メッセージの扱いやすさ」にはMessagePipe
の方が特化しているため、場面によってはUniRx
よりも柔軟に扱うことが可能となっています。
(MessagePipe
⇢UniRx
への連結もできるようになっているので、必要なら変換しちゃえばOKだったりもします)
またUniRx
はあくまで「同一プロセス内でのメッセージ伝達」にしか用いることができません。
ですがMessagePipe
では、プロセスを跨いだり、ネットワーク経由でメッセージ伝達を行うことが可能です。
UniRx
が「局所的な利便性を追求したライブラリ」だとすると、MessagePipe
は「よりシンプルな形で幅広く利用できるライブラリ」といえます。
実際に触ってみる
長々と話してきましたが、とりあえず導入して触ってみます。
Unityにインストールする
導入はPackageManager
よりUPM経由で導入しました。
Editor
-> ProjectSettings
に次のようにOpenUPM
に対するURLを追加します。
- Name:
OpenUPM
- URL:
https://package.openupm.com
- Scopes
com.cysharp
jp.hadashikick.vcontainer
これでOpenUPM
が利用可能になるので、PackageManager
より次の4つを導入します。
-
MessagePipe :
MessagePipe
の本体 -
MessagePipe.VContainer :
MessagePipe
をVContainer
連携するのに必要 -
VContainer : Unity向けの
DI Container
フレームワーク -
UniTask :
MessagePipe
が依存する非同期処理ライブラリ
これで準備は完了です。
同期・1対1で動かしてみる
ではもっともシンプルなパターンとして、「同期」「1対1」でメッセージ伝達を行ってみます。
今回は「入力イベント」をメッセージとして伝達しCubeを動かしてみます。
(キーボードイベントを伝達して、Cube
が動き回る実装を試す)
送るメッセージ内容
InputParams
という構造体を定義して、これをメッセージとして送信します。
using System;
using UnityEngine;
namespace MessagePipeSample.InputProvider
{
/// <summary>
/// ブロードキャストするメッセージ
/// 入力情報
/// </summary>
public readonly struct InputParams : IEquatable<InputParams>
{
/// <summary>
/// ジャンプフラグ
/// </summary>
public bool IsJump { get; }
/// <summary>
/// 移動操作
/// </summary>
public Vector3 Move { get; }
public InputParams(bool isJump, Vector3 move)
{
IsJump = isJump;
Move = move;
}
public bool Equals(InputParams other)
{
return IsJump == other.IsJump && Move.Equals(other.Move);
}
public override bool Equals(object obj)
{
return obj is InputParams other && Equals(other);
}
public override int GetHashCode()
{
return HashCode.Combine(IsJump, Move);
}
}
}
送信側
メッセージを送信する実体としてInputEventProvider
を定義します。
このInputEventProvider
がIPublisher<InputParams>
を用いてメッセージを配信するようにします。
using MessagePipe;
using UnityEngine;
using VContainer.Unity;
namespace MessagePipeSample.InputProvider
{
public sealed class InputEventProvider : ITickable
{
/// <summary>
/// MessagePipeにメッセージを流す用のインタフェース
/// </summary>
private readonly IPublisher<InputParams> _inputPublisher;
public InputEventProvider(IPublisher<InputParams> inputPublisher)
{
_inputPublisher = inputPublisher;
}
// 毎フレーム実行
public void Tick()
{
// 入力状態を監視
var isJump = Input.GetKey(KeyCode.Space);
var axis = new Vector3(Input.GetAxis("Horizontal"), 0, Input.GetAxis("Vertical"));
// メッセージを作成
var inputParams = new InputParams(isJump, axis);
// メッセージ送信
_inputPublisher.Publish(inputParams);
}
}
}
受信側
メッセージを最終的に受け取りハンドリングするオブジェクトとしてMoveCube
を定義します。
これをCube
のGameObjectにアタッチし、Prefab化しておきます。
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using VContainer;
namespace MessagePipeSample.InputProvider
{
public sealed class MoveCube : MonoBehaviour
{
/// <summary>
/// MessagePipeからメッセージを受け取る用インタフェース
/// </summary>
[Inject] private ISubscriber<InputParams> _inputEventSubscriber;
// 各種フィールド
private CharacterController _characterController;
private readonly float JumpSpeed = 3.0f;
private readonly float MoveSpeed = 3.0f;
private void Start()
{
_characterController = GetComponent<CharacterController>();
// 入力イベントの受信を開始する
_inputEventSubscriber.Subscribe(OnInputEventReceived)
// MonoBehaviourに寿命を紐づける(これはUniTaskの機能)
.AddTo(this.GetCancellationTokenOnDestroy());
}
/// <summary>
/// 入力イベントを処理する
/// </summary>
private void OnInputEventReceived(InputParams input)
{
var moveVelocity = new Vector3(0, _characterController.velocity.y, 0);
if (input.IsJump && _characterController.isGrounded)
{
moveVelocity += Vector3.up * JumpSpeed;
}
moveVelocity += input.Move * MoveSpeed;
moveVelocity += Physics.gravity * Time.deltaTime;
_characterController.Move(moveVelocity * Time.deltaTime);
}
}
}
DIの設定をする
以上の実装ができたら、これらをVContainer
経由でDI
されるようにVContainer
のLifetimeScope
を定義します。
using MessagePipe;
using UnityEngine;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.InputProvider
{
public class GameLifetimeScope : LifetimeScope
{
// MoveCubeのPrefabへの参照
[SerializeField] private MoveCube _moveCubePrefab;
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// InputParamsを伝達できるように設定する
builder.RegisterMessageBroker<InputParams>(options);
// InputEventProviderを起動
builder.RegisterEntryPoint<InputEventProvider>(Lifetime.Singleton);
// MoveCubeをDIしながらInstantiate
builder.RegisterBuildCallback(resolver =>
{
resolver.Instantiate(_moveCubePrefab);
});
}
}
}
Unity上の最終的な状態
シーン上にLifetimeScope
がアタッチされたGameObject
を用意し、ここが起点となってシーン全体が初期化されます。
動かしてみる
このように、MessagePipe
が提供する各種インタフェース、MessageBroker
を経由してメッセージ伝達ができました。
今回は「同期」「1対1」で試しましたが、メッセージのハンドリングを「非同期」にできたり、「多対多」も実現できます。
1対多にしてみる
1つのInputEventProvider
(送信)に対して、複数のMoveCube
(受信)を設定してみます。
といってもコードをいじる部分は殆どなく、単にMoveCube
を複数個Instantiate
してしまえばそれだけで勝手に「1対多」として動作します。これはMessagePiep
が最初からメッセージ受信者の増減に対応しているためです。
using MessagePipe;
using UnityEngine;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.InputProvider
{
public class GameLifetimeScope : LifetimeScope
{
[SerializeField] private MoveCube _moveCubePrefab;
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// InputParamsを伝達できるように設定する
builder.RegisterMessageBroker<InputParams>(options);
// InputEventProviderを起動
builder.RegisterEntryPoint<InputEventProvider>(Lifetime.Singleton);
// MoveCubeをDIしながらInstantiate
builder.RegisterBuildCallback(resolver =>
{
// ---------変更点ここから-----------
// 3つ並べて生成する
for (int i = 0; i < 3; i++)
{
var cube = resolver.Instantiate(_moveCubePrefab);
cube.transform.position = Vector3.forward * (i * 2f);
}
// ---------変更点ここまで----------
});
}
}
}
(MoveCube
が3つ生成され、それぞれにISubscriber
が注入されSubscribe
が実行される)
UniRxに連結してみる
ISubscriber<T>.AsObservable()
を呼び出すことでIObservable<T>
に変換することができます。
IObservable<T>
にさえ変換できれば、あとはUniRx
で好きにできます。
// 入力イベントの受信を開始する
_inputEventSubscriber
.AsObservable() // ↑ ここまでMessagePipe
.DistinctUntilChanged() // ↓ ここからUniRx
.TakeUntilDestroy(this)
.Subscribe(OnInputEventReceived);
まとめ
今回はMessagePipe
の概要、導入方法、シンプルな実装例を紹介しました。
次回以降で「非同期的な使い方」「EventFactory
」「Analyzer
」などにも触れていく予定です。