LoginSignup
1

More than 5 years have passed since last update.

AWS ElastiCacheとgRPCを使ってServer Streaming処理を実装する。

Posted at

やりたいこと

ElastiCache(Redisを選択)の値が更新されたらgRPCにて通知する。

利用技術

C#
Redis
gRPC

ソースコード

ざっくりこんな感じでPubSubを受け取り、gRPCでServer Streamingしていました。

RedisPubSub.cs
RedisUtil.PubSubConnection.Subscribe(x, (channel, message) =>
                {
                    var messageArray = message.ToString().Split(',');
                    quoteStockResponse.Elemnt = messageArray[0];
                    quoteStockResponse.Value = messageArray[1];
                    responseStream.WriteAsync(quoteStockResponse);
                            });

問題

Only one write can be pending at a time
というエラーが発生。Async初心者の様なミスをしました。

ソースコード(修正版)

Queueに詰めて、取り出す処理に変えました。

ElastiCacheClusterConfig.cs
var Queue = new AsyncProducerConsumerQueue<QuoteStockResponse>();
            // 同期版
                RedisUtil.PubSubConnection.Subscribe(x, (channel, message) =>
                {
                    var messageArray = message.ToString().Split(',');
                    quoteStockResponse.Elemnt = messageArray[0];
                    quoteStockResponse.Value = messageArray[1];
                    Queue.EnqueueAsync(quoteStockResponse);
            });

while (true)
            {
                var value = Queue.DequeueAsync().Result;
                await responseStream.WriteAsync(value);
            }

小ネタ

AWSが提供しているSDKはMemcashedにしか対応していません。
そのため、最初はRedisではなくMemcashedを利用してSDKを用いて開発する予定でしたが、以下実装を確認する限り
localhost(踏み台経由)からのアクセスができないため、諦めました。

ElastiCacheClusterConfig.cs
 if (setup.ClusterEndPoint.HostName.IndexOf(".cfg", StringComparison.OrdinalIgnoreCase) >= 0)
            {
                if (setup.ClusterNode != null)
                {
                    var _tries = setup.ClusterNode.NodeTries > 0 ? setup.ClusterNode.NodeTries : DiscoveryNode.DEFAULT_TRY_COUNT;
                    var _delay = setup.ClusterNode.NodeDelay >= 0 ? setup.ClusterNode.NodeDelay : DiscoveryNode.DEFAULT_TRY_DELAY;
                    this.DiscoveryNode = new DiscoveryNode(this, setup.ClusterEndPoint.HostName, setup.ClusterEndPoint.Port, _tries, _delay);
                }
                else
                    this.DiscoveryNode = new DiscoveryNode(this, setup.ClusterEndPoint.HostName, setup.ClusterEndPoint.Port);
            }
            else
            {
                throw new ArgumentException("The provided endpoint does not support auto discovery");
            }

社内プロキシ配下で環境している事が悪いって事ですよね・・・つらい・・・

今後

パフォーマンスについて計測したら、別枠で書きます。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1