1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

C# gRPC Polly でエラー制御を実装する

Posted at

gRPC におけるエラー制御

リモートAPI である gRPC では通信系の障害対策は必須と言えます。Polly を使った Interceptor を考えてみます。

Polly とは

Polly は、フロー制御ライブラリです。複雑になりがちな次のフローを簡潔に実装することができます。

  • リトライ
  • サーキットブレーカー
  • タイムアウト
  • バルクヘッド
  • キャッシュ
  • フォールバック

【しばやん雑記】C# と Polly を使って回復力の高いアプリケーションを書く
【Microsoft Docs】サーキット ブレーカー パターンを実装する

実装例

IRcpPollyProvider

ポリシーの生成をインターセプター本体から分離するため、ポリシーを返すインターフェースを定義しました。

IRpcPollyProvider

/// <summary>
/// gRPC インターセプター用の Polly ポリシーを生成します。
/// </summary>
public interface IRpcPollyProvider
{
    /// <summary>
    /// クライアントサイド用の同期ポリシーを取得します。
    /// </summary>
    Polly.ISyncPolicy GetClientSyncPolicy<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class;

    /// <summary>
    /// クライアントサイド用の非同期ポリシーを取得します。
    /// </summary>
    Polly.IAsyncPolicy GetClientAsyncPolicy<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class;

    /// <summary>
    /// サーバーサイド用の同期ポリシーを取得します。
    /// </summary>
    Polly.ISyncPolicy GetServerSyncPolicy(ServerCallContext context);

    /// <summary>
    /// サーバーサイド用の非同期ポリシーを取得します。
    /// </summary>
    Polly.IAsyncPolicy GetServerAsyncPolicy(ServerCallContext context);
}

RpcPollyInterceptor(実は思った通りに制御できない版)

Polly によるフロー制御を行うインターセプターです。コンストラクタで IRpcPollyProvider インスタンスを受け取ります。取得したポリシーの Execute メソッドを介して RPC メソッドを実行します。

RpcPollyInterceptor
using Grpc.Core;
using Grpc.Core.Interceptors;

/// <summary>
/// Polly によるフロー制御を行うインターセプター。
/// </summary>
public class RpcPollyInterceptor : Interceptor
{
    public RpcPollyInterceptor(IRpcPollyProvider pollyProvider) : base()
    {
        PollyProvider = pollyProvider;
    }

    #region Polly

    /// <summary>
    /// ポリシープロバイダーを取得します。
    /// </summary>
    public IRpcPollyProvider PollyProvider { get; }

    /// <summary>
    /// クライアントサイド用の同期ポリシーを取得します。
    /// </summary>
    protected virtual Polly.ISyncPolicy GetClientSyncPolicy<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class
    {
        return PollyProvider.GetClientSyncPolicy(context);
    }

    /// <summary>
    /// クライアントサイド用の非同期ポリシーを取得します。
    /// </summary>
    protected virtual Polly.IAsyncPolicy GetClientAsyncPolicy<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class
    {
        return PollyProvider.GetClientAsyncPolicy(context);
    }

    /// <summary>
    /// サーバーサイド用の同期ポリシーを取得します。
    /// </summary>
    protected virtual Polly.ISyncPolicy GetServerSyncPolicy(ServerCallContext context)
    {
        return PollyProvider.GetServerSyncPolicy(context);
    }

    /// <summary>
    /// サーバーサイド用の非同期ポリシーを取得します。
    /// </summary>
    protected virtual Polly.IAsyncPolicy GetServerAsyncPolicy(ServerCallContext context)
    {
        return PollyProvider.GetServerAsyncPolicy(context);
    }

    #endregion

    #region client interceptor

    /// <summary>
    /// Intercepts a blocking invocation of a simple remote call.
    /// </summary>
    public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        Polly.ISyncPolicy policy = GetClientSyncPolicy(context);
        if (policy == null) { return base.BlockingUnaryCall(request, context, continuation); }
        else { return policy.Execute(() => base.BlockingUnaryCall(request, context, continuation)); }
    }

    /// <summary>
    /// Intercepts an asynchronous invocation of a simple remote call.
    /// </summary>
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        Polly.ISyncPolicy policy = GetClientSyncPolicy(context);
        if (policy == null) { return base.AsyncUnaryCall(request, context, continuation); }
        else { return policy.Execute(() => base.AsyncUnaryCall(request, context, continuation)); }
    }

    // ClientStreaming, ServerStreaming, DuplexStreaming も実装の流れは同じですので割愛します。

    #endregion

    #region server interceptor

    /// <summary>
    /// Server-side handler for intercepting and incoming unary call.
    /// </summary>
    public override Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation)
    {
        Polly.IAsyncPolicy policy = GetServerAsyncPolicy(context);
        if (policy == null) { return base.UnaryServerHandler(request, context, continuation); }
        else { return policy.ExecuteAsync(() => base.UnaryServerHandler(request, context, continuation)); }
    }

    // ClientStreaming, ServerStreaming, DuplexStreaming も実装の流れは同じですので割愛します。

    #endregion

}

RpcRetryPolicy

RpcException がスローされたときにリトライを行うポリシーを返す IRpcPollyProvider 実装クラスです。
リトライを行うたびにリトライまでの待機時間を 1 秒ずつのばしています。

RpcRetryPolicy
public class RpcRetryPolicy : IRpcPollyProvider
{
    /// <summary>
    /// 
    /// </summary>
    /// <param name="maxRetryCount">最大リトライ回数</param>
    public RpcRetryPolicy(int maxRetryCount)
    {
        m_SyncPolicy = Policy.Handle<RpcException>().Retry(
            maxRetryCount
            , (ex, count) => { Thread.Sleep(count * 1000); }
        );
        m_AsyncPolicy = Policy.Handle<RpcException>().RetryAsync(
            maxRetryCount
            , (ex, count) => { return Task.Delay(count * 1000); }
        );
    }

    private readonly ISyncPolicy m_SyncPolicy;
    private readonly IAsyncPolicy m_AsyncPolicy;

    public ISyncPolicy GetClientSyncPolicy<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class
    {
        return m_SyncPolicy;
    }

    public IAsyncPolicy GetClientAsyncPolicy<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context)
        where TRequest : class
        where TResponse : class
    {
        return m_AsyncPolicy;
    }

    public ISyncPolicy GetServerSyncPolicy(ServerCallContext context)
    {
        return m_SyncPolicy;
    }

    public IAsyncPolicy GetServerAsyncPolicy(ServerCallContext context)
    {
        return m_AsyncPolicy;
    }
}

インターセプターを設定

クライアントサイドで生成した CallInvoker に対してインターセプターを設定します。

Channel channel = new Channel("localhost", 50000, ChannelCredentials.Insecure);

CallInvoker callInvoker = new DefaultCallInvoker(channel)
    .Intercept(new RpcPollyInterceptor(new RpcRetryPolicy(3)));

var rpcClient = new SampleServiceClient(callInvoker);

この実装では何が思った通りに制御できないのか

gRPC には 5 種類の RPC メソッドがあります。

  1. BlockingUnaryCall
  2. AsyncUnaryCall
  3. AsyncClientStreamingCall
  4. AsyncServerStreamingCall
  5. AsyncDuplexStreamingCall

このうち 1 を除く 2 ~ 4 のメソッドは、サーバーと接続していない状態でも呼び出し自体は成功します。その後、メソッドの戻り値の Call オブジェクトを通じて通信を伴う処理が行われた時点で例外が発生します。そのため、インターセプターに定義されているインターセプトメソッドに対して Polly による制御を行っても、トリガーとしたい例外自体が発生しないため何も行われません。例外の発生をトリガーとしない場合のみ制御可能です。

つまり、通信系の障害対策を行いたければ、C# gRPC ストリーム入出力に対してインターセプトする方法 のようにストリーム操作に対しても Polly による制御を行うように実装する必要があります。

今回はここまでとします。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?