この記事は続きです。前回はこちら。
【Unity】MessagePipeを触ってみる1 ~概要・導入編~
この記事ではVContainer
の使用を前提としています。
記事に登場するコード類はこちらのリポジトリで公開しています。
さまざまなIPublisher/ISubscriber
MessagePipe
ではIPublisher<T>
/ISubscriber<T>
を用いてメッセージの送信と受信を行います(長いのでIPub/ISub
と略します)。
このIPub/ISub
ですがいくつかバリエーションが用意されており、それぞれで挙動が異なります。
今回はそのバリエーションを紹介します。
IPublisher<T>
/ISubscriber<T>
概要
まずはもっともベーシックなものです。
指定した型T
のデータを同期でそのまま即時伝達を行います。
例
using System;
namespace MessagePipeSample.PubSub.Normal
{
/// <summary>
/// 送信データ(適当に定義した型)
/// </summary>
public readonly struct SendData : IEquatable<SendData>
{
public int Id { get; }
public int Value { get; }
public SendData(int id, int value)
{
Id = id;
Value = value;
}
public bool Equals(SendData other)
{
return Id == other.Id && Value == other.Value;
}
public override bool Equals(object obj)
{
return obj is SendData other && Equals(other);
}
public override int GetHashCode()
{
return HashCode.Combine(Id, Value);
}
}
}
using System;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Normal
{
public class ReadWriteSample1 : IInitializable, IDisposable, ITickable
{
private readonly IPublisher<SendData> _publisher;
private readonly ISubscriber<SendData> _subscriber;
private IDisposable _disposable;
// それぞれをDIしてもらう
public ReadWriteSample1(
IPublisher<SendData> publisher,
ISubscriber<SendData> subscriber)
{
_publisher = publisher;
_subscriber = subscriber;
}
public void Initialize()
{
// 同じクラス内でPub/Subするのはナンセンスだけど、
// 使い方のサンプルということでゆるして
// SendDataを購読し、受信したらログにだす
_disposable = _subscriber.Subscribe(data =>
{
// 受信データのうち、Idが一致するもののみ処理する
if (data.Id == 0)
{
Debug.Log($"Received: Id={data.Id} Value={data.Value}");
}
});
}
public void Tick()
{
// 毎フレーム送信する
_publisher.Publish(new SendData(id: 0, UnityEngine.Random.Range(0, 100)));
}
public void Dispose()
{
// 購読中止
_disposable?.Dispose();
}
}
}
using MessagePipe;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Normal
{
public class GameLifetimeScope : LifetimeScope
{
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// SendDataを伝達できるように設定する
builder.RegisterMessageBroker<SendData>(options);
// ReadWriteSample1を起動
builder.RegisterEntryPoint<ReadWriteSample1>(Lifetime.Singleton);
}
}
}
実行結果
毎フレーム、ログが繰り返し表示され続けます。
Received: Id=0 Value=2
Received: Id=0 Value=59
Received: Id=0 Value=21
Received: Id=0 Value=86
...(以下実行を停止するまで続く)
IPublisher<T>
/ISubscriber<T>
はすごくシンプルに、Publish()
されたときのデータを即時・同期ですべてのSubscriber
へと伝達します。これにより「非常にシンプルな形でのメッセージのグローバル配信」が可能となります。
考察:型が同じだが用途が異なる場合に、Pub/Sub
を別けて扱えるのか
IPublisher<T>
/ISubscriber<T>
は型を指定して送受信はできるが、これを「グループ分けして使えるのか」という疑問です。
(「グループAにはそれ専用のIPub/ISub
」を、「グループBにはそれ専用のそれを」といった区別ができるのか)
結論:IPublisher<TKey, T>
/ISubscriber<TKey, T>
を使おう
IPublisher<TKey, T>
/ISubscriber<TKey, T>
という、Key
を指定して送受信できるものが用意されているのでこちらを使いましょう。
使い方については後述します。
別解A: データの中に識別子を入れる
何らかの理由でIPublisher<TKey, T>
/ISubscriber<TKey, T>
が使えない場合にどうするか。簡単なやり方としては、送信するデータの中に識別用の何かを入れて、受信側でハンドリングするというやり方です。
_disposable = _subscriber.Subscribe(data =>
{
// 受信データのうち、Idが自分と一致するもののみ処理する
if (data.Id == _myId)
{
Debug.Log($"Received: Id={data.Id} Value={data.Value}");
}
});
この場合はSubscriber
側ですべてのデータを受けて、そこから不要なデータは捨てるという挙動になります。そのためSubscriber
の数が膨大な場合は無駄な負荷が発生します。
別解B: 用途ごとに型を変える
前提からひっくり返すパターン。しかし設計上、「別の意味・用途をもつデータ構造なら概念ごと別ける」というは重要だったりします。なので場合によってはこれが最適解のパターンになることもありそう。
// 用途が違うならそもそも型から別けてしまうべき
builder.RegisterMessageBroker<SendData1>(options);
builder.RegisterMessageBroker<SendData2>(options);
IPublisher<TKey, T>
/ISubscriber<TKey, T>
概要
挙動はIPublisher<T>
/ISubscriber<T>
とだいたい同じなのですが、TKey
を指定して送受信のグルーピングが可能になっています。
Publish
時に指定したTKey
と同じ値でSubscribe
している対象にのみメッセージが届くように、MessageBroker
の内部でフィルタリングが実行されます。このため、関係ないSubscriber
に不必要にデータを送ることがなくなり負荷軽減にも繋がります。
(MessageBroker
の内部でグループ分けされるため、異なるTKey
のSubscriber
にはメッセージ送信はされない)
なお内部実装的にはTKey
をKeyとしたDictionary
が用いられています。そのためTKey
として扱うオブジェクトには適切にGetHashCode
を実装することを忘れないでください。
例
using System;
namespace MessagePipeSample.PubSub.Key
{
/// <summary>
/// 識別子
/// </summary>
public readonly struct KeyData : IEquatable<KeyData>
{
public int _raw { get; }
public KeyData(int raw)
{
_raw = raw;
}
public bool Equals(KeyData other)
{
return _raw == other._raw;
}
public override bool Equals(object obj)
{
return obj is KeyData other && Equals(other);
}
public override int GetHashCode()
{
return _raw;
}
}
}
using System;
namespace MessagePipeSample.PubSub.Key
{
/// <summary>
/// 送信データ(適当に定義した型)
/// </summary>
public readonly struct SendData : IEquatable<SendData>
{
public string Value { get; }
public SendData(string value)
{
Value = value;
}
public bool Equals(SendData other)
{
return Value == other.Value;
}
public override bool Equals(object obj)
{
return obj is SendData other && Equals(other);
}
public override int GetHashCode()
{
return (Value != null ? Value.GetHashCode() : 0);
}
}
}
using MessagePipe;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Key
{
/// <summary>
/// 順繰りにデータを送る
/// </summary>
public sealed class RoundRobinSender : ITickable
{
private readonly IPublisher<KeyData, SendData> _publisher;
private int _roundRobinIndex;
public RoundRobinSender(IPublisher<KeyData, SendData> publisher)
{
_publisher = publisher;
}
public void Tick()
{
// 0,1,2 を順繰りにデータ送信する
_roundRobinIndex = (_roundRobinIndex + 1) % 3;
var key = new KeyData(_roundRobinIndex);
var sendData = new SendData($"Key={_roundRobinIndex},Value={UnityEngine.Random.Range(0, 100)}");
// Keyを指定して送信
_publisher.Publish(key, sendData);
}
}
}
using System;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Key
{
public sealed class Receiver : IInitializable, IDisposable
{
private readonly ISubscriber<KeyData, SendData> _subscriber;
private IDisposable _disposable;
public Receiver(ISubscriber<KeyData, SendData> subscriber)
{
_subscriber = subscriber;
}
public void Initialize()
{
// まとめて破棄するやつ
var bag = DisposableBag.CreateBuilder();
// --- Key = 0 のみを購読 ---
_subscriber.Subscribe(new KeyData(0), data =>
{
Debug.Log($"MyKey=0, ReceivedData=[{data.Value}]");
})
.AddTo(bag);
// --- Key = 1 のみを購読 ---
_subscriber.Subscribe(new KeyData(1), data =>
{
Debug.Log($"MyKey=1, ReceivedData=[{data.Value}]");
})
.AddTo(bag);
// --- Key = 2 は購読しない ---
_disposable = bag.Build();
}
public void Dispose()
{
_disposable?.Dispose();
}
}
}
using MessagePipe;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Key
{
public class GameLifetimeScope : LifetimeScope
{
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// KeyDataでグループ別けしてSendDataを送受信する
builder.RegisterMessageBroker<KeyData, SendData>(options);
builder.RegisterEntryPoint<RoundRobinSender>();
builder.RegisterEntryPoint<Receiver>();
}
}
}
実行結果
KeyData
を用いてフィルタリングされた形でメッセージの受信が実行されます。
MyKey=1, ReceivedData=[Key=1,Value=92]
MyKey=0, ReceivedData=[Key=0,Value=28]
MyKey=1, ReceivedData=[Key=1,Value=13]
MyKey=0, ReceivedData=[Key=0,Value=78]
MyKey=1, ReceivedData=[Key=1,Value=2]
MyKey=0, ReceivedData=[Key=0,Value=44]
...(以下、Key=0/1を交互に繰り返し)
考察: Subscriberが居ないTKeyへのPublishはどうなるのか
「誰も購読していないTKey
」を指定してPublish
を行うとどうなるのでしょうか。
結論: 送信すらされない
IPublisher<TKey, T>
/ISubscriber<TKey, T>
では、TKey
に応じて別けてSubscriber
を管理しています。そしてSubscriber
が存在しないTKey
については、そもそも送信処理が実行されず無視されるようになっています。
IBufferedPublisher<T>
/IBufferedSubscriber<T>
概要
IBufferedPublisher<T>
/IBufferedSubscriber<T>
は「直前にPublish
された値を1つだけ保持する」という挙動をします。つまり「Subscribe
時に最新の値があればそれを発行し、以降は通常のIPub
/ISub
と同じ」という動作をします。
なおT
がclass
であり、かつnull
がPublish
されている(または1度もPublishしていない
)場合はSubscribe
時の最新値の発行は行われません。
例
using System;
namespace MessagePipeSample.PubSub.Buffered
{
/// <summary>
/// 送信データ
/// </summary>
public readonly struct SendData : IEquatable<SendData>
{
public string Value { get; }
public SendData(string value)
{
Value = value;
}
public bool Equals(SendData other)
{
return Value == other.Value;
}
public override bool Equals(object obj)
{
return obj is SendData other && Equals(other);
}
public override int GetHashCode()
{
return (Value != null ? Value.GetHashCode() : 0);
}
}
}
using System;
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Buffered
{
public sealed class BufferSender : IInitializable
{
private readonly IBufferedPublisher<SendData> _bufferedPublisher;
public BufferSender(IBufferedPublisher<SendData> bufferedPublisher)
{
_bufferedPublisher = bufferedPublisher;
}
public void Initialize()
{
// 2回送信する
_bufferedPublisher.Publish(new SendData("1"));
// この2度目がBufferに積まれた状態でしばらく置かれる
_bufferedPublisher.Publish(new SendData("2"));
Debug.Log("Sender: Message sent.");
UniTask.Void(async () =>
{
Debug.Log("Wait for a second...");
await UniTask.Delay(TimeSpan.FromSeconds(1));
// しばらくたったから次の値をPublish
_bufferedPublisher.Publish(new SendData("3"));
});
}
}
}
using System;
using MessagePipe;
using UnityEngine;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Buffered
{
public sealed class BufferReceiver : IStartable, IDisposable
{
private readonly IBufferedSubscriber<SendData> _bufferedSubscriber;
private IDisposable _disposable;
public BufferReceiver(IBufferedSubscriber<SendData> bufferedSubscriber)
{
_bufferedSubscriber = bufferedSubscriber;
}
// Sender.Initialize()よりこっちの方が実行開始が遅い
public void Start()
{
Debug.Log("BufferReceiver: Subscribe start.");
_disposable = _bufferedSubscriber.Subscribe(data =>
{
Debug.Log($"Received! :{data.Value}");
});
}
public void Dispose()
{
_disposable?.Dispose();
}
}
}
using MessagePipe;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Buffered
{
public class GameLifetimeScope : LifetimeScope
{
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// KeyDataでグループ別けしてSendDataを送受信する
builder.RegisterMessageBroker<SendData>(options);
builder.RegisterEntryPoint<BufferSender>();
builder.RegisterEntryPoint<BufferReceiver>();
}
}
}
実行結果
// この前に発行された「1」は無視される
Sender: Message sent. // この時点で「2」がBufferに積まれている
Wait for a second...
BufferReceiver: Subscribe start.
Received! :2 // Bufferされていた「2」が出力
Received! :3 // 後にPublishされた「3」
IAsyncPublisher<T>
/IAsyncSubscriber<T>
概要
IAsyncPublisher<T>
/IAsyncSubscriber<T>
(長いのでIAP<T>
/IAS<T>
と書きます)は、IPub<T>
/ISub<T>
の非同期版です。何が非同期なのかというと、「メッセージの送信時に、受信側の処理を完了するのを待つ」の部分が非同期になっています。
AsyncPublishStrategy
IAP<T>
/IAS<T>
はAsyncPublishStrategy
を指定することで非同期処理の戦略を変更することができます。
AsyncPublishStrategy.Parallel
Parallel
を指定した場合は並行動作となります。
PublishAsync
したタイミングで各Subscriber
へ一斉にメッセージを発送し、Subscriber
側の非同期処理がすべて終わるのを待機します。
(一斉に非同期処理を実行してUniTask.WhenAll
で待ち受けるイメージ)
AsyncPublishStrategy.Sequential
Sequential
を指定した場合は直列動作となります。
PublishAsync
したタイミングで各Subscriber
へ1つずつ順番にメッセージを発送します。発送先のSubscriber
で非同期処理が終わるのを待機し、完了したら次のSubscriber
へ発送するという処理を繰り返します。
実装例
実際の動きを見たほうがわかりやすいと思うので実装例を。
- シーン上にCubeが配置されている
- 「リセットボタン」でCubeの位置がランダムに初期化される
- 「Sequential」を押すとAsyncPublishStrategy.SequentialでCubeをゴールに移動させるメッセージを送る
- 「Parallel」を押すとAsyncPublishStrategy.ParallelでCubeをゴールに移動させるメッセージを送る
- 移動中は各ボタン操作を無効化する
という挙動をIAP<T>
/ISP<T>
を用いて実装してみます。
送信メッセージ
今回は「ゴール先の座標送信」と「位置のリセット」で2つメッセージを用意します。
using System;
using UnityEngine;
namespace MessagePipeSample.PubSub.Async
{
/// <summary>
/// 目的地情報
/// </summary>
public readonly struct TargetPosition : IEquatable<TargetPosition>
{
public Vector3 Position { get; }
public TargetPosition(Vector3 position)
{
Position = position;
}
public bool Equals(TargetPosition other)
{
return Position.Equals(other.Position);
}
public override bool Equals(object obj)
{
return obj is TargetPosition other && Equals(other);
}
public override int GetHashCode()
{
return Position.GetHashCode();
}
}
}
namespace MessagePipeSample.PubSub.Async
{
// 状態リセットイベント
public readonly struct ResetEvent
{
public static readonly ResetEvent Default = new ResetEvent();
}
}
移動するCube
Cube
側に貼り付けるコンポーネントです。
IAsyncSubscriber<TargetPosition>
からゴール先の座標を受け取ったら、非同期処理として移動処理を実行します。
using System.Threading;
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using VContainer;
namespace MessagePipeSample.PubSub.Async
{
public sealed class MovingCube : MonoBehaviour
{
[Inject] private IAsyncSubscriber<TargetPosition> _asyncTargetSubscriber;
[Inject] private ISubscriber<ResetEvent> _resetEventSubscriber;
private readonly float _speed = 5.0f;
private void Start()
{
_asyncTargetSubscriber.Subscribe(async (target, ct) =>
{
await MoveToTargetAsync(target.Position, ct);
})
.AddTo(this.GetCancellationTokenOnDestroy());
_resetEventSubscriber.Subscribe(_ => ResetPosition())
.AddTo(this.GetCancellationTokenOnDestroy());
}
/// <summary>
/// 指定座標に向かってゆっくり移動する
/// </summary>
private async UniTask MoveToTargetAsync(Vector3 target, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
// 目標地点との差分
var delta = (target - transform.position);
// 1.0m以内なら到着
if (delta.sqrMagnitude < 1.0f)
{
return;
}
// 離れているなら目標に向かって移動する
transform.position += _speed * (delta.normalized) * Time.deltaTime;
await UniTask.Yield();
}
}
private void ResetPosition()
{
transform.position =
new Vector3(Random.Range(-5f, 5f), 0, Random.Range(-5f, 5f));
}
}
}
UI操作部分
uGUI
のButton
と連動してメッセージを発行します。
(GetAsyncClickEventHandlerの使い方はこちらを参考に)
ここで「移動ボタン」が押されたときに各Cubeへメッセージを送り、それぞれの移動が終わるのを待つという処理をIAP<T>/IAS<T>
で実現しています。
using System.Threading;
using Cysharp.Threading.Tasks;
using MessagePipe;
using UnityEngine;
using UnityEngine.UI;
using VContainer;
using Random = UnityEngine.Random;
namespace MessagePipeSample.PubSub.Async
{
/// <summary>
/// uGUIに連動してCubeに命令を出す
/// </summary>
public sealed class CubeController : MonoBehaviour
{
[Inject] private IAsyncPublisher<TargetPosition> _asyncPublisher;
[Inject] private IPublisher<ResetEvent> _resetPublisher;
// 各種ボタン
[SerializeField] private Button _moveParallelButton;
[SerializeField] private Button _moveSequentialButton;
[SerializeField] private Button _resetButton;
[SerializeField] private Transform _goalMarkerObject;
private void Start()
{
var ct = this.GetCancellationTokenOnDestroy();
// ボタンイベントの購読開始
WaitForResetButtonAsync(ct).Forget();
WaitForMoveParallelButtonAsync(ct).Forget();
WaitForMoveSequentialButtonAsync(ct).Forget();
}
/// <summary>
/// リセットボタンの処理
/// </summary>
private async UniTaskVoid WaitForResetButtonAsync(CancellationToken ct)
{
var asyncHandler = _resetButton.GetAsyncClickEventHandler(ct);
while (!ct.IsCancellationRequested)
{
// ボタンがクリックされたらリセットメッセージを発行
await asyncHandler.OnClickAsync();
_resetPublisher.Publish(ResetEvent.Default);
}
}
/// <summary>
/// 移動ボタン(並行処理)
/// </summary>
private async UniTaskVoid WaitForMoveParallelButtonAsync(CancellationToken ct)
{
var asyncHandler = _moveParallelButton.GetAsyncClickEventHandler(ct);
while (!ct.IsCancellationRequested)
{
await asyncHandler.OnClickAsync();
// PublishAsyncが完了するまでボタンを無効化
SwitchButtonInteractable(false);
// ランダムに移動先を決定
var target = new UnityEngine.Vector3(Random.Range(-5f, 5f), 0, Random.Range(-5f, 5f));
_goalMarkerObject.transform.position = target;
// Cubeにメッセージを送信し、移動が終わるまで待つ
// AsyncPublishStrategy.Parallel を指定しているので並行実行
await _asyncPublisher.PublishAsync(new TargetPosition(target),
AsyncPublishStrategy.Parallel, ct);
// ボタンを再有効化
SwitchButtonInteractable(true);
}
}
/// <summary>
/// 移動ボタン(直列処理)
/// </summary>
private async UniTaskVoid WaitForMoveSequentialButtonAsync(CancellationToken ct)
{
var asyncHandler = _moveSequentialButton.GetAsyncClickEventHandler(ct);
while (!ct.IsCancellationRequested)
{
await asyncHandler.OnClickAsync();
// PublishAsyncが完了するまでボタンを無効化
SwitchButtonInteractable(false);
// ランダムに移動先を決定
var target = new UnityEngine.Vector3(Random.Range(-5f, 5f), 0, Random.Range(-5f, 5f));
_goalMarkerObject.transform.position = target;
// Cubeにメッセージを送信し、移動が終わるまで待つ
// AsyncPublishStrategy.Sequential を指定しているので直列実行
await _asyncPublisher.PublishAsync(new TargetPosition(target),
AsyncPublishStrategy.Sequential, ct);
// ボタンを再有効化
SwitchButtonInteractable(true);
}
}
private void SwitchButtonInteractable(bool isEnabled)
{
_moveParallelButton.interactable = isEnabled;
_moveSequentialButton.interactable = isEnabled;
_resetButton.interactable = isEnabled;
}
}
}
DI設定
using MessagePipe;
using UnityEngine;
using VContainer;
using VContainer.Unity;
namespace MessagePipeSample.PubSub.Async
{
public class GameLifetimeScope : LifetimeScope
{
[SerializeField] private MovingCube _cubePrefab;
protected override void Configure(IContainerBuilder builder)
{
// MessagePipeの設定
var options = builder.RegisterMessagePipe();
// デフォルトの非同期処理のやり方
// PublishAsync時に個別指定もできる
options.DefaultAsyncPublishStrategy = AsyncPublishStrategy.Parallel;
builder.RegisterMessageBroker<TargetPosition>(options);
builder.RegisterMessageBroker<ResetEvent>(options);
// DIしながらInstantiate
builder.RegisterBuildCallback(resolver =>
{
// 3つ生成する
for (int i = 0; i < 3; i++)
{
resolver.Instantiate(_cubePrefab);
}
});
}
}
}
実行結果
- 並行実行時
AsyncPublishStrategy.Parallel
でメッセージ送信した場合の挙動です。
すべてのCube
へ一斉にメッセージを渡して移動処理を行い、すべてが終わるまで待ちます。
- 直列実行時
AsyncPublishStrategy.Sequential
でメッセージ送信した場合の挙動です。
1つずつ順番にCube
へとメッセージを渡して移動処理を行い、それが終わったら次のCube
へ…という処理を終わるまで待ちます。
考察
IAP<T>
/ISP<T>
はObservable
では表現できなかった機能なので、MessagePipe
の目玉機能だと思います。(一応、UniRxでもAsyncMessageBroker
とかあったけど)
IAsyncPublisher<TKey, T>
/IAsyncSubscriber<TKey, T>
IAP<T>
/ISP<T>
のTkey
が指定できる版です。IPublisher<TKey, T>
/ISubscriber<TKey, T>
が非同期になったのと同等なので説明は省略します。
IBufferedAsyncPublisher<T>
/IBufferedAsyncSubscriber<T>
IBufferedPublisher<T>
/IBufferedSubscriber<T>
の非同期版です。
挙動はIBufferedPublisher<T>
/IBufferedSubscriber<T>
とだいたい同じですが、Subscribe
自体がAsync
になっている点が異なります。
// SubscribeAsync()の戻り値はUniTask<IDisposable>
// Subscribe時に最新値があれば即発行されて非同期処理が実行される
var disposable = await _asyncBufferedTargetSubscriber.SubscribeAsync(
async (target, ct) =>
{
await MoveToTargetAsync(target.Position, ct);
});
IDistributedPublisher<TKey, TMessage>
/IDistributedSubscriber<TKey, TMessage>
IDistributedPublisher<TKey, TMessage>
/IDistributedSubscriber<TKey, TMessage>
は分散化された(ネットワークを跨いだ)メッセージ伝達に持ちいることができます。
長くなるのでこれの解説は別記事で。
まとめ
どのパターンのPub/Sub
も便利なので、ユースケースに応じて適切なものを選択できるとよいでしょう。
個人的にはAsyncPub/AsyncSub
の挙動がかなり面白くて気に入っているので、使える場面があればガンガン使って行きたいと思います。