LoginSignup
3
4

More than 5 years have passed since last update.

C# gRPC ストリーム入出力に対してインターセプトする方法

Posted at

公式のインターセプターは RPC メソッドの呼び出しが対象

Grpc.Core.Intercepters.Intercepter クラスには RPC メソッドの呼び出しに対するインターセプトメソッドが定義されており、これらのメソッドをオーバーライドすることによって RPC メソッドの呼び出し前後に任意の処理を割り込ませたり、例外に対する処理を行うことができます。

インターセプターの仕組みについてはこちらを参照してください。
gRPC インターセプターの利用(アイデア編)

ストリーム入出力に対してインターセプトしたい

私は今、Timeout や Unavailable など特定の例外が連続して発生した場合は一定時間 RPC メソッドの呼び出しを遮断する「サーキットブレーカー」を実装しているのですが、非同期 Unary, ClientStreaming, ServerStreaming, DuplexStreaming (つまり同期 Unary 以外)では RPC メソッドの呼び出し自体で例外が発生することは稀です。これらのメソッドはこの後行う非同期処理のための Call オブジェクトを生成するだけだからです。サーバーダウンしていたとしても呼び出しは成功し、リクエストを書き込んだりレスポンスを読み込んだりした時点で例外が発生します。

このときの例外は Intercepter クラスのメソッドを単純にオーバーライドするだけでは捕捉できません。そこで、ストリーム入出力に対するインターセプト機能を追加することにしました。

Intercepter

public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
    ClientInterceptorContext<TRequest, TResponse> context
    , AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation
)
{
    try
    {
        return base.AsyncClientStreamingCall(context, continuation);
    }
    catch (Exception ex)
    {
        // サーバーがダウンしていてもここでは例外は発生しません。
    }
}

実装方針

  • Intercepter クラスを継承した DeepIntercepter クラスとして実装します。

  • Intercepter クラスではインターセプトを行いたい箇所に対応したインターセプトメソッドをオーバーライドする設計になっていますので、ストリーム入出力に対するインターセプトもこの設計に倣うことにします。public である必要はないと考えられるため、protected virtual メソッドとして実装します。

  • RPC 呼び出しメソッドの引数として受け取ったストリームリーダー/ライターをラップし、WriteAsync や MoveNext メソッドに対するインターセプト処理を組み込んだラッパーを生成します。オリジナルのストリームリーダー/ライターの代わりにラッパーを後続の処理に渡します。

ソースコードは GitHub にアップロードしてあります。
【GitHub】DeepInterceptor のソースコード

ラッパークラスの実装

ラップ対象のストリームリーダー/ライターの型は次の三つです。ラップ対象のインスタンスを内包し、ストリーム操作を行うメソッドが呼び出されたときに実行する処理をデリゲートとして受け取ります。

使用されている箇所
IClientStreamWriter<T> クライアント側のリクエストストリームへの書き込みを行うライター
IServerStreamWriter<T> サーバー側のレスポンスストリームへの書き込みを行うライター
IAsyncStreamReader<T> クライアント側のレスポンスストリームとサーバー側のリクエストストリームからの読み込みを行うリーダー

IClientStreamWriter<T> に対するラッパークラス

ClientStreamWriterWrapper<T>

    internal sealed class ClientStreamWriterWrapper<T> : IClientStreamWriter<T>
    {

        internal ClientStreamWriterWrapper(
            IClientStreamWriter<T> streamWriter
            , Func<IClientStreamWriter<T>, Task> onComplete
            , Func<IClientStreamWriter<T>, T, Task> onWrite)
        {
            m_StreamWriter = streamWriter;
            m_OnComplete = onComplete;
            m_OnWrite = onWrite;
        }

        private readonly IClientStreamWriter<T> m_StreamWriter;
        private readonly Func<IClientStreamWriter<T>, Task> m_OnComplete;
        private readonly Func<IClientStreamWriter<T>, T, Task> m_OnWrite;

        public WriteOptions WriteOptions
        {
            get { return m_StreamWriter.WriteOptions; }
            set { m_StreamWriter.WriteOptions = value; }
        }

        public Task CompleteAsync()
        {
            return m_OnComplete(m_StreamWriter);
        }

        public Task WriteAsync(T message)
        {
            return m_OnWrite(m_StreamWriter, message);
        }

    }

IServerStreamWriter<T> に対するラッパークラス

ServerStreamWriterWrapper<T>

    internal sealed class ServerStreamWriterWrapper<T> : IServerStreamWriter<T>
    {

        internal ServerStreamWriterWrapper(
            IServerStreamWriter<T> streamWriter
            , Func<IServerStreamWriter<T>, T, Task> onWrite)
        {
            m_StreamWriter = streamWriter;
            m_OnWrite = onWrite;
        }

        private readonly IServerStreamWriter<T> m_StreamWriter;
        private readonly Func<IServerStreamWriter<T>, T, Task> m_OnWrite;

        public WriteOptions WriteOptions
        {
            get { return m_StreamWriter.WriteOptions; }
            set { m_StreamWriter.WriteOptions = value; }
        }

        public Task WriteAsync(T message)
        {
            return m_OnWrite(m_StreamWriter, message);
        }

    }

IAsyncStreamReader<T> に対するラッパークラス

AsyncStreamReaderWrapper<T>

    internal sealed class AsyncStreamReaderWrapper<T> : IAsyncStreamReader<T>
    {

        internal AsyncStreamReaderWrapper(
            IAsyncStreamReader<T> streamReader
            , Func<IAsyncStreamReader<T>, CancellationToken, Task<bool>> onMoveNext
            , Action<IAsyncStreamReader<T>> onDispose)
        {
            m_StreamReader = streamReader;
            m_OnMoveNext = onMoveNext;
            m_OnDispose = onDispose;
        }

        private readonly IAsyncStreamReader<T> m_StreamReader;
        private readonly Func<IAsyncStreamReader<T>, CancellationToken, Task<bool>> m_OnMoveNext;
        private readonly Action<IAsyncStreamReader<T>> m_OnDispose;

        public T Current
        {
            get { return m_StreamReader.Current; }
        }

        public void Dispose()
        {
            m_OnDispose(m_StreamReader);
        }

        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            return await m_OnMoveNext(m_StreamReader, cancellationToken).ConfigureAwait(false);
        }

    }

DeepInterceptor クラスの実装

Intercepter クラスを継承して DeepIntercepter クラスを定義します。

インターセプトメソッドのオーバーライド

インターセプトメソッドをオーバーライドし、引数で受け渡しされるストリームリーダー/ライターをラッパーに置き換えます。以下は ClientStream のインターセプトを行う AsyncClientStreamingCall メソッドのオーバーライト部分の抜粋です。

DeepIntercepter

    public class DeepInterceptor : Interceptor
    {

        public bool DeepIntercepterEnabled { get; protected set; }

        public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
            ClientInterceptorContext<TRequest, TResponse> context
            , AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
        {
            var call = base.AsyncClientStreamingCall(context, continuation);

            if (!DeepIntercepterEnabled) { return call; }

            // ストリームライターをラップ
            IClientStreamWriter<TRequest> streamWriter = CreateClientStreamWriter(call.RequestStream, context);

            // Callオブジェクトのメソッドをラップ
            Task<TResponse> getResponse = InterceptGetResponse(call.ResponseAsync, context);
            Task<Metadata> getHeader = InterceptGetResponseHeaders(call.ResponseHeadersAsync, context);
            Func<Status> getStatus = () => { return InterceptGetStatus(call.GetStatus, context); };
            Func<Metadata> getTrailer = () => { return InterceptGetTrailers(call.GetTrailers, context); };

            // ラッパーを内包したCallオブジェクトを返す
            return new AsyncClientStreamingCall<TRequest, TResponse>(
                streamWriter
                , getResponse
                , getHeader
                , getStatus
                , getTrailer
                , call.Dispose
            );
        }

    }

ストリームリーダー/ライターのラップ

続いて、上記のメソッド内で呼び出しているストリームライターのラップ処理です。

DeepIntercepter

    private IClientStreamWriter<TRequest> CreateClientStreamWriter<TRequest, TResponse>(
        IClientStreamWriter<TRequest> streamWriter
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {

        Task onComplete(IClientStreamWriter<TRequest> writer)
        {
            async Task func()
            {
                await writer.CompleteAsync().ConfigureAwait(false);
            }
            return InterceptClientCompleteRequest(func, context);
        }

        Task onWrite(IClientStreamWriter<TRequest> writer, TRequest request)
        {
            async Task func(TRequest req)
            {
                await writer.WriteAsync(req).ConfigureAwait(false);
            }
            return InterceptClientWriteRequest(func, request, context);
        }

        return new Core.ClientStreamWriterWrapper<TRequest>(streamWriter, onComplete, onWrite);
    }

ストリーム入出力に対するインターセプトメソッドの定義

最後にインターセプトメソッドです。protected virtual メソッドとして定義しています。ストリームや Call オブジェクトに対して操作が行われたときにこれらのメソッドが呼び出されます。割り込みを行いたい場合、これらのメソッドをオーバーライドします。

DeepIntercepter

    /// <summary>
    /// レスポンスの取得に対する割り込みを行います。
    /// </summary>
    protected virtual Task<TResponse> InterceptGetResponse<TRequest, TResponse>(
        Task<TResponse> getReponse
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {
        return getReponse;
    }

    /// <summary>
    /// レスポンスヘッダーの取得に対する割り込みを行います。
    /// </summary>
    protected virtual Task<Metadata> InterceptGetResponseHeaders<TRequest, TResponse>(
        Task<Metadata> getReponseHeaders
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {
        return getReponseHeaders;
    }

    /// <summary>
    /// ステータスの取得に対する割り込みを行います。
    /// </summary>
    protected virtual Status InterceptGetStatus<TRequest, TResponse>(
        Func<Status> getStatus
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {
        return getStatus();
    }

    /// <summary>
    /// トレーラーの取得に対する割り込みを行います。
    /// </summary>
    protected virtual Metadata InterceptGetTrailers<TRequest, TResponse>(
        Func<Metadata> getTrailers
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {
        return getTrailers();
    }

    /// <summary>
    /// リクエストの書き込みに対する割り込みを行います。
    /// </summary>
    protected virtual Task InterceptClientWriteRequest<TRequest, TResponse>(
        Func<TRequest, Task> writeRequest
        , TRequest request
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {
        return writeRequest(request);
    }

    /// <summary>
    /// リクエストの完了通知に対する割り込みを行います。
    /// </summary>
    protected virtual Task InterceptClientCompleteRequest<TRequest, TResponse>(
        Func<Task> completeRequest
        , ClientInterceptorContext<TRequest, TResponse> context
    )
        where TRequest : class
        where TResponse : class
    {
        return completeRequest();
    }

DeepInterceptor クラスの利用例

Interceptor クラスで提供されているインターセプトメソッドに加えて DeepInterceptor に定義したインターセプトメソッドをオーバーライドし、RPC メソッド呼び出しとストリーム入出力の両方に対する割り込みを行うことができるようになりました。


    public class ExceptionHandlerSample : DeepInterceptor
    {
        public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
            ClientInterceptorContext<TRequest, TResponse> context
            , AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation
        )
        {
            try
            {
                return base.AsyncClientStreamingCall(context, continuation);
            }
            catch (Exception ex)
            {
                // 例外が発生したときの処理
            }
        }

        protected async override Task InterceptClientWriteRequest<TRequest, TResponse>(
            Func<TRequest, Task> writeRequest
            , TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
        )
        {
            try
            {
                await base.InterceptClientWriteRequest(writeRequest, request, context).ConfigureAwait(false);
            }
            catch(Exception ex)
            {
                // 例外が発生したときの処理
            }
        }

        protected override Task InterceptClientCompleteRequest<TRequest, TResponse>(
            Func<Task> completeRequest
            , ClientInterceptorContext<TRequest, TResponse> context
        )
        {
            try
            {
                await base.InterceptClientCompleteRequest(completeRequest, context).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // 例外が発生したときの処理
            }
        }
    }
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4