はじめに
今回はUnityにおけるMessagePipeを学習がてら触りつつ、その内容を自分用のメモ兼解説書という形でまとめていきます。
記事に登場するコード類はこちらのリポジトリで公開しています。
MessagePipeとは
公式で解説記事があるので、詳しくはこちらを読んで下さい。
かいつまんでまとめると、MessagePipeはこのようなライブラリです。
-
Pub/Subパターンの実装を提供するライブラリ - 加えて局所的な範囲で用いることができるメッセージングの仕組み
- さまざまな用途で利用が可能
- インメモリ駆動
- サーバ-クライアント間での通信(
MagicOnionやSignalRとの連携) - プロセス間通信(
IPC) - GUI実装における
View-ViewModel間のメッセージング -
CQRS向けのMediatorパターン実装 - Unityにおける
Zenject.SignalBusやUniRx.MessageBrokerの代替
- パフォーマンスがめちゃくちゃよい
-
DIContainerと併用が前提
要するに「メッセージの伝達」に特化したライブラリです。
Pub/Subパターンとは
Pub/SubパターンはPub/Subメッセージングモデルとも呼ばれます。
こちらは主に次の3つの概念から構成される、メッセージ(イベント)伝達のパターンです。
-
Publisher: メッセージを発信する -
Subscriber: メッセージを受信する -
Broker: 発信メッセージを管理し、受信側へと伝達する機構
一般的なイベント実装と違い、Pub/Subパターンは「オブジェクト同時をより疎結合にできる」「メッセージ伝達の調整が行いやすい」という特徴があります。
疎結合化
アプリケーション内部において、一般的なイベント実装(eventやObservable)ではメッセージの発行者を意識し、それを購読しにいくという仕組みのため密結合な状態になります。
もちろん、インタフェースを挟むなどすれば実装オブジェクト同士は疎結合にもできますが、特定のインタフェースを参照しなくてはいけないという点は残ります。
一方のPub/Subパターンでは、「Subscriberさえ知っていればメッセージを受け取れる」という状況にすることができます。
つまり、途中の経路にどのような実装が潜んでいるかを把握することなく、目的のメッセージを簡単に発信/受信することが可能となります。
このように、Pub/Subパターンを用いることでオブジェクト間の関係性を意識せず、柔軟なメッセージ伝達が可能となります。
注意点
疎結合にできることはメリットでもある反面、次のような問題点も含んでいることに注意する必要もあります。
- レイヤやアーキテクチャといった「設計上重要な制約」を無視するようなコードを簡単に書けてしまう
- メッセージの送信元/受信先のオブジェクトを追跡しにくくなる(リークが起きる可能性あり)
メッセージ伝達の調整が行いやすい
Pub/Subパターンでは、PublisherとSubscriberが特定のオブジェクトに依存しません。つまり「誰でもメッセージの発信ができる」「誰でもメッセージの購読ができる」ということです。
後からPublisherが増減したり、Subscriberが増減したり、スケールしやすいのがPub/Subパターンの特徴です。
またメッセージの伝達を中継するオブジェクト(Broker)の実装を調整することで、メッセージ伝達の仕組みそのものを調整できるのもPub/Subパターンの特徴です。
(MessagePipeではBroker自体は隠蔽されているものの、Publisher/Subscriberのインタフェースを選ぶことである程度挙動が変更可能になっています)
UniRxとの比較
Unityの既存のライブラリでMessagePipeに似ているものとしては「UniRxにおけるObservable(もしくはMessageBroker)」が挙げられます。
Observableは非常に高い表現力があり活用の幅は広いのですが、多くの場面においてはもっとシンプルなパターン済むことが多く、冗長すぎてしまう欠点を含んでいました。
つまりUniRxには「シンプルに扱いたい場面を無駄に複雑化させてしまう」という大きな欠点がありました。
MessagePipeはまさにそのUniRxが苦手とする「シンプルにメッセージの伝達を行いたい」というニーズを満たすライブラリとなっています。
むしろ「メッセージの扱いやすさ」にはMessagePipeの方が特化しているため、場面によってはUniRxよりも柔軟に扱うことが可能となっています。
(MessagePipe⇢UniRxへの連結もできるようになっているので、必要なら変換しちゃえばOKだったりもします)
またUniRxはあくまで「同一プロセス内でのメッセージ伝達」にしか用いることができません。
ですがMessagePipeでは、プロセスを跨いだり、ネットワーク経由でメッセージ伝達を行うことが可能です。
UniRxが「局所的な利便性を追求したライブラリ」だとすると、MessagePipeは「よりシンプルな形で幅広く利用できるライブラリ」といえます。
実際に触ってみる
長々と話してきましたが、とりあえず導入して触ってみます。
Unityにインストールする
導入はPackageManagerよりUPM経由で導入しました。
Editor -> ProjectSettingsに次のようにOpenUPMに対するURLを追加します。
- Name:
OpenUPM - URL:
https://package.openupm.com - Scopes
com.cysharpjp.hadashikick.vcontainer
これでOpenUPMが利用可能になるので、PackageManagerより次の4つを導入します。
-
MessagePipe :
MessagePipeの本体 -
MessagePipe.VContainer :
MessagePipeをVContainer連携するのに必要 -
VContainer : Unity向けの
DI Containerフレームワーク -
UniTask :
MessagePipeが依存する非同期処理ライブラリ
これで準備は完了です。
同期・1対1で動かしてみる
ではもっともシンプルなパターンとして、「同期」「1対1」でメッセージ伝達を行ってみます。
今回は「入力イベント」をメッセージとして伝達しCubeを動かしてみます。

(キーボードイベントを伝達して、Cubeが動き回る実装を試す)
送るメッセージ内容
InputParamsという構造体を定義して、これをメッセージとして送信します。
using System;
using UnityEngine;
namespace MessagePipeSample.InputProvider
{
/// <summary>
/// ブロードキャストするメッセージ
/// 入力情報
/// </summary>
public readonly struct InputParams : IEquatable<InputParams>
{
/// <summary>
/// ジャンプフラグ
/// </summary>
public bool IsJump { get; }
/// <summary>
/// 移動操作
/// </summary>
public Vector3 Move { get; }
public InputParams(bool isJump, Vector3 move)
{
IsJump = isJump;
Move = move;
}
public bool Equals(InputParams other)
{
return IsJump == other.IsJump && Move.Equals(other.Move);
}
public override bool Equals(object obj)
{
return obj is InputParams other && Equals(other);
}
public override int GetHashCode()
{
return HashCode.Combine(IsJump, Move);
}
}
}
送信側
メッセージを送信する実体としてInputEventProviderを定義します。
このInputEventProviderがIPublisher<InputParams>を用いてメッセージを配信するようにします。
using MessagePipe;
using UnityEngine;
using VContainer.Unity;
namespace MessagePipeSample.InputProvider
{
public sealed class InputEventProvider : ITickable
{
/// <summary>
/// MessagePipeにメッセージを流す用のインタフェース
/// </summary>
private readonly IPublisher<InputParams> _inputPublisher;
public InputEventProvider(IPublisher<InputParams> inputPublisher)
{
_inputPublisher = inputPublisher;
}
// 毎フレーム実行
public void Tick()
{
// 入力状態を監視
var isJump = Input.GetKey(KeyCode.Space);
var axis = new Vector3(Input.GetAxis("Horizontal"), 0, Input.GetAxis("Vertical"));
// メッセージを作成
var inputParams = new InputParams(isJump, axis);
// メッセージ送信
_inputPublisher.Publish(inputParams);
}
}
}
受信側
メッセージを最終的に受け取りハンドリングするオブジェクトとしてMoveCubeを定義します。
これをCubeのGameObjectにアタッチし、Prefab化しておきます。
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using VContainer;
namespace MessagePipeSample.InputProvider
{
public sealed class MoveCube : MonoBehaviour
{
/// <summary>
/// MessagePipeからメッセージを受け取る用インタフェース
/// </summary>
[Inject] private ISubscriber<InputParams> _inputEventSubscriber;
// 各種フィールド
private CharacterController _characterController;
private readonly float JumpSpeed = 3.0f;
private readonly float MoveSpeed = 3.0f;
private void Start()
{
_characterController = GetComponent<CharacterController>();
// 入力イベントの受信を開始する
_inputEventSubscriber.Subscribe(OnInputEventReceived)
// MonoBehaviourに寿命を紐づける(これはUniTaskの機能)
.AddTo(this.GetCancellationTokenOnDestroy());
}
/// <summary>
/// 入力イベントを処理する
/// </summary>
private void OnInputEventReceived(InputParams input)
{
var moveVelocity = new Vector3(0, _characterController.velocity.y, 0);
if (input.IsJump && _characterController.isGrounded)
{
moveVelocity += Vector3.up * JumpSpeed;
}
moveVelocity += input.Move * MoveSpeed;
moveVelocity += Physics.gravity * Time.deltaTime;
_characterController.Move(moveVelocity * Time.deltaTime);
}
}
}
DIの設定をする
以上の実装ができたら、これらをVContainer経由でDIされるようにVContainerのLifetimeScopeを定義します。
using MessagePipe;
using UnityEngine;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.InputProvider
{
public class GameLifetimeScope : LifetimeScope
{
// MoveCubeのPrefabへの参照
[SerializeField] private MoveCube _moveCubePrefab;
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// InputParamsを伝達できるように設定する
builder.RegisterMessageBroker<InputParams>(options);
// InputEventProviderを起動
builder.RegisterEntryPoint<InputEventProvider>(Lifetime.Singleton);
// MoveCubeをDIしながらInstantiate
builder.RegisterBuildCallback(resolver =>
{
resolver.Instantiate(_moveCubePrefab);
});
}
}
}
Unity上の最終的な状態
シーン上にLifetimeScopeがアタッチされたGameObjectを用意し、ここが起点となってシーン全体が初期化されます。
動かしてみる
このように、MessagePipeが提供する各種インタフェース、MessageBrokerを経由してメッセージ伝達ができました。
今回は「同期」「1対1」で試しましたが、メッセージのハンドリングを「非同期」にできたり、「多対多」も実現できます。
1対多にしてみる
1つのInputEventProvider(送信)に対して、複数のMoveCube(受信)を設定してみます。
といってもコードをいじる部分は殆どなく、単にMoveCubeを複数個Instantiateしてしまえばそれだけで勝手に「1対多」として動作します。これはMessagePiepが最初からメッセージ受信者の増減に対応しているためです。
using MessagePipe;
using UnityEngine;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.InputProvider
{
public class GameLifetimeScope : LifetimeScope
{
[SerializeField] private MoveCube _moveCubePrefab;
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// InputParamsを伝達できるように設定する
builder.RegisterMessageBroker<InputParams>(options);
// InputEventProviderを起動
builder.RegisterEntryPoint<InputEventProvider>(Lifetime.Singleton);
// MoveCubeをDIしながらInstantiate
builder.RegisterBuildCallback(resolver =>
{
// ---------変更点ここから-----------
// 3つ並べて生成する
for (int i = 0; i < 3; i++)
{
var cube = resolver.Instantiate(_moveCubePrefab);
cube.transform.position = Vector3.forward * (i * 2f);
}
// ---------変更点ここまで----------
});
}
}
}

(MoveCubeが3つ生成され、それぞれにISubscriberが注入されSubscribeが実行される)
UniRxに連結してみる
ISubscriber<T>.AsObservable()を呼び出すことでIObservable<T>に変換することができます。
IObservable<T>にさえ変換できれば、あとはUniRxで好きにできます。
// 入力イベントの受信を開始する
_inputEventSubscriber
.AsObservable() // ↑ ここまでMessagePipe
.DistinctUntilChanged() // ↓ ここからUniRx
.TakeUntilDestroy(this)
.Subscribe(OnInputEventReceived);
まとめ
今回はMessagePipeの概要、導入方法、シンプルな実装例を紹介しました。
次回以降で「非同期的な使い方」「EventFactory」「Analyzer」などにも触れていく予定です。





