LoginSignup
0
0

More than 5 years have passed since last update.

gRPC サイズが不定な複合オブジェクトをストリームで受信する

Posted at

gRPC の送受信データサイズには上限がある

gRPC ではリクエスト/レスポンスのデータサイズに上限があります。既定値は4MBです。
データオブジェクト単体でこの上限を超えることはあまり多くはありませんが、データオブジェクトの数が多い場合にはその合計サイズが上限を超えたりします。
上限を変更することはできますが、データサイズが不定である場合はストリームを用いることが一般的です。リクエスト/レスポンスの一つ一つが上限を超えなければサイズ超過エラーは発生しません。

service Sample
{
    // 一つのリクエストに対して複数のレスポンスを受け取る
    rpc GetObjects (Request) returns (stream Response){}

    // 複数のリクエストに対して複数のレスポンスを受け取る
    rpc GetObjects (stream Request) returns (stream Response){}

    // 複数のリクエストに対して一つのレスポンスを受け取る
    rpc SetObjects (stream Request) returns (Response){}
}

リクエスト/レスポンスのデータサイズが不定だったら?

この方法を採用しても、リクエスト/レスポンスの一つ一つのデータサイズが上限を超える場合はサイズ超過エラーが発生します。リクエスト/レスポンスの型がリスト構造を内包するような場合、データサイズは内包するオブジェクトの数に比例します。数が不定であればデータサイズも不定になります。

次のような場合の実装を考えてみました。

  • 一回の API 呼び出しで受信するオブジェクトの数が不定である
  • 受信する型が複数の型を内包する複合オブジェクトである
  • その内包されるオブジェクトの数も不定である

そもそもアプリケーション設計や API 設計を見直した方がよいような気もしますが、サーバーサイドでできるだけバッファを持たずにクライアントへ返すように実装するようなケースにも転用できる考え方だと思います。

複合オブジェクトの定義

今回のサンプルで使用する複合オブジェクトです。ComplexHeader, ComplexItem, ComplexSubItem を内包します。ComplexItem, ComplexSubItem の数が不定で、非常に多い可能性があるものとします。
FillResponse メソッドを呼び出して受信したオブジェクトを詰め込んでいきます。

複合オブジェクト
public class ComplexObject
{

    public ComplexHeader Header { get; set; }

    public IList<ComplexItem> Items
    {
        get { return m_Items; }
    }
    private readonly List<ComplexItem> m_Items = new List<ComplexItem>();

    public IList<ComplexSubItem> SubItems
    {
        get { return m_SubItems; }
    }
    private readonly List<ComplexSubItem> m_SubItems = new List<ComplexSubItem>();

    /// <summary>
    /// 指定されたレスポンスに含まれるオブジェクトを格納します。
    /// </summary>
    /// <param name="response">レスポンス</param>
    public void FillResponse(ComplexObjectResponse response)
    {
        switch (response.ObjectsCase)
        {
            case ComplexObjectResponse.ObjectsOneofCase.Header:
                Header = response.Header;
                break;

            case ComplexObjectResponse.ObjectsOneofCase.Item:
                Items.Add(response.Item);
                break;

            case ComplexObjectResponse.ObjectsOneofCase.SubItem:
                SubItems.Add(response.SubItem);
                break;
        }
    }

}

gRPC APIの定義

今回は ProtocolBuffers で定義しました。oneof を使用しています。

LargeMessage.proto
syntax = "proto3";
option csharp_namespace = "Examples.GrpcModels.LargeMessage";

// 検索条件:今回のサンプルでは適当な内容
message Condition
{
    string Keyword = 1;
}

// 複合オブジェクトを構成する要素を受信するためのレスポンス
message ComplexObjectResponse
{
    int32 Index = 1;
    oneof Objects
    {
        ComplexHeader Header = 11;
        ComplexItem Item = 12;
        ComplexSubItem SubItem = 13;
    }
}

// 複合オブジェクトのヘッダー
message ComplexHeader
{
    string Id = 1;
    string Description = 2;
}

// 複合オブジェクトのアイテム
message ComplexItem
{
    string Id = 1;
    string CreatedDate = 2;
    int32 Value = 3;
}

// 複合オブジェクトのサブアイテム
message ComplexSubItem
{
    string Id = 1;
    string Value = 3;
}

// サービス定義
service LargeSearch
{
    rpc GetComplexObjects (Condition) returns (stream ComplexObjectResponse){}
}

サービスの実装

取得した ComplexHeader, ComplexItem, ComplexSubItem オブジェクトを一つずつクライアントに送信しています。

サービスの実装
internal class LargeSearchServiceImpl : LargeSearch.LargeSearchBase
{
    public override async Task GetComplexObjects(Condition request, IServerStreamWriter<ComplexObjectResponse> responseStream, ServerCallContext context)
    {
        // レスポンスのインスタンスは使いまわす
        ComplexObjectResponse response = new ComplexObjectResponse() { };

        // オブジェクトのキーとインデックスの組み合わせ
        Dictionary<string, int> indexes = new Dictionary<string, int>();

        // header
        foreach (ComplexHeader header in GetComplexHeaders(request))
        {
            response.Index = GetOrAddIndex(indexes, header.Id);
            response.Header = header;
            await responseStream.WriteAsync(response).ConfigureAwait(false);
        }

        // items
        foreach (ComplexItem item in GetComplexItems(request))
        {
            response.Index = GetOrAddIndex(indexes, item.Id);
            response.Item = item;
            await responseStream.WriteAsync(response).ConfigureAwait(false);
        }

        // subitems
        foreach (ComplexSubItem subitem in GetComplexSubItems(request))
        {
            response.Index = GetOrAddIndex(indexes, subitem.Id);
            response.SubItem = subitem;
            await responseStream.WriteAsync(response).ConfigureAwait(false);
        }

    }

    /// <summary>
    /// 指定されたキーに対応するインデックスを取得します。
    /// </summary>
    /// <param name="indexes">キーとインデックスの組み合わせ</param>
    /// <param name="key">キー</param>
    /// <returns>インデックス</returns>
    private int GetOrAddIndex(Dictionary<string, int> indexes, string key)
    {
        if (indexes.TryGetValue(key, out int index)) { return index; }
        lock (indexes)
        {
            if (indexes.TryGetValue(key, out index)) { return index; }
            index = indexes.Count;
            indexes.Add(key, index);
            return index;
        }
    }

    /// <summary>
    /// ヘッダーを取得します。
    /// </summary>
    /// <param name="condition">取得条件</param>
    /// <returns>ヘッダー</returns>
    private IEnumerable<ComplexHeader> GetComplexHeaders(Condition condition)
    {
        // 実際にはデータベースなどからデータを取得して返すような実装になります。
        yield return new ComplexHeader() { Id = "1", Description = "オブジェクト1" };
        yield return new ComplexHeader() { Id = "2", Description = "オブジェクト2" };
        yield return new ComplexHeader() { Id = "4", Description = "オブジェクト4" };
    }

    /// <summary>
    /// アイテムを取得します。
    /// </summary>
    /// <param name="condition">取得条件</param>
    /// <returns>アイテム</returns>
    private IEnumerable<ComplexItem> GetComplexItems(Condition condition)
    {
        // 実際にはデータベースなどからデータを取得して返すような実装になります。
        string createdDate = DateTime.Now.ToString();
        yield return new ComplexItem() { Id = "1", CreatedDate = createdDate, Value = 1001 };
        yield return new ComplexItem() { Id = "1", CreatedDate = createdDate, Value = 1002 };
        yield return new ComplexItem() { Id = "2", CreatedDate = createdDate, Value = 2001 };
        yield return new ComplexItem() { Id = "2", CreatedDate = createdDate, Value = 2002 };
        yield return new ComplexItem() { Id = "2", CreatedDate = createdDate, Value = 2003 };
        yield return new ComplexItem() { Id = "4", CreatedDate = createdDate, Value = 4001 };
    }

    /// <summary>
    /// サブアイテムを取得します。
    /// </summary>
    /// <param name="condition">取得条件</param>
    /// <returns>サブアイテム</returns>
    private IEnumerable<ComplexSubItem> GetComplexSubItems(Condition condition)
    {
        // 実際にはデータベースなどからデータを取得して返すような実装になります。
        yield return new ComplexSubItem() { Id = "1", Value = "サブアイテム1-1" };
        yield return new ComplexSubItem() { Id = "5", Value = "サブアイテム5-1" };
    }
}

クライアントの実装

レスポンスとして受け取ったオブジェクトを ComplexObject に格納していきます。

private async Task GetComplexObjects()
{
    Grpc.Core.Channel channel = GetChannel();
    LargeSearch.LargeSearchClient client = new LargeSearch.LargeSearchClient(channel);

    Condition condition = new Condition
    {
        Keyword = "***"
    };

    // 取得したオブジェクトを格納するリスト
    List<ComplexObject> objects = new List<ComplexObject>();

    using (AsyncServerStreamingCall<ComplexObjectResponse> call = client.GetComplexObjects(condition))
    {
        while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
        {
            ComplexObjectResponse response = call.ResponseStream.Current;
            ComplexObject obj = GetOrAdd(objects, response.Index);
            obj.FillResponse(response);
        }
    }

    for (int i = 0; i < objects.Count; ++i)
    {
        Debug.WriteLine(string.Format("ComplexObject[{0}] = {1}"
        , i
        , objects[i] == null ? null
            : Newtonsoft.Json.JsonConvert.SerializeObject(objects[i], Newtonsoft.Json.Formatting.Indented)
        ));
    }

}

/// <summary>
/// 指定されたインデックスに対応するオブジェクトを返します。
/// </summary>
/// <param name="objects">オブジェクトを格納しているリスト</param>
/// <param name="index">インデックス</param>
/// <returns>オブジェクト</returns>
private ComplexObject GetOrAdd(List<ComplexObject> objects, int index)
{
    if (index < objects.Count) { return objects[index]; }
    lock (objects)
    {
        if (index < objects.Count) { return objects[index]; }
        if (index > objects.Count) { objects.AddRange(new ComplexObject[index - objects.Count]); }
        ComplexObject obj = new ComplexObject();
        objects.Add(obj);
        return obj;
    }
}

ComplexHeader, ComplexItem, ComplexSubItem オブジェクトが格納された ComplexObject が返されます。

出力結果
ComplexObject[0] = {
  "Header": {
    "Id": "1",
    "Description": "オブジェクト1"
  },
  "Items": [
    {
      "Id": "1",
      "CreatedDate": "2019/01/01 12:21:30",
      "Value": 1001
    },
    {
      "Id": "1",
      "CreatedDate": "2019/01/01 12:21:30",
      "Value": 1002
    }
  ],
  "SubItems": [
    {
      "Id": "1",
      "Value": "サブアイテム1-1"
    }
  ]
}
ComplexObject[1] = {
  "Header": {
    "Id": "2",
    "Description": "オブジェクト2"
  },
  "Items": [
    {
      "Id": "2",
      "CreatedDate": "2019/01/01 12:21:30",
      "Value": 2001
    },
    {
      "Id": "2",
      "CreatedDate": "2019/01/01 12:21:30",
      "Value": 2002
    },
    {
      "Id": "2",
      "CreatedDate": "2019/01/01 12:21:30",
      "Value": 2003
    }
  ],
  "SubItems": []
}
ComplexObject[2] = {
  "Header": {
    "Id": "4",
    "Description": "オブジェクト4"
  },
  "Items": [
    {
      "Id": "4",
      "CreatedDate": "2019/01/01 12:21:30",
      "Value": 4001
    }
  ],
  "SubItems": []
}
ComplexObject[3] = {
  "Header": null,
  "Items": [],
  "SubItems": [
    {
      "Id": "5",
      "Value": "サブアイテム5-1"
    }
  ]
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0