はじめに
MagicOnion
は gRPC
を用いた通信ライブラリです。リクエストやレスポンスとして大きなデータを送受信する場合、ストリームを用いるとスケーラビリティが向上します。MagicOnion
の既定のシリアライザには MessagePack for C#
が採用されていますが、この MessagePack の Union 機能を用いると複数の種類のデータオブジェクトで構成される複合オブジェクトを送受信することが容易になります。その簡単な例を紹介します。
実装例の説明
-
次のデータオブジェクトで構成される販売情報オブジェクトをサーバーから受信します。
- ヘッダー
- 明細
- 明細の商品コードに対応する商品情報
-
明細数や商品数が不定で非常に多い場合があることを想定し、サービスメソッドの種類は ServerStreaming メソッドにします。
データクラスの実装
-
ServerStreaming のデータ型として使用するインターフェースと、実際に送受信するデータクラスを実装します。
-
複合オブジェクト(販売情報)を組み立てるメソッドはデータクラスに実装していますが、どこでも構いません。
- このメソッドの引数には MagicOnion や gRPC で提供されている型でなく、より抽象的にするために IAsyncEnumerable<T> としました。
using MessagePack;
/// <summary>
/// シリアライズ対象の型として使用するインターフェース。
/// </summary>
/// <remarks>
/// 実際の型を <see cref="UnionAttribute"/> 属性で指定します。
/// </remarks>
[Union(0, typeof(SalesInfo.SalesHeader))]
[Union(1, typeof(SalesInfo.SalesDetail))]
[Union(2, typeof(GoodsInfo))]
public interface ISalesObject { }
/// <summary>
/// 販売情報
/// </summary>
/// <remarks>
/// ヘッダー・明細と関連する商品情報で構成された複合オブジェクトです。
/// </remarks>
public class SalesInfo
{
/// <summary>
/// 指定されたストリームからオブジェクトを読み込み、販売情報を組み立てます。
/// </summary>
/// <param name="salesObjects">販売情報を構成するオブジェクトを列挙するストリーム</param>
/// <returns>販売情報</returns>
public static async Task<SalesInfo> BuildAsync(IAsyncEnumerable<ISalesObject> salesObjects)
{
// 取得したオブジェクトをキャストします
SalesHeader header = null!;
var details = new List<SalesDetail>();
var goods = new Dictionary<string, GoodsInfo>();
await foreach (var obj in salesObjects.ConfigureAwait(false))
{
// ヘッダー
if (obj is SalesHeader h)
{
header = h;
}
// 明細
else if (obj is SalesDetail d)
{
details.Add(d);
}
// 商品
else if (obj is GoodsInfo g)
{
goods[g.Code] = g;
}
}
return new SalesInfo(header, details, goods);
}
/// <summary>
/// コンストラクタ
/// </summary>
/// <param name="header"></param>
/// <param name="details"></param>
/// <param name="goods"></param>
private SalesInfo(
SalesHeader header,
IList<SalesDetail> details,
IDictionary<string, GoodsInfo> goods
)
{
Header = header;
Details = details;
Goods = goods;
}
/// <summary>
/// ヘッダーを取得します。
/// </summary>
public SalesHeader Header { get; }
/// <summary>
/// 明細を取得します。
/// </summary>
public IList<SalesDetail> Details { get; }
/// <summary>
/// 関連する商品情報を取得します。
/// </summary>
public IDictionary<string, GoodsInfo> Goods { get; }
/// <summary>
/// 販売情報のヘッダー
/// </summary>
[MessagePackObject]
public class SalesHeader : ISalesObject
{
[Key(0)]
public Guid Id { get; set; }
[Key(1)]
public DateTime SalesDate { get; set; }
}
/// <summary>
/// 販売情報の明細
/// </summary>
[MessagePackObject]
public class SalesDetail : ISalesObject
{
[Key(0)]
public string? GoodsCode { get; set; }
[Key(1)]
public int SalesNumber { get; set; }
[Key(2)]
public decimal SalesAmount { get; set; }
}
}
/// <summary>
/// 商品情報
/// </summary>
[MessagePackObject]
public class GoodsInfo : ISalesObject
{
[Key(0)]
public string Code { get; set; } = null!;
[Key(1)]
public string? Name { get; set; }
[Key(2)]
public decimal Price { get; set; }
}
サービスの実装
- ServerStreaming メソッドを実装します。
using MagicOnion;
public interface ISampleService : IService<ISampleService>
{
/// <summary>
/// 販売情報を取得します。
/// </summary>
/// <param name="saledId">販売情報のID</param>
/// <returns></returns>
Task<ServerStreamingResult<ISalesObject>> GetSalesAsync(Guid saledId);
}
サーバーサイドの実装
- 商品情報を構成するヘッダー・明細・商品情報オブジェクトを取得してストリームに書き込みます。
using MagicOnion;
using MagicOnion.Server;
internal class SampleService : ServiceBase<ISampleService>, ISampleService
{
/// <summary>
/// 販売情報を取得します。
/// </summary>
/// <param name="salesId">販売情報のID</param>
/// <returns></returns>
public async Task<ServerStreamingResult<ISalesObject>> GetSalesAsync(Guid salesId)
{
var context = GetServerStreamingContext<ISalesObject>();
// 販売情報のヘッダーを取得してストリームに書き込みます
await context.WriteAsync(GetHeader(salesId)).ConfigureAwait(false);
// 販売情報の明細と関連する商品情報を取得してストリームに書き込みます
var foundGoods = new Dictionary<string, GoodsInfo>();
foreach (var (detail, goods) in GetDetails(salesId))
{
if (!foundGoods.ContainsKey(goods.Code))
{
foundGoods.Add(goods.Code, goods);
await context.WriteAsync(goods).ConfigureAwait(false);
}
await context.WriteAsync(detail).ConfigureAwait(false);
}
return context.Result();
}
}
クライアントサイドの実装
-
サービスメソッドを呼び出してストリームを取得し、ストリームから販売情報を組み立てて返します。
-
IAsyncStreamReader<T> を IAsyncEnumerable<T> に変換するメソッドは、拡張メソッドとして実装しておくと便利かもしれません。
using MagicOnion.Client;
using Grpc.Net.Client;
using Grpc.Core;
using System.Runtime.CompilerServices;
/// <summary>
/// 販売情報を取得します。
/// </summary>
/// <param name="salesId">販売情報のID</param>
/// <returns></returns>
private async Task<SalesInfo> GetSalesAsync(Guid salesId)
{
// 通常はチャネルは事前に生成したインスタンスを使うと思います
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = MagicOnionClient.Create<ISampleService>(channel);
// サービスのメソッドを呼び出します
using var result = await client.GetSalesAsync(salesId).ConfigureAwait(false);
// ストリームを読み込んで販売情報を組み立てます
return await SalesInfo.BuildAsync(EnumerateAsync(result.ResponseStream)).ConfigureAwait(false);
}
/// <summary>
/// IAsyncStreamReader{T} を IAsyncEnumerable{T} に変換します。
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="reader"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
private static async IAsyncEnumerable<T> EnumerateAsync<T>(
IAsyncStreamReader<T> reader,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
while (await reader.MoveNext(cancellationToken).ConfigureAwait(false))
{
yield return reader.Current;
}
}
おわりに
MagicOnion や gRPC は主にゲームアプリなどでの採用実績が多く、このような大きな複合オブジェクトを送受信する例はあまり見かけません。ゲームアプリ分野ではアンチパターンかもしれません。
一方、私が携わる基幹業務系システムではこのような大きな複合オブジェクトを送受信しなくてはならないことが少なくありません。ストリームを用いることでスケーラビリティを確保しつつ「N+1問題」を回避する手段として有効であると思います。