やりたいこと
ElastiCache(Redisを選択)の値が更新されたらgRPCにて通知する。
利用技術
C#
Redis
gRPC
#ソースコード
ざっくりこんな感じでPubSubを受け取り、gRPCでServer Streamingしていました。
RedisPubSub.cs
RedisUtil.PubSubConnection.Subscribe(x, (channel, message) =>
{
var messageArray = message.ToString().Split(',');
quoteStockResponse.Elemnt = messageArray[0];
quoteStockResponse.Value = messageArray[1];
responseStream.WriteAsync(quoteStockResponse);
});
#問題
Only one write can be pending at a time
というエラーが発生。Async初心者の様なミスをしました。
#ソースコード(修正版)
Queueに詰めて、取り出す処理に変えました。
ElastiCacheClusterConfig.cs
var Queue = new AsyncProducerConsumerQueue<QuoteStockResponse>();
// 同期版
RedisUtil.PubSubConnection.Subscribe(x, (channel, message) =>
{
var messageArray = message.ToString().Split(',');
quoteStockResponse.Elemnt = messageArray[0];
quoteStockResponse.Value = messageArray[1];
Queue.EnqueueAsync(quoteStockResponse);
});
while (true)
{
var value = Queue.DequeueAsync().Result;
await responseStream.WriteAsync(value);
}
#小ネタ
AWSが提供しているSDKはMemcashedにしか対応していません。
そのため、最初はRedisではなくMemcashedを利用してSDKを用いて開発する予定でしたが、以下実装を確認する限り
localhost(踏み台経由)からのアクセスができないため、諦めました。
ElastiCacheClusterConfig.cs
if (setup.ClusterEndPoint.HostName.IndexOf(".cfg", StringComparison.OrdinalIgnoreCase) >= 0)
{
if (setup.ClusterNode != null)
{
var _tries = setup.ClusterNode.NodeTries > 0 ? setup.ClusterNode.NodeTries : DiscoveryNode.DEFAULT_TRY_COUNT;
var _delay = setup.ClusterNode.NodeDelay >= 0 ? setup.ClusterNode.NodeDelay : DiscoveryNode.DEFAULT_TRY_DELAY;
this.DiscoveryNode = new DiscoveryNode(this, setup.ClusterEndPoint.HostName, setup.ClusterEndPoint.Port, _tries, _delay);
}
else
this.DiscoveryNode = new DiscoveryNode(this, setup.ClusterEndPoint.HostName, setup.ClusterEndPoint.Port);
}
else
{
throw new ArgumentException("The provided endpoint does not support auto discovery");
}
社内プロキシ配下で環境している事が悪いって事ですよね・・・つらい・・・
#今後
パフォーマンスについて計測したら、別枠で書きます。