この記事の内容
gRPC
の ServerStreaming や DuplexStreaming を利用すると、サーバープッシュ機能を実装できます。
そのような場合に Observable パターンを用いると配信と購読の実装をきれいに分離でき、ReactiveExtensions
との親和性も高まります。簡単かつ柔軟にレスポンスストリーム(の代わりになるシーケンス)を生成でき、クライアント側のテストがしやすくなると思います。
このようなメリットを目論見ながら、レスポンスストリームに対する Observable パターンを実装してみます。
前提知識
gRPC
の説明は割愛します。ストリーミングに関する基本的な知識が必要です。
一般的な実装
通常はレスポンスストリームからの受け取りをループ処理として実装します。
// レスポンスストリームの読み込みが完了するまでブロックされるため、待機しないように実装する必要があります。
Task nowait = Task.Run(async () =>
{
// client.Receive は ServerStreaming メソッド
using (var call = client.Receive(request, callOptions))
{
try
{
while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
{
// レスポンスを受け取ったときの処理
OnResponse(call.ResponseStream.Current);
}
catch (Exception ex)
{
// 発生した例外に対する処理。キャンセルした場合も例外が発生します。
}
}
}
}
);
Observable パターンでの実装
RPCメソッドの戻り値からレスポンスを受け取って配信/購読するように実装します。
Observable(配信クラス)
IObservable<TResponse>
インターフェースを実装したクラスを定義します。
ServerStreaming と DuplexStreaming で共通する実装が多いため、主処理を基底クラスとして実装しています。
レスポンスストリームに対する処理を提供するインターフェースと、そのインターフェースを実装するAsyncServerStreamingCall<TResponse>
クラスとAsyncDuplexStreamingCall<TRequest, TResponse>
クラスに対するラッパークラスを定義し、基底クラスを用いずに透過的に実装する方法も考えられます。
/// <summary>
/// 配信オブジェクトの基本実装。
/// </summary>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
public abstract class GrpcStreamingObservableBase<TResponse>
: IObservable<TResponse>, IDisposable
{
#region コンストラクタ
/// <summary>
/// レスポンスストリームを指定してインスタンスを生成します。
/// </summary>
/// <param name="responseStream">レスポンスストリーム</param>
protected GrpcStreamingObservableBase(IAsyncStreamReader<TResponse> responseStream)
{
m_ResponseStream = responseStream;
}
#endregion
#region デストラクタ
/// <summary>
/// デストラクタ
/// </summary>
~GrpcStreamingObservableBase()
{
Dispose(false);
}
#endregion
#region dispose
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
/// <param name="disposing">dispose メソッドから呼び出されたかどうか</param>
protected virtual void Dispose(bool disposing)
{
ReleaseObservers();
ReleaseSubscribers();
}
#endregion
#region Stream
/// <summary>
/// レスポンスストリーム
/// </summary>
private IAsyncStreamReader<TResponse> m_ResponseStream;
#endregion
#region 監視
/// <summary>
/// 監視オブジェクトを格納するコレクション
/// </summary>
private readonly List<IObserver<TResponse>> m_Observers = new List<IObserver<TResponse>>();
/// <summary>
/// 監視を開始します。
/// </summary>
/// <returns></returns>
public async Task ObserveAsync()
{
try
{
while (await m_ResponseStream.MoveNext().ConfigureAwait(false))
{
OnNext(m_ResponseStream.Current);
}
OnCompleted();
}
catch (Exception ex)
{
OnError(ex);
throw;
}
finally
{
}
}
/// <summary>
/// 監視オブジェクトを解放します。
/// </summary>
private void ReleaseObservers()
{
lock (m_Observers)
{
m_Observers.Clear();
}
}
#endregion
#region IObservable<Response> インターフェースの実装
/// <summary>
/// 完了したときの処理を行います。
/// </summary>
private void OnCompleted()
{
if (m_Observers.Count == 0) { return; }
lock (m_Observers)
{
if (m_Observers.Count > 0)
{
m_Observers.ForEach(o => o.OnCompleted());
m_Observers.Clear();
}
}
}
/// <summary>
/// 例外が発生したときの処理を行います。
/// </summary>
/// <param name="error"></param>
private void OnError(Exception error)
{
if (m_Observers.Count == 0) { return; }
lock (m_Observers)
{
if (m_Observers.Count > 0)
{
m_Observers.ForEach(o => o.OnError(error));
m_Observers.Clear();
}
}
}
/// <summary>
/// レスポンスを受け取ったときの処理を行います。
/// </summary>
/// <param name="response"></param>
private void OnNext(TResponse response)
{
if (m_Observers.Count == 0) { return; }
lock (m_Observers)
{
if (m_Observers.Count > 0)
{
m_Observers.ForEach(o => o.OnNext(response));
}
}
}
#endregion
#region 購読
/// <summary>
/// 購読オブジェクトを格納するコレクション
/// </summary>
private readonly List<IDisposable> m_Subscribers = new List<IDisposable>();
/// <summary>
/// 指定された監視オブジェクトを登録します。
/// </summary>
/// <param name="observer">監視オブジェクト</param>
/// <returns>購読オブジェクト</returns>
public IDisposable Subscribe(IObserver<TResponse> observer)
{
lock (m_Observers)
{
m_Observers.Add(observer);
}
// dispose されたら監視オブジェクトを削除する
Action onDispose = () =>
{
lock (m_Observers)
{
m_Observers.Remove(observer);
}
};
IDisposable subscriber = new Subscriber(onDispose);
lock (m_Subscribers)
{
m_Subscribers.Add(subscriber);
}
return subscriber;
}
/// <summary>
/// 購読オブジェクトを解放します。
/// </summary>
private void ReleaseSubscribers()
{
lock (m_Subscribers)
{
m_Subscribers.ForEach(o => o.Dispose());
m_Subscribers.Clear();
}
}
/// <summary>
/// 購読オブジェクト。
/// </summary>
private sealed class Subscriber : IDisposable
{
/// <summary>
/// 解放処理を指定してインスタンスを生成します。
/// </summary>
/// <param name="onDispose">解放処理</param>
internal Subscriber(Action onDispose)
{
m_OnDispose = onDispose;
}
/// <summary>
/// デストラクタ
/// </summary>
~Subscriber()
{
Dispose(false);
}
private Action m_OnDispose;
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);
}
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
/// <param name="disposing"></param>
private void Dispose(bool disposing)
{
m_OnDispose();
}
}
#endregion
}
基底クラスを継承し、ServerStreaming に対する配信クラスを実装します。
/// <summary>
/// ServerStreaming に対する配信オブジェクト。
/// </summary>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
public sealed class GrpcServerStreamingObservable<TResponse> : GrpcStreamingObservableBase<TResponse>
{
/// <summary>
/// コンストラクタ
/// </summary>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
private GrpcServerStreamingObservable(AsyncServerStreamingCall<TResponse> call, bool disposableCall)
: base(call.ResponseStream)
{
m_Call = call;
m_DisposableCall = disposableCall;
}
/// <summary>
/// ServerStreaming に対する配信オブジェクトを生成します。
/// </summary>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
public static GrpcServerStreamingObservable<TResponse> Observe(AsyncServerStreamingCall<TResponse> call, bool disposableCall)
{
return new GrpcServerStreamingObservable<TResponse>(call, disposableCall);
}
private AsyncServerStreamingCall<TResponse> m_Call;
private bool m_DisposableCall;
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
/// <param name="disposing">dispose メソッドから呼び出されたかどうか</param>
protected override void Dispose(bool disposing)
{
try
{
if (m_DisposableCall) { m_Call.Dispose(); }
}
finally
{
base.Dispose(disposing);
}
}
}
同様に DuplexStreaming に対する配信クラスを実装します。
リクエストストリームに対する書き込みメソッドと完了メソッドを実装しています。
/// <summary>
/// DuplexStreaming に対する配信オブジェクト。
/// </summary>
/// <typeparam name="TRequest">リクエストの型</typeparam>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
public sealed class GrpcDuplexStreamingObservable<TRequest, TResponse>
: GrpcStreamingObservableBase<TResponse>
{
/// <summary>
/// コンストラクタ
/// </summary>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
private GrpcDuplexStreamingObservable(AsyncDuplexStreamingCall<TRequest, TResponse> call, bool disposableCall)
: base(call.ResponseStream)
{
m_Call = call;
m_DisposableCall = disposableCall;
}
/// <summary>
/// DuplexStreaming に対する配信オブジェクトを生成します。
/// </summary>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
public static GrpcDuplexStreamingObservable<TRequest, TResponse> Observe(AsyncDuplexStreamingCall<TRequest, TResponse> call, bool disposableCall)
{
return new GrpcDuplexStreamingObservable<TRequest, TResponse>(call, disposableCall);
}
private AsyncDuplexStreamingCall<TRequest, TResponse> m_Call;
private bool m_DisposableCall;
/// <summary>
/// 使用しているリソースを解放します。
/// </summary>
/// <param name="disposing">dispose メソッドから呼び出されたかどうか</param>
protected override void Dispose(bool disposing)
{
try
{
if (m_DisposableCall) { m_Call.Dispose(); }
}
finally
{
base.Dispose(disposing);
}
}
/// <summary>
/// 指定されたリクエストを書き込みます。
/// </summary>
/// <param name="request">リクエスト</param>
/// <returns></returns>
public async Task WriteRequestAsync(TRequest request)
{
await m_Call.RequestStream.WriteAsync(request).ConfigureAwait(false);
}
/// <summary>
/// リクエストの完了を通知します。
/// </summary>
/// <returns></returns>
public async Task CompleteRequestAsync()
{
await m_Call.RequestStream.CompleteAsync().ConfigureAwait(false);
}
}
配信クラスを生成する拡張メソッド
上記の配信クラスを生成するメソッドです。AsyncServerStreamingCall<TResponse>
クラスとAsyncDuplexStreamingCall<TRequest, TResponse>
クラスに対する拡張メソッドとして定義しています。
/// <summary>
/// Observable 機能を提供します。
/// </summary>
public static class GrpcObservable
{
/// <summary>
/// DuplexStreaming に対する配信オブジェクトを生成します。
/// </summary>
/// <typeparam name="TRequest">リクエストの型</typeparam>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
/// <returns>配信オブジェクト</returns>
public static GrpcDuplexStreamingObservable<TRequest, TResponse> ObserveDuplexStreaming<TRequest, TResponse>(AsyncDuplexStreamingCall<TRequest, TResponse> call, bool disposableCall = true)
{
return GrpcDuplexStreamingObservable<TRequest, TResponse>.Observe(call, disposableCall);
}
/// <summary>
/// ServerStreaming に対する配信オブジェクトを生成します。
/// </summary>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
/// <returns>配信オブジェクト</returns>
public static GrpcServerStreamingObservable<TResponse> ObserveServerStreaming<TResponse>(AsyncServerStreamingCall<TResponse> call, bool disposableCall = true)
{
return GrpcServerStreamingObservable<TResponse>.Observe(call, disposableCall);
}
/// <summary>
/// DuplexStreaming に対する配信オブジェクトを生成します。
/// </summary>
/// <typeparam name="TRequest">リクエストの型</typeparam>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
/// <returns>配信オブジェクト</returns>
public static GrpcDuplexStreamingObservable<TRequest, TResponse> ToObservable<TRequest, TResponse>(this AsyncDuplexStreamingCall<TRequest, TResponse> call, bool disposableCall = true)
{
return ObserveDuplexStreaming(call, disposableCall);
}
/// <summary>
/// ServerStreaming に対する配信オブジェクトを生成します。
/// </summary>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
/// <param name="call">呼び出しオブジェクト</param>
/// <param name="disposableCall">配信オブジェクトの dispose 時に呼び出しオブジェクトを dispose するかどうか</param>
/// <returns>配信オブジェクト</returns>
public static GrpcServerStreamingObservable<TResponse> ToObservable<TResponse>(this AsyncServerStreamingCall<TResponse> call, bool disposableCall = true)
{
return ObserveServerStreaming(call, disposableCall);
}
監視クラス(Observer)
ほとんどの場合、監視は OnNext, OnError, OnComplete に対するデリゲートを指定できればよいですので、デリゲートを受け取ってIObserver<TResponse>
インターフェースを返すメソッドとして定義しました。前述のGrpcObservable
クラスに定義しています。
ReactiveExtensions
を利用しているのであれば、IObservable<TResponse>
インターフェースに対する Subscribe 拡張メソッドを利用するのがよいと思います。
/// <summary>
/// Observable 機能を提供します。
/// </summary>
public static class GrpcObservable
{
/// <summary>
/// 監視オブジェクトを生成します。
/// </summary>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
/// <param name="onResponse">レスポンスを受け取ったときの処理</param>
/// <param name="onError">例外が発生したときの処理</param>
/// <param name="onComplete">完了したときの処理</param>
/// <returns>監視オブジェクト</returns>
public static IObserver<TResponse> CreateObserver<TResponse>(Action<TResponse> onResponse, Action<Exception> onError = null, Action onComplete = null)
{
return new GrpcResponseObserver<TResponse>(onResponse, onError, onComplete);
}
/// <summary>
/// 監視オブジェクト
/// </summary>
/// <typeparam name="TResponse">レスポンスの型</typeparam>
private sealed class GrpcResponseObserver<TResponse> : IObserver<TResponse>
{
/// <summary>
/// インスタンスを生成します。
/// </summary>
/// <param name="onResponse">レスポンスを受け取ったときの処理</param>
/// <param name="onError">例外が発生したときの処理</param>
/// <param name="onComplete">完了したときの処理</param>
public GrpcResponseObserver(Action<TResponse> onResponse, Action<Exception> onError, Action onComplete)
{
m_OnNext = onResponse;
m_OnError = onError;
m_OnComplete = onComplete;
}
private Action<TResponse> m_OnNext;
private Action<Exception> m_OnError;
private Action m_OnComplete;
/// <summary>
/// 完了したときの処理を行います。
/// </summary>
public void OnCompleted()
{
if (m_OnComplete != null) { m_OnComplete(); }
}
/// <summary>
/// 例外が発生したときの処理を行います。
/// </summary>
/// <param name="error"></param>
public void OnError(Exception error)
{
if (m_OnError != null) { m_OnError(error); }
}
/// <summary>
/// レスポンスを受け取ったときの処理を行います。
/// </summary>
/// <param name="value"></param>
public void OnNext(TResponse value)
{
if (m_OnNext != null) { m_OnNext(value); }
}
}
}
アプリケーションコードから使用する
上記のクラスをアプリケーションコードから使用する手順です。
このドキュメントでは一続きに記述していますが、通常はRPCメソッド呼び出し/監視開始/購読開始/購読終了/監視終了は別々のタイミングで実行することになると思います。
購読前に監視を開始した場合、受け取ったレスポンスは捨てられます。
- RPCメソッドを実行し、配信オブジェクトを生成します。
- 監視オブジェクトを生成します。
- 監視オブジェクトを渡して購読オブジェクトを受け取ります。
- 監視を開始します。
- 購読を終了するには購読オブジェクトを解放します。
GrpcServerStreamingObservable<Response> observable;
IDisposable subscriber;
IObserver<Response> observer;
// レスポンスストリームの読み込みが完了するまでブロックされるため、待機しないように実装する必要があります。
Task nowait = Task.Run(async () =>
{
try
{
// 1. RPCメソッドを実行し、配信オブジェクトを生成します。
using (observable = client.Receive(request, callOptions).ToObservable())
{
// 2. 監視オブジェクトを生成します。
observer = GrpcObservable.CreateObserver<Response>(
// レスポンスを受け取ったときの処理
response => OnResponse(response)
// 例外が発生したときの処理
, ex => OnError(ex)
// 完了したときの処理
, () => OnComplete()
);
// 3. 監視オブジェクトを渡して購読オブジェクトを受け取ります。
subscriver = observable.Subscribe(observer);
// 4. 監視を開始します。
await observable.ObserveAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
{
// 配信クラスでは発生した例外を再スローしています。
}
finally
{
// 配信クラスでは dispose 時に購読オブジェクトを解放しています。
// if (subscriber!= null) {subscriber.Dispose();}
}
}
);
// 5. 購読を終了するには購読オブジェクトを解放します。
subscriber.Dispose();
// 2'. 新しい監視オブジェクトを生成します。
var observer2 = GrpcObservable.CreateObserver<Response>(
// レスポンスを受け取ったときの処理
response => OnResponse2(response)
// 例外が発生したときの処理
, ex => OnError2(ex)
// 完了したときの処理
, () => OnComplete2()
);
// 3'. 新しい監視オブジェクトを渡して購読オブジェクトを受け取ります。
var subscriver2 = observable.Subscribe(observer2);
DuplexStreaming の場合はリクエストの書き込みと完了の操作が増えます。
リクエストの操作と監視の操作の順番は入れ替わっても構いません。
- RPCメソッドを実行し、配信オブジェクトを生成します。
- 監視オブジェクトを生成します。
- 監視オブジェクトを渡して購読オブジェクトを受け取ります。
- 監視を開始します。
- リクエストを書き込みます。
- リクエストの完了を通知します。
- 購読を終了するには購読オブジェクトを解放します。
GrpcDuplexStreamingObservable<Request, Response> observable;
IDisposable subscriber;
IObserver<Response> observer;
// レスポンスストリームの読み込みが完了するまでブロックされるため、待機しないように実装する必要があります。
Task nowait = Task.Run(async () =>
{
try
{
// 1. RPCメソッドを実行し、配信オブジェクトを生成します。
using (observable = client.Receive(callOptions).ToObservable())
{
// 2. 監視オブジェクトを生成します。
observer = GrpcObservable.CreateObserver<Response>(
// レスポンスを受け取ったときの処理
response => OnResponse(response)
// 例外が発生したときの処理
, ex => OnError(ex)
// 完了したときの処理
, () => OnComplete()
);
// 3. 監視オブジェクトを渡して購読オブジェクトを受け取ります。
subscriver = observable.Subscribe(observer);
// 4. 監視を開始します。
await observable.ObserveAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
{
// 配信クラスでは発生した例外を再スローしています。
}
finally
{
// 配信クラスでは dispose 時に購読オブジェクトを解放しています。
// if (subscriber!= null) {subscriber.Dispose();}
}
}
);
// 5. リクエストを書き込む。
await observable.WriteRequestAsync(request).ConfigureAwait(false);
// 6. リクエストの完了を通知する。
await observable.CompleteRequestAsync().ConfigureAwait(false);
// 7. 購読を終了するには購読オブジェクトを解放します。
subscriber.Dispose();
// 2'. 新しい監視オブジェクトを生成します。
var observer2 = GrpcObservable.CreateObserver<Response>(
// レスポンスを受け取ったときの処理
response => OnResponse2(response)
// 例外が発生したときの処理
, ex => OnError2(ex)
// 完了したときの処理
, () => OnComplete2()
);
// 3'. 新しい監視オブジェクトを渡して購読オブジェクトを受け取ります。
var subscriver2 = observable.Subscribe(observer2);