はじめに
MagicOnion についてはこちらをご覧ください。
非ストリーミング通信での共通処理の挟み方は以前の記事で紹介しました。
ストリーミング通信でも同じく gRPC Interceptor を用いて処理を挟むことができるのですが、
MagicOnion 特有のフォーマットで送受信されているためそれを理解する必要があります。
実装
方針
Grpc.Core.Interceptors.Interceptor クラスを継承し、必要なメソッドを override します。
今回必要になるのは AsyncDuplexStreamingCall のみです。
public class StreamingLoggingInterceptor : Interceptor
{
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation
)
{
// とりあえずそのまま返す
return continuation(context);
}
}
AsyncDuplexStreamingCall<TRequest, TResponse> の定義は以下のとおりです。
public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
{
public IAsyncStreamReader<TResponse> ResponseStream { get; }
public IClientStreamWriter<TRequest> RequestStream { get; }
public Task<Metadata> ResponseHeadersAsync { get; }
public void Dispose();
public Status GetStatus();
public Metadata GetTrailers();
}
入出力はそれぞれ RequestStream, ResponseStream を通して行われているようです。
そのためこいつらを自前実装したクラスに差し替えてやれば入出力に対して処理を挟むことができるようになります。
RequestStream, ResponseStream の実装
それぞれ、生データ(byte[])に対する処理と元のStreamを渡す構成にしておきます。
class MyRequestStream<T> : IClientStreamWriter<T>
{
private Action<byte[]> _interceptor;
private IClientStreamWriter<T> _baseStream;
public MyRequestStream(Action<byte[]> interceptor, IClientStreamWriter<T> baseStream)
=> (_interceptor, _baseStream) = (interceptor, baseStream);
// キモの部分
public Task WriteAsync(T message)
{
// 書き込む前にintercept
_interceptor(message as byte[]);
return _baseStream.WriteAsync(message);
}
// 以下、インターフェースを満たすための実装
public Task CompleteAsync() => _baseStream.CompleteAsync();
public WriteOptions WriteOptions
{
get => _baseStream.WriteOptions;
set => _baseStream.WriteOptions = value;
}
}
class MyResponseStream<T> : IAsyncStreamReader<T>
{
private Action<byte[]> _interceptor;
private IAsyncStreamReader<T> _baseStream;
public MyResponseStream(Action<byte[]> interceptor, IAsyncStreamReader<T> baseStream)
=> (_interceptor, _baseStream) = (interceptor, baseStream);
// キモの部分
public T Current
{
get
{
// 読み取る前にintercept
_interceptor(_baseStream.Current as byte[]);
return _baseStream.Current;
}
}
// 以下、インターフェースを満たすための実装
public Task<bool> MoveNext(CancellationToken cancellationToken) => _baseStream.MoveNext(cancellationToken);
public void Dispose() => _baseStream.Dispose();
}
これを適用してみましょう。
public class StreamingLoggingInterceptor : Interceptor
{
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation
)
{
// 差し込む処理の定義
void intercept(byte[] data)
{
// 雑にJSON化
Debug.Log(MessagePackSerializer.ToJson(data));
}
var call = continuation(context);
// 自作Streamに差し替えたcallを返す
return new AsyncDuplexStreamingCall<TRequest, TResponse>(
new MyRequestStream<TRequest>(intercept, call.RequestStream),
new MyResponseStream<TResponse>(intercept, call.ResponseStream),
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose
);
}
}
MagicOnion での送受信フォーマット
これで一応リクエスト/レスポンスデータを見ることはできるのですが、なんだかよくわからないフォーマットになっています。
例えば SendStampAsync(3) というリクエストに対するデータは [2,1766806594,3] となりました。
これがどういう構造なのかは StreamingHubClientBase.cs と StreamingHubClientBuilder.cs を頑張って読めばわかるのですが、[メッセージID, メソッドID, 引数のデータ本体] という3要素の配列になっています。
メッセージIDは自身が送ったメッセージ1個ごとに固有のIDが振られます。
メソッドIDはメソッド名のFNV1A32ハッシュ値です。(※MethodIdAttribute を付けることで任意のメソッドIDを付けられるようですが、ここでは付けてないものとします。)
レスポンスは3種類のフォーマットがあります。どれが送られてきたかは配列の要素数で判別できるようです。
2要素: Receiverメソッドの呼び出し [メソッドID, 引数のデータ本体]
3要素: リクエストに対するレスポンス [メッセージID, メソッドID, 戻り値]
4要素: エラーレスポンス [メッセージID, ステータスコード, エラー詳細, エラー文言]
フォーマットに従った出力
メソッドIDじゃ分かりにくいのでメソッド名で出力してくれるように改造します。
そのため、初めにメソッドIDからメソッド名への変換テーブルを用意します。
public class StreamingLoggingInterceptor<THub, TReceiver> : Interceptor
{
private Dictionary<int, string> _methodNameDic = new Dictionary<int, string>();
// メソッドIDからメソッド名への変換テーブルを用意
public StreamingLoggingInterceptor()
{
foreach (var method in typeof(THub).GetMethods())
{
_methodNameDic[FNV1A32.GetHashCode(method.Name)] = method.Name;
}
foreach (var method in typeof(TReceiver).GetMethods())
{
_methodNameDic[FNV1A32.GetHashCode(method.Name)] = method.Name;
}
}
// ログ出力処理本体
private void PrintLog(string type, byte[] bytes)
{
// 要素数を取得
var readSize = 0;
var length = MessagePackBinary.ReadArrayHeader(bytes, 0, out readSize);
var offset = readSize;
// 要素数4はエラー出力なので無視(!)
if (length == 4) return;
// 要素数3ならはじめはメッセージIDなので捨てる
if (length == 3) offset += MessagePackBinary.ReadNext(bytes, offset);
// メソッドID取得
var methodId = MessagePackBinary.ReadInt32(bytes, offset, out readSize);
offset += readSize;
// 残りのデータをコピー(ToJsonにoffset渡したい…)
var newBytes = BufferPool.Default.Rent();
Array.Copy(bytes, offset, newBytes, 0, bytes.Length - offset);
// 出力
Debug.Log($"[{type}]{_methodNameDic[methodId]}: {MessagePackSerializer.ToJson(newBytes)}");
BufferPool.Default.Return(newBytes);
}
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
ClientInterceptorContext<TRequest, TResponse> context,
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation
)
{
var call = continuation(context);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(
new MyRequestStream<TRequest>(x => PrintLog("SEND", x), call.RequestStream),
new MyResponseStream<TResponse>(x => PrintLog("RECEIVE", x), call.ResponseStream),
call.ResponseHeadersAsync,
call.GetStatus,
call.GetTrailers,
call.Dispose
);
}
}
使う側はこんな感じです。
var channel = new Channel("host:port", ChannelCredentials.Insecure);
var invoker = channel.Intercept(new StreamingLoggingInterceptor<IhogeHub, IHogeHubReceiver>());
var client = StreamingHubClient.Connect<IhogeHub, IHogeHubReceiver>(invoker, receiver);
これで以下のようなログが出力されるようになりました。
[SEND]SendStampAsync: 3
[RECEIVE]OnReceiveStamp: [1010826359,3]
[RECEIVE]SendStampAsync: null
おわりに
gRPC 側の仕組みに頼らないフィルタの開発が予定されており、
そちらが実装されるとこんな面倒なことはする必要がなくなるかと思われます。