LoginSignup
1
1

More than 5 years have passed since last update.

gRPC インターセプターの実装(呼び出しブロック)

Last updated at Posted at 2018-10-13

インターセプターを使用した呼び出しブロック

認証エラーやサービスダウン時に RPC メソッドの呼び出しをブロックしたいことがあります。このような場合は個々の RPC メソッドに実装するよりもインターセプターを使用したほうが簡潔に実装でき、柔軟な対応が可能になります。

インターセプターを実装するには、Grpc.Core.Interceptors.Interceptor クラスを継承します。
インターセプターの仕組みについてはこちらを参照してください。
gRPC インターセプターの利用(アイデア編)

実装

特定の条件にフォーカスしたものではなく、汎用的な実装としました。

説明
RpcCallBlockInterceptor クラス インターセプタークラス。
IRpcCallBlocker インターフェース RPC メソッド呼び出しのブロックに関する機能を提供するインターフェース。
IRpcContextFilter インターフェース RPC メソッドのフィルタに関する機能を提供するインターフェース。

RpcCallBlockInterceptor クラス

コンストラクタで受け取った IRpcCallBlocker インスタンスと IRpcContextFilter インスタンスに従い、RPC メソッドの呼び出しをブロックします。


    /// <summary>
    /// RPCメソッドの呼び出しをブロックするためのインターセプター。
    /// </summary>
    public class RpcCallBlockInterceptor : Interceptor
    {

        #region コンストラクタ

        /// <summary>
        /// インスタンスを生成します。
        /// </summary>
        public RpcCallBlockInterceptor(IRpcCallBlocker blocker, IRpcContextFilter contextFilter) : base()
        {
            Blocker = blocker ?? NullRpcCallBlocker.Default;
            ContextFilter = contextFilter ?? NullRpcContextFilter.Default;
        }

        #endregion

        #region RPCメソッド呼び出しの割り込み(クライアント側)

        /// <summary>
        /// Unary メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public override TResponse BlockingUnaryCall<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnClient(context)) { AssertOnClient(request, context); }
            return base.BlockingUnaryCall(request, context, continuation);
        }

        /// <summary>
        /// 非同期 Unary メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnClient(context)) { AssertOnClient(request, context); }
            return base.AsyncUnaryCall(request, context, continuation);
        }

        /// <summary>
        /// ClientStreaming メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
            ClientInterceptorContext<TRequest, TResponse> context
            , AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnClient(context)) { AssertOnClient(default(TRequest), context); }
            return base.AsyncClientStreamingCall(context, continuation);
        }

        /// <summary>
        /// ServerStreaming メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnClient(context)) { AssertOnClient(request, context); }
            return base.AsyncServerStreamingCall(request, context, continuation);
        }

        /// <summary>
        /// DuplexStreaming メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
            ClientInterceptorContext<TRequest, TResponse> context
            , AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnClient(context)) { AssertOnClient(default(TRequest), context); }
            return base.AsyncDuplexStreamingCall(context, continuation);
        }

        #endregion

        #region RPCメソッド呼び出しの割り込み(サーバー側)

        /// <summary>
        /// Unary メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public async override Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
            TRequest request
            , ServerCallContext context
            , UnaryServerMethod<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnServer(context)) { AssertOnServer(request, context); }
            return await base.UnaryServerHandler(request, context, continuation).ConfigureAwait(false);
        }

        /// <summary>
        /// ClientStreaming メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public async override Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(
            IAsyncStreamReader<TRequest> requestStream
            , ServerCallContext context
            , ClientStreamingServerMethod<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnServer(context)) { AssertOnServer(default(TRequest), context); }
            return await base.ClientStreamingServerHandler(requestStream, context, continuation).ConfigureAwait(false);
        }

        /// <summary>
        /// ServerStreaming メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public async override Task ServerStreamingServerHandler<TRequest, TResponse>(
            TRequest request
            , IServerStreamWriter<TResponse> responseStream, ServerCallContext context
            , ServerStreamingServerMethod<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnServer(context)) { AssertOnServer(request, context); }
            await base.ServerStreamingServerHandler(request, responseStream, context, continuation).ConfigureAwait(false);
        }

        /// <summary>
        /// DuplexStreaming メソッド呼び出しに対する割り込みを行います。
        /// </summary>
        public async override Task DuplexStreamingServerHandler<TRequest, TResponse>(
            IAsyncStreamReader<TRequest> requestStream
            , IServerStreamWriter<TResponse> responseStream
            , ServerCallContext context
            , DuplexStreamingServerMethod<TRequest, TResponse> continuation)
        {
            if (NeedInterceptOnServer(context)) { AssertOnServer(default(TRequest), context); }
            await base.DuplexStreamingServerHandler(requestStream, responseStream, context, continuation).ConfigureAwait(false);
        }

        #endregion

        #region フィルタ

        /// <summary>
        /// フィルタを取得します。
        /// </summary>
        public IRpcContextFilter ContextFilter { get; private set; }

        /// <summary>
        /// クライアント側で割り込みを行う必要があるかどうかを取得します。
        /// </summary>
        protected virtual bool NeedInterceptOnClient<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
            where TRequest : class
            where TResponse : class
        {
            if (!Blocker.Enabled) { return false; }
            if (!ContextFilter.IsMatch(context)) { return false; }
            return true;
        }

        /// <summary>
        /// サーバー側で割り込みを行う必要があるかどうかを取得します。
        /// </summary>
        protected virtual bool NeedInterceptOnServer(ServerCallContext context)
        {
            if (!Blocker.Enabled) { return false; }
            if (!ContextFilter.IsMatch(context)) { return false; }
            return true;
        }

        #endregion

        #region ブロック

        /// <summary>
        /// ブロッカーを取得します。
        /// </summary>
        public IRpcCallBlocker Blocker { get; private set; }

        /// <summary>
        /// クライアント側でメソッド呼び出しをブロックする場合、例外を発生させます。
        /// </summary>
        protected virtual void AssertOnClient<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context)
            where TRequest : class
            where TResponse : class
        {
            if (!AllowInvocationOnClient(request, context, out Status errorStatus, out Metadata errorData))
            {
                throw CreateRpcException(errorStatus, errorData);
            }
        }

        /// <summary>
        /// サーバー側でメソッド呼び出しをブロックする場合、例外を発生させます。
        /// </summary>
        protected virtual void AssertOnServer<TRequest>(TRequest request, ServerCallContext context)
            where TRequest : class
        {
            if (!AllowInvocationOnServer(request, context, out Status errorStatus, out Metadata errorData))
            {
                throw CreateRpcException(errorStatus, errorData);
            }
        }

        /// <summary>
        /// クライアント側でメソッド呼び出しを許可するかどうかを取得します。
        /// </summary>
        protected virtual bool AllowInvocationOnClient<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
            where TResponse : class
        {
            if (!Blocker.Enabled)
            {
                errorStatus = default(Status);
                errorData = null;
                return true;
            }
            return Blocker.AllowInvocationOnClient(request, context, out errorStatus, out errorData);
        }

        /// <summary>
        /// サーバー側でメソッド呼び出しを許可するかどうかを取得します。
        /// </summary>
        protected virtual bool AllowInvocationOnServer<TRequest>(
            TRequest request
            , ServerCallContext context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
        {
            if (!Blocker.Enabled)
            {
                errorStatus = default(Status);
                errorData = null;
                return true;
            }
            return Blocker.AllowInvocationOnServer(request, context, out errorStatus, out errorData);
        }

        /// <summary>
        /// 指定されたステータスとメタデータから例外を生成します。
        /// </summary>
        protected virtual Exception CreateRpcException(Status errorStatus, Metadata errorData)
        {
            return Blocker.CreateRpcException(errorStatus, errorData);
        }

        #endregion

    }

IRpcCallBlocker インターフェース

RPC メソッドの呼び出しをブロックするかどうかを判定し、ブロックする理由などをステータスとメタデータで返します。いろいろなインターセプターで実装を共有したり実装を取り替えたりできるようにするため、インターフェースとして定義しました。


    /// <summary>
    /// RPC メソッド呼び出しのブロックに関する機能を提供します。
    /// </summary>
    public interface IRpcCallBlocker
    {

        /// <summary>
        /// 有効かどうかを取得します。
        /// </summary>
        bool Enabled { get; }

        /// <summary>
        /// クライアント側でメソッド呼び出しを許可するかどうかを取得します。
        /// </summary>
        bool AllowInvocationOnClient<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
            where TResponse : class
        ;

        /// <summary>
        /// サーバー側でメソッド呼び出しを許可するかどうかを取得します。
        /// </summary>
        bool AllowInvocationOnServer<TRequest>(
            TRequest request
            , ServerCallContext context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
        ;

        /// <summary>
        /// 指定されたステータスとメタデータから例外を生成します。
        /// </summary>
        /// <param name="errorStatus">The status.</param>
        /// <param name="errorData">The trailer.</param>
        /// <returns>An exception that created.</returns>
        Exception CreateRpcException(Status errorStatus, Metadata errorData);

    }

IRpcContextFilter インターフェース

メソッド定義やヘッダーの内容によるフィルタリングを行います。いろいろなインターセプターで実装を共有したり実装を取り替えたりできるようにするため、インターフェースとして定義しました。


    /// <summary>
    /// RPC メソッドのフィルタに関する機能を提供します。
    /// </summary>
    public interface IRpcContextFilter
    {

        /// <summary>
        /// 指定されたコンテキストが条件に合致するかどうかを取得します。
        /// </summary>
        bool IsMatch<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
            where TRequest : class
            where TResponse : class
            ;

        /// <summary>
        /// 指定されたコンテキストが条件に合致するかどうかを取得します。
        /// </summary>
        bool IsMatch(ServerCallContext context);

    }

ブロック・フィルタの実装例

MaintenanceBlocker クラス

IRpcCallBlocker インターフェースの実装例です。メンテナンス中である場合、呼び出しをブロックします。クライアント側でメンテナンス中かどうかのフラグを正しく設定することは難しいですが、うまくサーバーからプッシュする仕組みを設ければサーキットブレーカーのような働きをさせることができると思います。

ブロックによってクライアント側で RpcException がスローされ、その例外から「メンテナンスのためにブロックされたかどうか」を判別できる必要があります。メッセージ文字列だけで判断するのは望ましくありませんので、MetaData にエラーコードを格納するような実装を行うことになると思います。


    /// <summary>
    /// メンテナンス中である場合にメソッド呼び出しをブロックします。
    /// </summary>
    class MaintenanceBlocker : IRpcCallBlocker
    {

        /// <summary>
        /// 有効かどうかを取得または設定します。
        /// </summary>
        public bool Enabled { get; set; } = true;

        /// <summary>
        /// メンテナンス中かどうかを取得または設定します。
        /// </summary>
        public bool InMaintenance { get; set; } = false;

        /// <summary>
        /// クライアント側でメソッド呼び出しを許可するかどうかを取得します。
        /// </summary>
        public bool AllowInvocationOnClient<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
            where TResponse : class
        {
            if (InMaintenance)
            {
                FillErrorStatus(out errorStatus, out errorData);
                return false;
            }
            errorStatus = default(Status);
            errorData = null;
            return true;
        }

        /// <summary>
        /// サーバー側でメソッド呼び出しを許可するかどうかを取得します。
        /// </summary>
        public bool AllowInvocationOnServer<TRequest>(
            TRequest request
            , ServerCallContext context
            , out Status errorStatus, out Metadata errorData) where TRequest : class
        {
            if (InMaintenance)
            {
                FillErrorStatus(out errorStatus, out errorData);
                return false;
            }
            errorStatus = default(Status);
            errorData = null;
            return true;

        }

        /// <summary>
        /// 指定されたステータスとメタデータから例外を生成します。
        /// </summary>
        public Exception CreateRpcException(Status errorStatus, Metadata errorData)
        {
            if (errorData != null)
            {
                throw new RpcException(errorStatus, errorData);
            }
            else
            {
                throw new RpcException(errorStatus);
            }
        }

        /// <summary>
        /// エラーステータスとエラーデータを格納します。
        /// </summary>
        private void FillErrorStatus(out Status errorStatus, out Metadata errorData)
        {
            errorStatus = new Status(StatusCode.Unavailable, "In Maintenance.");
            errorData = new Metadata();
            errorData.Add("errorcode", "InMaintenance");
        }

    }

Null オブジェクト

前述の RpcCallBlockInterceptor クラスの実装に使用している Null オブジェクトの実装です。ブロックやフィルタを行わないように実装しているだけです。


    public sealed class NullRpcCallBlocker : IRpcCallBlocker
    {

        private NullRpcCallBlocker() { }

        public readonly static NullRpcCallBlocker Default = new NullRpcCallBlocker();

        public bool Enabled 
        {
            get { return false; }
        }

        public bool AllowInvocationOnClient<TRequest, TResponse>(
            TRequest request
            , ClientInterceptorContext<TRequest, TResponse> context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
            where TResponse : class
        {
            errorStatus = default(Status);
            errorData = null;
            return true;
        }

        public bool AllowInvocationOnServer<TRequest>(
            TRequest request, ServerCallContext context
            , out Status errorStatus, out Metadata errorData)
            where TRequest : class
        {
            errorStatus = default(Status);
            errorData = null;
            return true;
        }

        public Exception CreateRpcException(Status errorStatus, Metadata errorData)
        {
            if (errorData == null)
            {
                return new RpcException(errorStatus);
            }
            else
            {
                return new RpcException(errorStatus, errorData);
            }
        }

    }

    public sealed class NullRpcContextFilter : IRpcContextFilter
    {

        private NullRpcContextFilter() { }

        public readonly static NullRpcContextFilter Default = new NullRpcContextFilter();

        public bool IsMatch<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
            where TRequest : class
            where TResponse : class
        {
            return false;
        }

        public bool IsMatch(ServerCallContext context)
        {
            return false;
        }

    }

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1